From 3991d8f9112ccb8699c8691651bc17122d51934e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 15 Jun 2022 11:56:49 -0400 Subject: [PATCH] Add `update_and_audit()` in prep for rt per-trade-event pp udpates --- piker/brokers/ib/broker.py | 174 ++++++++++++++++++++++--------------- 1 file changed, 102 insertions(+), 72 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index aa3fa07c..c2967d1b 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -267,8 +267,9 @@ async def recv_trade_updates( async def update_ledger_from_api_trades( - clients: list[Union[Client, MethodProxy]], + trade_entries: dict[str, Any], ib_pp_msgs: dict[int, BrokerdPosition], # conid -> msg + client: Union[Client, MethodProxy], ) -> dict[str, Any]: @@ -279,34 +280,34 @@ async def update_ledger_from_api_trades( # retreive new trade executions from the last session # and/or day's worth of trading and convert into trade # records suitable for a local ledger file. - trades_by_account: dict = {} - for client in clients: + # trades_by_account: dict = {} + # for client in clients: - trade_entries = await client.trades() + # trade_entries = await client.trades() - # XXX; ERRGGG.. - # pack in the "primary/listing exchange" value from a - # contract lookup since it seems this isn't available by - # default from the `.fills()` method endpoint... - for entry in trade_entries: - condict = entry['contract'] - conid = condict['conId'] - pexch = condict['primaryExchange'] + # XXX; ERRGGG.. + # pack in the "primary/listing exchange" value from a + # contract lookup since it seems this isn't available by + # default from the `.fills()` method endpoint... + for entry in trade_entries: + condict = entry['contract'] + conid = condict['conId'] + pexch = condict['primaryExchange'] - if not pexch: - con = (await client.get_con(conid=conid))[0] - pexch = con.primaryExchange + if not pexch: + con = (await client.get_con(conid=conid))[0] + pexch = con.primaryExchange - entry['listingExchange'] = pexch + entry['listingExchange'] = pexch - records = trades_to_records( - conf['accounts'].inverse, - trade_entries, - ) - trades_by_account.update(records) + records = trades_to_records( + conf['accounts'].inverse, + trade_entries, + ) + # trades_by_account.update(records) # write recent session's trades to the user's (local) ledger file. - for acctid, trades_by_id in trades_by_account.items(): + for acctid, trades_by_id in records.items(): with pp.open_trade_ledger('ib', acctid) as ledger: ledger.update(trades_by_id) @@ -327,7 +328,76 @@ async def update_ledger_from_api_trades( ) r.fqsn = normed_msg.symbol - pp.update_pps_conf('ib', acctid, records) + active = pp.update_pps_conf('ib', acctid, records) + + return active + + +async def update_and_audit( + by_fqsn: dict[str, pp.Position], + cids2pps: dict[int, BrokerdPosition], + +) -> list[BrokerdPosition]: + + msgs: list[BrokerdPosition] = [] + pps: dict[int, pp.Position] = {} + + for fqsn, p in by_fqsn.items(): + bsuid = p.bsuid + + # build trade-session-actor local table + # of pps from unique symbol ids. + pps[bsuid] = p + + # retreive equivalent ib reported position message + # for comparison/audit versus the piker equivalent + # breakeven pp calcs. + ibppmsg = cids2pps[bsuid] + + msg = BrokerdPosition( + broker='ib', + + # XXX: ok so this is annoying, we're relaying + # an account name with the backend suffix prefixed + # but when reading accounts from ledgers we don't + # need it and/or it's prefixed in the section + # table.. + account=ibppmsg.account, + # XXX: the `.ib` is stripped..? + symbol=ibppmsg.symbol, + currency=ibppmsg.currency, + size=p.size, + avg_price=p.avg_price, + ) + ibsize = ibppmsg.size + pikersize = msg.size + diff = pikersize - ibsize + + # if ib reports a lesser pp it's not as bad since we can + # presume we're at least not more in the shit then we + # thought. + if diff: + raise ValueError( + f'POSITION MISMATCH ib <-> piker ledger:\n' + f'ib: {msg}\n' + f'piker: {ibppmsg}\n' + 'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?' + ) + msg.size = ibsize + + if ibppmsg.avg_price != msg.avg_price: + + # TODO: make this a "propoganda" log level? + log.warning( + 'The mega-cucks at IB want you to believe with their ' + f'"FIFO" positioning for {msg.symbol}:\n' + f'"ib" mega-cucker avg price: {ibppmsg.avg_price}\n' + f'piker, LIFO breakeven PnL price: {msg.avg_price}' + ) + + msgs.append(msg) + + return msgs @tractor.context @@ -389,7 +459,6 @@ async def trades_dialogue( # money.. xb for client in aioclients.values(): for pos in client.positions(): - cid, msg = pack_position(pos) acctid = msg.account = accounts_def.inverse[msg.account] used_accounts.add(acctid) @@ -399,61 +468,21 @@ async def trades_dialogue( # update trades ledgers for all accounts from # connected api clients. - await update_ledger_from_api_trades( - proxies.values(), - cids2pps, # pass these in to map to correct fqsns.. - ) + for account, proxy in proxies.items(): + await update_ledger_from_api_trades( + await proxy.trades(), + cids2pps, # pass these in to map to correct fqsns.. + proxy, + ) # load all positions from `pps.toml`, cross check with ib's # positions data, and relay re-formatted pps as msgs to the ems. for acctid, by_fqsn in pp.get_pps( 'ib', acctids=used_accounts, ).items(): - for fqsn, posdict in by_fqsn.items(): - ibppmsg = cids2pps[posdict['bsuid']] - msg = BrokerdPosition( - broker='ib', - # XXX: ok so this is annoying, we're relaying - # an account name with the backend suffix prefixed - # but when reading accounts from ledgers we don't - # need it and/or it's prefixed in the section - # table.. - account=ibppmsg.account, - # XXX: the `.ib` is stripped..? - symbol=ibppmsg.symbol, - currency=ibppmsg.currency, - size=posdict['size'], - avg_price=posdict['avg_price'], - ) - print(msg) - ibsize = ibppmsg.size - pikersize = msg.size - diff = pikersize - ibsize - - # if ib reports a lesser pp it's not as bad since we can - # presume we're at least not more in the shit then we - # thought. - if diff: - raise ValueError( - f'POSITION MISMATCH ib <-> piker ledger:\n' - f'ib: {ibsize}\n' - f'piker: {pikersize}\n' - 'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?' - ) - msg.size = ibsize - - if ibppmsg.avg_price != msg.avg_price: - - # TODO: make this a "propoganda" log level? - log.warning( - 'The mega-cucks at IB want you to believe with their ' - f'"FIFO" positioning for {msg.symbol}:\n' - f'"ib" mega-cucker avg price: {ibppmsg.avg_price}\n' - f'piker, LIFO breakeven PnL price: {msg.avg_price}' - ) - - all_positions.append(msg.dict()) + msgs = await update_and_audit(by_fqsn, cids2pps) + all_positions.extend(msg.dict() for msg in msgs) if not all_positions and cids2pps: raise RuntimeError( @@ -621,6 +650,7 @@ async def deliver_trade_events( continue elif event_name == 'position': + cid, msg = pack_position(item) msg.account = accounts_def.inverse[msg.account]