From f9c4b3cc96019fe53674f0886c0915689643c5fd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 22 Jun 2022 18:18:02 -0400 Subject: [PATCH] Fixes for newly opened and closed pps Before we weren't emitting pp msgs when a position went back to "net zero" (aka the size is zero) nor when a new one was opened (wasn't previously loaded from the `pps.toml`). This reworks a bunch of the incremental update logic as well as ports to the changes in the `piker.pp` module: - rename a few of the normalizing helpers to be more explicit. - drop calling `pp.get_pps()` in the trades dialog task and instead create msgs iteratively, per account, by iterating through collected position and API trade records and calling instead `pp.update_pps_conf()`. - always from-ledger-update both positions reported from ib's pp sys and session api trades detected on ems-trade-dialog startup. - `update_ledger_from_api_trades()` now does **just** that: only updates the trades ledger and returns the transaction set. - `update_and_audit_msgs()` now only the input list of msgs and properly generates new msgs for newly created positions that weren't previously loaded from the `pps.toml`. --- piker/brokers/ib/broker.py | 257 ++++++++++++++++++++++--------------- 1 file changed, 156 insertions(+), 101 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 661022fe..b6d780de 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -286,8 +286,7 @@ async def update_ledger_from_api_trades( trade_entries: list[dict[str, Any]], client: Union[Client, MethodProxy], -) -> dict[str, Any]: - +) -> dict[str, pp.Transaction]: # construct piker pps from trade ledger, underneath using # LIFO style breakeven pricing calcs. conf = get_config() @@ -312,95 +311,110 @@ async def update_ledger_from_api_trades( entry['listingExchange'] = pexch - records = trades_to_records( + entries = trades_to_ledger_entries( conf['accounts'].inverse, trade_entries, ) - actives = {} # write recent session's trades to the user's (local) ledger file. - for acctid, trades_by_id in records.items(): - + records: dict[str, pp.Transactions] = {} + for acctid, trades_by_id in entries.items(): with pp.open_trade_ledger('ib', acctid) as ledger: ledger.update(trades_by_id) - # normalize - records = norm_trade_records(trades_by_id) + # normalize to transaction form + records[acctid] = norm_trade_records(trades_by_id) - # (incrementally) update the user's pps in mem and - # in the `pps.toml`. - active = pp.update_pps_conf('ib', acctid, records) - actives.update(active) - - return actives + return records -async def update_and_audit( - acctid: str, - by_fqsn: dict[str, pp.Position], +async def update_and_audit_msgs( + acctid: str, # no `ib.` prefix is required! + pps: list[pp.Position], cids2pps: dict[tuple[str, int], BrokerdPosition], validate: bool = False, ) -> list[BrokerdPosition]: msgs: list[BrokerdPosition] = [] - pps: dict[int, pp.Position] = {} + # pps: dict[int, pp.Position] = {} - for fqsn, p in by_fqsn.items(): + for p in pps: bsuid = p.bsuid # build trade-session-actor local table # of pps from unique symbol ids. - pps[bsuid] = p + # pps[bsuid] = p # retreive equivalent ib reported position message # for comparison/audit versus the piker equivalent # breakeven pp calcs. - ibppmsg = cids2pps[(acctid, bsuid)] + ibppmsg = cids2pps.get((acctid, bsuid)) - msg = BrokerdPosition( - broker='ib', + if ibppmsg: + msg = BrokerdPosition( + broker='ib', - # XXX: ok so this is annoying, we're relaying - # an account name with the backend suffix prefixed - # but when reading accounts from ledgers we don't - # need it and/or it's prefixed in the section - # table.. - account=ibppmsg.account, - # XXX: the `.ib` is stripped..? - symbol=ibppmsg.symbol, - currency=ibppmsg.currency, - size=p.size, - avg_price=p.be_price, - ) - msgs.append(msg) - - if validate: - ibsize = ibppmsg.size - pikersize = msg.size - diff = pikersize - ibsize - - # if ib reports a lesser pp it's not as bad since we can - # presume we're at least not more in the shit then we - # thought. - if diff: - raise ValueError( - f'POSITION MISMATCH ib <-> piker ledger:\n' - f'ib: {ibppmsg}\n' - f'piker: {msg}\n' - 'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?' - ) - msg.size = ibsize - - if ibppmsg.avg_price != msg.avg_price: - - # TODO: make this a "propoganda" log level? - log.warning( - 'The mega-cucks at IB want you to believe with their ' - f'"FIFO" positioning for {msg.symbol}:\n' - f'"ib" mega-cucker avg price: {ibppmsg.avg_price}\n' - f'piker, LIFO breakeven PnL price: {msg.avg_price}' + # XXX: ok so this is annoying, we're relaying + # an account name with the backend suffix prefixed + # but when reading accounts from ledgers we don't + # need it and/or it's prefixed in the section + # table.. + account=ibppmsg.account, + # XXX: the `.ib` is stripped..? + symbol=ibppmsg.symbol, + currency=ibppmsg.currency, + size=p.size, + avg_price=p.be_price, ) + msgs.append(msg) + + if validate: + ibsize = ibppmsg.size + pikersize = msg.size + diff = pikersize - ibsize + + # if ib reports a lesser pp it's not as bad since we can + # presume we're at least not more in the shit then we + # thought. + if diff: + raise ValueError( + f'POSITION MISMATCH ib <-> piker ledger:\n' + f'ib: {ibppmsg}\n' + f'piker: {msg}\n' + 'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?' + ) + msg.size = ibsize + + if ibppmsg.avg_price != msg.avg_price: + + # TODO: make this a "propoganda" log level? + log.warning( + 'The mega-cucks at IB want you to believe with their ' + f'"FIFO" positioning for {msg.symbol}:\n' + f'"ib" mega-cucker avg price: {ibppmsg.avg_price}\n' + f'piker, LIFO breakeven PnL price: {msg.avg_price}' + ) + + else: + # make brand new message + msg = BrokerdPosition( + broker='ib', + + # XXX: ok so this is annoying, we're relaying + # an account name with the backend suffix prefixed + # but when reading accounts from ledgers we don't + # need it and/or it's prefixed in the section + # table.. we should just strip this from the message + # right since `.broker` is already included? + account=f'ib.{acctid}', + # XXX: the `.ib` is stripped..? + symbol=p.symbol.front_fqsn(), + # currency=ibppmsg.currency, + size=p.size, + avg_price=p.be_price, + ) + msgs.append(msg) return msgs @@ -455,7 +469,7 @@ async def trades_dialogue( accounts.add(account) cids2pps: dict[str, BrokerdPosition] = {} - active_accts: set[str] = set() + update_records: dict[str, bidict] = {} # process pp value reported from ib's system. we only use these # to cross-check sizing since average pricing on their end uses @@ -464,39 +478,67 @@ async def trades_dialogue( # money.. xb for client in aioclients.values(): for pos in client.positions(): + cid, msg = pack_position(pos) acctid = msg.account = accounts_def.inverse[msg.account] - active_accts.add(acctid) - cids2pps[(acctid.strip('ib.'), cid)] = msg + acctid = acctid.strip('ib.') + cids2pps[(acctid, cid)] = msg assert msg.account in accounts, ( f'Position for unknown account: {msg.account}') + # collect all ib-pp reported positions so that we can be + # sure know which positions to update from the ledger if + # any are missing from the ``pps.toml`` + update_records.setdefault(acctid, bidict())[cid] = msg.symbol + # update trades ledgers for all accounts from - # connected api clients. + # connected api clients which report trades for **this session**. + new_trades = {} for account, proxy in proxies.items(): trades = await proxy.trades() - if trades: - await update_ledger_from_api_trades( - trades, - proxy, - ) + new_trades.update(await update_ledger_from_api_trades( + trades, + proxy, + )) + + for acctid, trans in new_trades.items(): + for t in trans: + bsuid = t.bsuid + if bsuid in update_records: + assert update_records[bsuid] == t.fqsn + else: + update_records.setdefault(acctid, bidict())[bsuid] = t.fqsn # load all positions from `pps.toml`, cross check with ib's # positions data, and relay re-formatted pps as msgs to the ems. - pps_by_account = pp.get_pps('ib', acctids=active_accts) - - for acctid, by_fqsn in pps_by_account.items(): - msgs = await update_and_audit( + # __2 cases__: + # - new trades have taken place this session that we want to + # always reprocess indempotently, + # - no new trades yet but we want to reload and audit any + # positions reported by ib's sys that may not yet be in + # piker's ``pps.toml`` state-file. + for acctid, to_update in update_records.items(): + trans = new_trades.get(acctid) + active, closed = pp.update_pps_conf( + 'ib', acctid, - by_fqsn, - cids2pps, - validate=True, + trade_records=trans, + ledger_reload=to_update, ) - all_positions.extend(msg.dict() for msg in msgs) + for pps in [active, closed]: + msgs = await update_and_audit_msgs( + acctid, + pps.values(), + cids2pps, + validate=True, + ) + all_positions.extend(msg.dict() for msg in msgs) if not all_positions and cids2pps: raise RuntimeError( - 'Positions report by ib but not found in `pps.toml` !?') + 'Positions reported by ib but not found in `pps.toml`!?\n' + f'{pformat(cids2pps)}' + ) # log.info(f'Loaded {len(trades)} from this session') # TODO: write trades to local ``trades.toml`` @@ -543,26 +585,39 @@ async def emit_pp_update( # compute and relay incrementally updated piker pp acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']] proxy = proxies[acctid] - await update_ledger_from_api_trades( + + acctname = acctid.strip('ib.') + records = (await update_ledger_from_api_trades( [trade_entry], proxy, - ) - # load all positions from `pps.toml`, cross check with - # ib's positions data, and relay re-formatted pps as - # msgs to the ems. - by_acct = pp.get_pps('ib', acctids={acctid}) - acctname = acctid.strip('ib.') - by_fqsn = by_acct[acctname] + ))[acctname] + r = records[0] - for fqsn, p in by_fqsn.items(): - if p.bsuid == trade_entry['contract']['conId']: - # should only be one right? - msgs = await update_and_audit( - acctname, - {fqsn: p}, - cids2pps, - validate=False, - ) + # update and load all positions from `pps.toml`, cross check with + # ib's positions data, and relay re-formatted pps as msgs to the + # ems. we report both the open and closed updates in one map since + # for incremental update we may have just fully closed a pp and need + # to relay that msg as well! + active, closed = pp.update_pps_conf( + 'ib', + acctname, + trade_records=records, + ledger_reload={r.bsuid: r.fqsn}, + ) + + for pos in filter( + bool, + [active.get(r.bsuid), closed.get(r.bsuid)] + ): + msgs = await update_and_audit_msgs( + acctname, + [pos], + cids2pps, + + # ib pp event might not have arrived yet + validate=False, + ) + if msgs: msg = msgs[0] break @@ -669,7 +724,7 @@ async def deliver_trade_events( # TODO: # - normalize out commissions details? # - this is the same as the unpacking loop above in - # ``trades_to_records()`` no? + # ``trades_to_ledger_entries()`` no? trade_entry = ids2fills.setdefault(execid, {}) cost_already_rx = bool(trade_entry) @@ -800,7 +855,7 @@ async def deliver_trade_events( def norm_trade_records( ledger: dict[str, Any], -) -> dict[str, list[pp.Transaction]]: +) -> list[pp.Transaction]: ''' Normalize a flex report or API retrieved executions ledger into our standard record format. @@ -899,7 +954,7 @@ def norm_trade_records( return records -def trades_to_records( +def trades_to_ledger_entries( accounts: bidict, trade_entries: list[object], source_type: str = 'api', @@ -1026,7 +1081,7 @@ def load_flex_trades( # log.info(f'Loaded {ln} trades from flex query') print(f'Loaded {ln} trades from flex query') - trades_by_account = trades_to_records( + trades_by_account = trades_to_ledger_entries( # get reverse map to user account names conf['accounts'].inverse, trade_entries,