diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 9be0e13e..e4ac0598 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -61,7 +61,7 @@ from piker.accounting import ( TransactionLedger, iter_by_dt, open_pps, - PpTable, + Account, ) from piker.clearing._messages import ( Order, @@ -287,6 +287,9 @@ async def recv_trade_updates( await client.ib.disconnectedEvent +# TODO: maybe we should allow the `trade_entries` input to be +# a list of the actual `Contract` types instead, though a couple +# other callers will need to be changed as well. async def update_ledger_from_api_trades( trade_entries: list[dict[str, Any]], client: Union[Client, MethodProxy], @@ -383,25 +386,33 @@ async def update_and_audit_msgs( # if ib reports a lesser pp it's not as bad since we can # presume we're at least not more in the shit then we # thought. - if diff and pikersize: - reverse_split_ratio = pikersize / ibsize - split_ratio = 1/reverse_split_ratio + if ( + diff + and ( + pikersize + or ibsize + ) + ): + # if 'mbt.cme' in msg.symbol: + # await tractor.pause() - if split_ratio >= reverse_split_ratio: - entry = f'split_ratio = {int(split_ratio)}' - else: - entry = f'split_ratio = 1/{int(reverse_split_ratio)}' + # reverse_split_ratio = pikersize / ibsize + # split_ratio = 1/reverse_split_ratio + # if split_ratio >= reverse_split_ratio: + # entry = f'split_ratio = {int(split_ratio)}' + # else: + # entry = f'split_ratio = 1/{int(reverse_split_ratio)}' msg.size = ibsize - logmsg: str = ( f'Pos mismatch in ib vs. the piker ledger!\n' f'IB:\n{ibfmtmsg}\n\n' f'PIKER:\n{pikerfmtmsg}\n\n' - 'If you are expecting a (reverse) split in this ' - 'instrument you should probably put the following' - 'in the `pps.toml` section:\n' - f'{entry}\n' + + # 'If you are expecting a (reverse) split in this ' + # 'instrument you should probably put the following' + # 'in the `pps.toml` section:\n' + # f'{entry}\n' # f'reverse_split_ratio: {reverse_split_ratio}\n' # f'split_ratio: {split_ratio}\n\n' ) @@ -416,8 +427,9 @@ async def update_and_audit_msgs( # TODO: make this a "propaganda" log level? log.warning( f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n' - f'ib: {ibppmsg.avg_price}\n' - f'piker: {msg.avg_price}' + f'ib: {pformat(ibppmsg)}\n' + '---------------------------\n' + f'piker: {msg.to_dict()}' ) else: @@ -537,6 +549,9 @@ async def open_trade_dialog( accounts_def = config.load_accounts(['ib']) + # TODO: do this as part of `open_account()`!? + from piker.data._symcache import open_symcache + global _client_cache # deliver positions to subscriber before anything else @@ -550,12 +565,13 @@ async def open_trade_dialog( proxies, aioclients, ), + open_symcache('ib', only_from_memcache=True) as symcache, ): # Open a trade ledgers stack for appending trade records over # multiple accounts. # TODO: we probably want to generalize this into a "ledgers" api.. ledgers: dict[str, dict] = {} - tables: dict[str, PpTable] = {} + tables: dict[str, Account] = {} order_msgs: list[Status] = [] conf = get_config() accounts_def_inv: bidict[str, str] = bidict(conf['accounts']).inverse @@ -582,8 +598,12 @@ async def open_trade_dialog( parsers={ 'dateTime': parse_flex_dt, 'datetime': pendulum.parse, + # for some some fucking 2022 and + # back options records...fuck me. + 'date': pendulum.parse, }, ), + symcache=symcache, ) ) @@ -616,7 +636,7 @@ async def open_trade_dialog( ) acctid: str = account.strip('ib.') ledger: dict = ledgers[acctid] - table: PpTable = tables[acctid] + table: Account = tables[acctid] # update position table with latest ledger from all # gathered transactions: ledger file + api records. @@ -624,9 +644,8 @@ async def open_trade_dialog( # update trades ledgers for all accounts from connected # api clients which report trades for **this session**. - api_trades = await proxy.trades() + api_trades: list[dict] = await proxy.trades() if api_trades: - api_trans_by_acct: dict[str, Transaction] api_to_ledger_entries: dict[str, dict] ( @@ -660,7 +679,10 @@ async def open_trade_dialog( trans.update(api_trans) # update account (and thus pps) from all gathered transactions - table.update_from_trans(trans) + table.update_from_ledger( + trans, + symcache=ledger.symcache, + ) # process pp value reported from ib's system. we only # use these to cross-check sizing since average pricing @@ -772,7 +794,7 @@ async def emit_pp_update( cids2pps: dict, ledgers: dict[str, dict[str, Any]], - tables: dict[str, PpTable], + acnts: dict[str, Account], ) -> None: @@ -794,9 +816,11 @@ async def emit_pp_update( tx: Transaction = list(trans.values())[0] acctid = fq_acctid.strip('ib.') - table = tables[acctid] - table.update_from_trans(trans) - active, closed = table.dump_active() + acnt = acnts[acctid] + + acnt.update_from_ledger(trans) + + active, closed = acnt.dump_active() # NOTE: update ledger with all new trades for fq_acctid, trades_by_id in api_to_ledger_entries.items(): diff --git a/piker/brokers/ib/ledger.py b/piker/brokers/ib/ledger.py index 805cdaf6..e12bab13 100644 --- a/piker/brokers/ib/ledger.py +++ b/piker/brokers/ib/ledger.py @@ -224,7 +224,6 @@ def norm_trade_records( # `trades_dialogue()` above). trans = Transaction( fqme=fqme, - sym=pair, tid=tid, size=size, price=price,