From 95dd0e6bd69fc2669fe42c23657536a84afb14b2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 5 Jul 2022 16:09:32 -0400 Subject: [PATCH] `ib` rt pps update hotfix.. Not sure this didn't get caught in usage, but basically real-time updates got broken by a rework of `update_ledger_from_api_trades()`. The issue is that the ledger was being updated **before** calling `piker.pp.update_pps_conf()` which resulted in the `Position.size` not being updated correctly since the [latest added] clears passed in via the `trade_records` arg were already found in the `.clears` table and thus were causing the loop to skip the `Position.lifo_update()` call.. The solution here is to not update the ledger **until after** we call `update_pps_conf()` - it's more read/writes but it's correct and we figure out a less io heavy way to do the file writing later. Further this includes a fix to avoid double emitting a pp update caused by non-thorough logic that waits for a commission report to arrive during a fill event; previously we were emitting the same message twice due to the lack of a check for an existing comms report in the case where the report arrives *after* the fill. --- piker/brokers/ib/broker.py | 64 ++++++++++++++++++++++++++++---------- 1 file changed, 47 insertions(+), 17 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 911d399d..b768c9a1 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -295,7 +295,10 @@ async def update_ledger_from_api_trades( trade_entries: list[dict[str, Any]], client: Union[Client, MethodProxy], -) -> dict[str, pp.Transaction]: +) -> tuple[ + dict[str, pp.Transaction], + dict[str, dict], +]: conf = get_config() @@ -326,14 +329,12 @@ async def update_ledger_from_api_trades( # write recent session's trades to the user's (local) ledger file. records: dict[str, pp.Transactions] = {} - for acctid, trades_by_id in entries.items(): - with pp.open_trade_ledger('ib', acctid) as ledger: - ledger.update(trades_by_id) + for acctid, trades_by_id in entries.items(): # normalize to transaction form records[acctid] = norm_trade_records(trades_by_id) - return records + return records, entries async def update_and_audit_msgs( @@ -518,10 +519,14 @@ async def trades_dialogue( new_trades = {} for account, proxy in proxies.items(): trades = await proxy.trades() - new_trades.update(await update_ledger_from_api_trades( + ( + records_by_acct, + ledger_entries, + ) = await update_ledger_from_api_trades( trades, proxy, - )) + ) + new_trades.update(records_by_acct) for acctid, trans in new_trades.items(): for t in trans: @@ -573,6 +578,16 @@ async def trades_dialogue( tuple(name for name in accounts_def if name in accounts), )) + # TODO: maybe just write on teardown? + # we might also want to delegate a specific actor for + # ledger writing / reading for speed? + + # write ledger with all new trades **AFTER** we've updated the + # `pps.toml` from the original ledger state! + for acctid, trades_by_id in ledger_entries.items(): + with pp.open_trade_ledger('ib', acctid) as ledger: + ledger.update(trades_by_id) + async with ( ctx.open_stream() as ems_stream, trio.open_nursery() as n, @@ -609,10 +624,11 @@ async def emit_pp_update( proxy = proxies[acctid] acctname = acctid.strip('ib.') - records = (await update_ledger_from_api_trades( + records_by_acct, ledger_entries = await update_ledger_from_api_trades( [trade_entry], proxy, - ))[acctname] + ) + records = records_by_acct[acctname] r = records[0] # update and load all positions from `pps.toml`, cross check with @@ -627,6 +643,12 @@ async def emit_pp_update( ledger_reload={r.bsuid: r.fqsn}, ) + # NOTE: write ledger with all new trades **AFTER** we've updated the + # `pps.toml` from the original ledger state! + for acctid, trades_by_id in ledger_entries.items(): + with pp.open_trade_ledger('ib', acctid) as ledger: + ledger.update(trades_by_id) + for pos in filter( bool, [active.get(r.bsuid), closed.get(r.bsuid)] @@ -760,7 +782,7 @@ async def deliver_trade_events( { 'contract': asdict(fill.contract), 'execution': asdict(fill.execution), - 'commissionReport': asdict(fill.commissionReport), + # 'commissionReport': asdict(fill.commissionReport), # supposedly server fill time? 'broker_time': execu.time, 'name': 'ib', @@ -813,14 +835,22 @@ async def deliver_trade_events( trade_entry = ids2fills.setdefault(execid, {}) fill_already_rx = bool(trade_entry) - # no fill msg has arrived yet so just fill out the - # cost report for now and when the fill arrives a pp - # msg can be emitted. - trade_entry.update( - {'commissionReport': asdict(cr)} - ) + # only fire a pp msg update if, + # - we haven't already + # - the fill event has already arrived + # but it didn't yet have a commision report + # which we fill in now. + if ( + fill_already_rx + and 'commissionReport' not in trade_entry + ): + # no fill msg has arrived yet so just fill out the + # cost report for now and when the fill arrives a pp + # msg can be emitted. + trade_entry.update( + {'commissionReport': asdict(cr)} + ) - if fill_already_rx: await emit_pp_update( ems_stream, trade_entry,