diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index d60a9d43..3303d9dc 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -18,6 +18,7 @@ Order and trades endpoints for use with ``piker``'s EMS. """ from __future__ import annotations +from bisect import insort from contextlib import ExitStack from dataclasses import asdict from functools import partial @@ -435,7 +436,6 @@ async def trades_dialogue( # deliver positions to subscriber before anything else all_positions = [] accounts = set() - clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] acctids = set() cids2pps: dict[str, BrokerdPosition] = {} @@ -486,7 +486,6 @@ async def trades_dialogue( # the so called (bs) "FIFO" style which more or less results in # a price that's not useful for traders who want to not lose # money.. xb - # for client in aioclients.values(): for pos in client.positions(): # collect all ib-pp reported positions so that we can be @@ -500,7 +499,9 @@ async def trades_dialogue( assert msg.account in accounts, ( f'Position for unknown account: {msg.account}') + ledger = ledgers[acctid] table = tables[acctid] + pp = table.pps.get(bsuid) if ( not pp @@ -596,6 +597,7 @@ async def trades_dialogue( # proxy wrapper for starting trade event stream async def open_trade_event_stream( + client: Client, task_status: TaskStatus[ trio.abc.ReceiveChannel ] = trio.TASK_STATUS_IGNORED, @@ -613,18 +615,25 @@ async def trades_dialogue( ctx.open_stream() as ems_stream, trio.open_nursery() as n, ): - trade_event_stream = await n.start(open_trade_event_stream) - clients.append((client, trade_event_stream)) - # start order request handler **before** local trades - # event loop - n.start_soon(handle_order_requests, ems_stream, accounts_def) + for client in aioclients.values(): + trade_event_stream = await n.start( + open_trade_event_stream, + client, + ) - # allocate event relay tasks for each client connection - for client, stream in clients: + # start order request handler **before** local trades + # event loop + n.start_soon( + handle_order_requests, + ems_stream, + accounts_def, + ) + + # allocate event relay tasks for each client connection n.start_soon( deliver_trade_events, - stream, + trade_event_stream, ems_stream, accounts_def, cids2pps, @@ -968,7 +977,7 @@ def norm_trade_records( ledger into our standard record format. ''' - records: dict[str, Transaction] = {} + records: list[Transaction] = [] for tid, record in ledger.items(): conid = record.get('conId') or record['conid'] @@ -1047,19 +1056,22 @@ def norm_trade_records( # should already have entries if the pps are still open, in # which case, we can pull the fqsn from that table (see # `trades_dialogue()` above). - - records[tid] = Transaction( - fqsn=fqsn, - tid=tid, - size=size, - price=price, - cost=comms, - dt=dt, - expiry=expiry, - bsuid=conid, + insort( + records, + Transaction( + fqsn=fqsn, + tid=tid, + size=size, + price=price, + cost=comms, + dt=dt, + expiry=expiry, + bsuid=conid, + ), + key=lambda t: t.dt ) - return records + return {r.tid: r for r in records} def trades_to_ledger_entries(