Only emit pps msg for trade triggering instrument

We can probably make this better (and with less file sys accesses) later
such that we keep a consistent pps state in mem and only write async
maybe from another side-task?
lifo_pps_ib
Tyler Goodlet 2022-06-16 15:31:22 -04:00
parent ecdc747ced
commit b6f344f34a
1 changed files with 14 additions and 18 deletions

View File

@ -71,7 +71,6 @@ from .api import (
Client, Client,
MethodProxy, MethodProxy,
) )
# from .feed import open_data_client
def pack_position( def pack_position(
@ -285,7 +284,6 @@ async def recv_trade_updates(
async def update_ledger_from_api_trades( async def update_ledger_from_api_trades(
trade_entries: list[dict[str, Any]], trade_entries: list[dict[str, Any]],
ib_pp_msgs: dict[int, BrokerdPosition], # conid -> msg
client: Union[Client, MethodProxy], client: Union[Client, MethodProxy],
) -> dict[str, Any]: ) -> dict[str, Any]:
@ -478,7 +476,6 @@ async def trades_dialogue(
trades = await proxy.trades() trades = await proxy.trades()
await update_ledger_from_api_trades( await update_ledger_from_api_trades(
trades, trades,
cids2pps, # pass these in to map to correct fqsns..
proxy, proxy,
) )
@ -542,24 +539,25 @@ async def emit_pp_update(
proxy = proxies[acctid] proxy = proxies[acctid]
await update_ledger_from_api_trades( await update_ledger_from_api_trades(
[trade_entry], [trade_entry],
cids2pps, # pass these in to map to correct fqsns..
proxy, proxy,
) )
# load all positions from `pps.toml`, cross check with # load all positions from `pps.toml`, cross check with
# ib's positions data, and relay re-formatted pps as # ib's positions data, and relay re-formatted pps as
# msgs to the ems. # msgs to the ems.
for acctid, by_fqsn in pp.get_pps( by_acct = pp.get_pps('ib', acctids={acctid})
'ib', by_fqsn = by_acct[acctid.strip('ib.')]
acctids={acctid},
).items():
for fqsn, p in by_fqsn.items():
if p.bsuid == trade_entry['contract']['conId']:
# should only be one right? # should only be one right?
msgs = await update_and_audit( msgs = await update_and_audit(
by_fqsn, {fqsn: p},
cids2pps, cids2pps,
validate=False, validate=False,
) )
for msg in msgs: msg = msgs[0]
break
await ems_stream.send(msg.dict()) await ems_stream.send(msg.dict())
@ -714,7 +712,6 @@ async def deliver_trade_events(
if comms or cost_already_rx: if comms or cost_already_rx:
# only send a pp update once we have a cost report # only send a pp update once we have a cost report
print("EMITTING PP")
await emit_pp_update( await emit_pp_update(
ems_stream, ems_stream,
trade_entry, trade_entry,
@ -739,7 +736,6 @@ async def deliver_trade_events(
) )
if fill_already_rx: if fill_already_rx:
print("EMITTING PP")
await emit_pp_update( await emit_pp_update(
ems_stream, ems_stream,
trade_entry, trade_entry,