diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index d40857a5..a9d04156 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1465,7 +1465,7 @@ async def trades_dialogue( global _client_cache # deliver positions to subscriber before anything else - all_positions = {} + all_positions = [] clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] for account, client in _accounts2clients.items(): @@ -1480,9 +1480,7 @@ async def trades_dialogue( for client in _client_cache.values(): for pos in client.positions(): msg = pack_position(pos) - all_positions.setdefault( - msg.symbol, [] - ).append(msg.dict()) + all_positions.append(msg.dict()) await ctx.started(all_positions) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 3c689ff4..3df1cd00 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -429,10 +429,13 @@ async def open_brokerd_trades_dialogue( # by receiving order submission response messages, # normalizing them to EMS messages and relaying back to # the piker order client set. + pps = {} + for msg in positions: + pps.setdefault(msg['symbol'], {})['account'] = msg relay = TradesRelay( brokerd_dialogue=brokerd_trades_stream, - positions=positions, + positions=pps, consumers=1 )