From aa5f25231a1b3b360788721d8f06d5741201f959 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 23 Mar 2023 12:52:53 -0400 Subject: [PATCH] `ib`: never override existing ledger records If user has loaded from a flex report then we don't want the API records from the same period to override those; instead just update with any missing fields from the API schema. Also, always `str`-ify the contract id (what is set for the `.bs_mktid` *before* packing into transaction type to ensure when serialized to `pps.toml` there are no discrepancies at the codec level.. smh --- piker/brokers/ib/broker.py | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 7cd857d7..fa94044c 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -342,12 +342,6 @@ async def update_and_audit_msgs( # retreive equivalent ib reported position message # for comparison/audit versus the piker equivalent # breakeven pp calcs. - # if ( - # acctid == 'reg' - # and bs_mktid == 36285627 - # ): - # await tractor.breakpoint() - ibppmsg = cids2pps.get((acctid, bs_mktid)) if ibppmsg: @@ -777,15 +771,15 @@ async def emit_pp_update( proxies: dict, cids2pps: dict, - ledgers, - tables, + ledgers: dict[str, dict[str, Any]], + tables: dict[str, PpTable], ) -> None: # compute and relay incrementally updated piker pp accounts_def_inv: bidict[str, str] = accounts_def.inverse - acctid = accounts_def_inv[trade_entry['execution']['acctNumber']] - proxy = proxies[acctid] + fq_acctid = accounts_def_inv[trade_entry['execution']['acctNumber']] + proxy = proxies[fq_acctid] ( records_by_acct, api_to_ledger_entries, @@ -794,9 +788,10 @@ async def emit_pp_update( proxy, accounts_def_inv, ) - trans = records_by_acct[acctid] + trans = records_by_acct[fq_acctid] r = list(trans.values())[0] + acctid = fq_acctid.strip('ib.') table = tables[acctid] table.update_from_trans(trans) active, closed = table.dump_active() @@ -804,7 +799,11 @@ async def emit_pp_update( # NOTE: update ledger with all new trades for acctid, trades_by_id in api_to_ledger_entries.items(): ledger = ledgers[acctid] - ledger.update(trades_by_id) + + for tid, tdict in trades_by_id.items(): + # NOTE: don't override flex/previous entries with new API + # ones, just update with new fields! + ledger.setdefaults(tid, {}).update(tdict) # generate pp msgs and cross check with ib's positions data, relay # re-formatted pps as msgs to the ems. @@ -909,8 +908,8 @@ async def deliver_trade_events( # https://github.com/erdewit/ib_insync/issues/363 # acctid = accounts_def.inverse[trade.order.account] - # # double check there is no error when - # # cancelling.. gawwwd + # double check there is no error when + # cancelling.. gawwwd # if ib_status_key == 'cancelled': # last_log = trade.log[-1] # if ( @@ -1050,6 +1049,7 @@ async def deliver_trade_events( accounts_def, proxies, cids2pps, + ledgers, tables, ) @@ -1084,6 +1084,7 @@ async def deliver_trade_events( accounts_def, proxies, cids2pps, + ledgers, tables, ) @@ -1145,7 +1146,7 @@ async def deliver_trade_events( def norm_trade_records( ledger: dict[str, Any], -) -> list[Transaction]: +) -> dict[str, Transaction]: ''' Normalize a flex report or API retrieved executions ledger into our standard record format. @@ -1275,7 +1276,7 @@ def norm_trade_records( cost=comms, dt=dt, expiry=expiry, - bs_mktid=conid, + bs_mktid=str(conid), ), key=lambda t: t.dt )