From b6f344f34a75b329d649cc2bd99704e4713485f9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 16 Jun 2022 15:31:22 -0400 Subject: [PATCH] Only emit pps msg for trade triggering instrument We can probably make this better (and with less file sys accesses) later such that we keep a consistent pps state in mem and only write async maybe from another side-task? --- piker/brokers/ib/broker.py | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index a8484941..eac7a8a4 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -71,7 +71,6 @@ from .api import ( Client, MethodProxy, ) -# from .feed import open_data_client def pack_position( @@ -285,7 +284,6 @@ async def recv_trade_updates( async def update_ledger_from_api_trades( trade_entries: list[dict[str, Any]], - ib_pp_msgs: dict[int, BrokerdPosition], # conid -> msg client: Union[Client, MethodProxy], ) -> dict[str, Any]: @@ -478,7 +476,6 @@ async def trades_dialogue( trades = await proxy.trades() await update_ledger_from_api_trades( trades, - cids2pps, # pass these in to map to correct fqsns.. proxy, ) @@ -542,25 +539,26 @@ async def emit_pp_update( proxy = proxies[acctid] await update_ledger_from_api_trades( [trade_entry], - cids2pps, # pass these in to map to correct fqsns.. proxy, ) # load all positions from `pps.toml`, cross check with # ib's positions data, and relay re-formatted pps as # msgs to the ems. - for acctid, by_fqsn in pp.get_pps( - 'ib', - acctids={acctid}, - ).items(): + by_acct = pp.get_pps('ib', acctids={acctid}) + by_fqsn = by_acct[acctid.strip('ib.')] - # should only be one right? - msgs = await update_and_audit( - by_fqsn, - cids2pps, - validate=False, - ) - for msg in msgs: - await ems_stream.send(msg.dict()) + for fqsn, p in by_fqsn.items(): + if p.bsuid == trade_entry['contract']['conId']: + # should only be one right? + msgs = await update_and_audit( + {fqsn: p}, + cids2pps, + validate=False, + ) + msg = msgs[0] + break + + await ems_stream.send(msg.dict()) async def deliver_trade_events( @@ -714,7 +712,6 @@ async def deliver_trade_events( if comms or cost_already_rx: # only send a pp update once we have a cost report - print("EMITTING PP") await emit_pp_update( ems_stream, trade_entry, @@ -739,7 +736,6 @@ async def deliver_trade_events( ) if fill_already_rx: - print("EMITTING PP") await emit_pp_update( ems_stream, trade_entry,