From 3be1d610e091ffe42a86063b20916bbe352d6211 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 24 Jun 2023 17:12:43 -0400 Subject: [PATCH] ib: expose trade EP as `open_trade_dialog()` Should be the final production backend to switch this over B) Also tidy up the `update_and_audit_msgs()` validator to log vs. raise when `validate: bool` is set; turn it off by default to avoid raises until we figure out wtf is up with ib ledger processing or wtv.. --- piker/brokers/ib/__init__.py | 4 +- piker/brokers/ib/_util.py | 2 +- piker/brokers/ib/api.py | 3 +- piker/brokers/ib/broker.py | 90 ++++++++++++++++++++---------------- 4 files changed, 56 insertions(+), 43 deletions(-) diff --git a/piker/brokers/ib/__init__.py b/piker/brokers/ib/__init__.py index 80bc228f..1acef974 100644 --- a/piker/brokers/ib/__init__.py +++ b/piker/brokers/ib/__init__.py @@ -34,12 +34,12 @@ from .feed import ( stream_quotes, ) from .broker import ( - trades_dialogue, + open_trade_dialog, ) __all__ = [ 'get_client', - 'trades_dialogue', + 'open_trade_dialog', 'open_history_client', 'open_symbol_search', 'stream_quotes', diff --git a/piker/brokers/ib/_util.py b/piker/brokers/ib/_util.py index f23aa99b..05417d98 100644 --- a/piker/brokers/ib/_util.py +++ b/piker/brokers/ib/_util.py @@ -29,7 +29,7 @@ import subprocess import tractor -from .._util import get_logger +from piker.brokers._util import get_logger if TYPE_CHECKING: from .api import Client diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 171578aa..fd0d024d 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -85,8 +85,9 @@ import numpy as np # non-relative for backends so that non-builting backends # can be easily modelled after this style B) from piker import config -from piker.brokers._util import ( +from ._util import ( log, + # only for the ib_sync internal logging get_logger, ) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index d6c36133..21d4baa5 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -60,6 +60,7 @@ from piker.accounting import ( Position, Transaction, open_trade_ledger, + TransactionLedger, iter_by_dt, open_pps, PpTable, @@ -78,10 +79,10 @@ from piker.clearing._messages import ( from piker.accounting import ( MktPair, ) +from ._util import log from .api import ( _accounts2clients, con2fqme, - log, get_config, open_client_proxies, Client, @@ -90,6 +91,7 @@ from .api import ( from ._flex_reports import parse_flex_dt + def pack_position( pos: IbPosition @@ -339,7 +341,7 @@ async def update_and_audit_msgs( acctid: str, # no `ib.` prefix is required! pps: list[Position], cids2pps: dict[tuple[str, int], BrokerdPosition], - validate: bool = False, + validate: bool = True, ) -> list[BrokerdPosition]: @@ -352,9 +354,9 @@ async def update_and_audit_msgs( # for comparison/audit versus the piker equivalent # breakeven pp calcs. ibppmsg = cids2pps.get((acctid, bs_mktid)) - if ibppmsg: - symbol = ibppmsg.symbol + + symbol: str = ibppmsg.symbol msg = BrokerdPosition( broker='ib', @@ -375,36 +377,41 @@ async def update_and_audit_msgs( ibfmtmsg = pformat(ibppmsg.to_dict()) pikerfmtmsg = pformat(msg.to_dict()) - if validate: - ibsize = ibppmsg.size - pikersize = msg.size - diff = pikersize - ibsize + ibsize = ibppmsg.size + pikersize = msg.size + diff = pikersize - ibsize - # if ib reports a lesser pp it's not as bad since we can - # presume we're at least not more in the shit then we - # thought. - if diff and pikersize: - reverse_split_ratio = pikersize / ibsize - split_ratio = 1/reverse_split_ratio + # if ib reports a lesser pp it's not as bad since we can + # presume we're at least not more in the shit then we + # thought. + if diff and pikersize: + reverse_split_ratio = pikersize / ibsize + split_ratio = 1/reverse_split_ratio - if split_ratio >= reverse_split_ratio: - entry = f'split_ratio = {int(split_ratio)}' - else: - entry = f'split_ratio = 1/{int(reverse_split_ratio)}' + if split_ratio >= reverse_split_ratio: + entry = f'split_ratio = {int(split_ratio)}' + else: + entry = f'split_ratio = 1/{int(reverse_split_ratio)}' - # raise ValueError( - log.error( - f'Pos mismatch in ib vs. the piker ledger!\n' - f'IB:\n{ibfmtmsg}\n\n' - f'PIKER:\n{pikerfmtmsg}\n\n' - 'If you are expecting a (reverse) split in this ' - 'instrument you should probably put the following' - 'in the `pps.toml` section:\n' - f'{entry}\n' - # f'reverse_split_ratio: {reverse_split_ratio}\n' - # f'split_ratio: {split_ratio}\n\n' - ) - msg.size = ibsize + msg.size = ibsize + + logmsg: str = ( + f'Pos mismatch in ib vs. the piker ledger!\n' + f'IB:\n{ibfmtmsg}\n\n' + f'PIKER:\n{pikerfmtmsg}\n\n' + 'If you are expecting a (reverse) split in this ' + 'instrument you should probably put the following' + 'in the `pps.toml` section:\n' + f'{entry}\n' + # f'reverse_split_ratio: {reverse_split_ratio}\n' + # f'split_ratio: {split_ratio}\n\n' + ) + + if validate: + raise ValueError(logmsg) + else: + # await tractor.pause() + log.error(logmsg) if ibppmsg.avg_price != msg.avg_price: # TODO: make this a "propaganda" log level? @@ -432,12 +439,16 @@ async def update_and_audit_msgs( size=p.size, avg_price=p.ppu, ) - if validate and p.size: - # raise ValueError( - log.error( + if p.size: + logmsg: str = ( f'UNEXPECTED POSITION says IB => {msg.symbol}\n' 'Maybe they LIQUIDATED YOU or are missing ledger entries?\n' ) + log.error(logmsg) + + # if validate: + # raise ValueError(logmsg) + msgs.append(msg) return msgs @@ -520,10 +531,8 @@ async def open_trade_event_stream( @tractor.context -async def trades_dialogue( - +async def open_trade_dialog( ctx: tractor.Context, - # loglevel: str = None, ) -> AsyncIterator[dict[str, Any]]: @@ -575,6 +584,7 @@ async def trades_dialogue( # open ledger and pptable wrapper for each # detected account. + ledger: TransactionLedger ledger = ledgers[acctid] = lstack.enter_context( open_trade_ledger( 'ib', @@ -643,13 +653,14 @@ async def trades_dialogue( # TODO: fix this `tractor` BUG! # https://github.com/goodboy/tractor/issues/354 - # await tractor.breakpoint() + # await tractor.pp() if trade_entries: # write ledger with all new api_trades # **AFTER** we've updated the `pps.toml` # from the original ledger state! (i.e. this # is currently done on exit) + for tid, entry in trade_entries.items(): ledger.setdefault(tid, {}).update(entry) @@ -670,6 +681,7 @@ async def trades_dialogue( # -> collect all ib-pp reported positions so that we can be # sure know which positions to update from the ledger if # any are missing from the ``pps.toml`` + # await tractor.pp() pos: IbPosition # named tuple subtype for pos in client.positions(): @@ -702,7 +714,7 @@ async def trades_dialogue( acctid, pps.values(), cids2pps, - validate=True, + validate=False, ) all_positions.extend(msg for msg in msgs)