Ugh, positions relay hotfix

Must have run into some confusion with data structures in `brokerd` vs.
`emsd`. This fixes the ems `relay.positions` state tracking to be
composed maps, vs. messages from `brokerd` should just be a sequence.
fsp_feeds
Tyler Goodlet 2021-09-12 19:30:43 -04:00
parent cecba8904d
commit 6acfd6c38a
2 changed files with 6 additions and 5 deletions

View File

@ -1465,7 +1465,7 @@ async def trades_dialogue(
global _client_cache global _client_cache
# deliver positions to subscriber before anything else # deliver positions to subscriber before anything else
all_positions = {} all_positions = []
clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
for account, client in _accounts2clients.items(): for account, client in _accounts2clients.items():
@ -1480,9 +1480,7 @@ async def trades_dialogue(
for client in _client_cache.values(): for client in _client_cache.values():
for pos in client.positions(): for pos in client.positions():
msg = pack_position(pos) msg = pack_position(pos)
all_positions.setdefault( all_positions.append(msg.dict())
msg.symbol, []
).append(msg.dict())
await ctx.started(all_positions) await ctx.started(all_positions)

View File

@ -429,10 +429,13 @@ async def open_brokerd_trades_dialogue(
# by receiving order submission response messages, # by receiving order submission response messages,
# normalizing them to EMS messages and relaying back to # normalizing them to EMS messages and relaying back to
# the piker order client set. # the piker order client set.
pps = {}
for msg in positions:
pps.setdefault(msg['symbol'], {})['account'] = msg
relay = TradesRelay( relay = TradesRelay(
brokerd_dialogue=brokerd_trades_stream, brokerd_dialogue=brokerd_trades_stream,
positions=positions, positions=pps,
consumers=1 consumers=1
) )