Pack startup pps into a table keyed by fqmes

pre_overruns_ctxcancelled
Tyler Goodlet 2023-04-08 22:48:30 -04:00
parent ac6eac046a
commit 6163067b38
1 changed files with 21 additions and 26 deletions

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -315,9 +315,6 @@ class TradesRelay(Struct):
# allowed account names
accounts: tuple[str]
# count of connected ems clients for this ``brokerd``
consumers: int = 0
class Router(Struct):
'''
@ -468,9 +465,15 @@ class Router(Struct):
# client set.
# locally cache and track positions per account with
# a table of (brokername, acctid) -> `BrokerdPosition`
# msgs.
pps = {}
# a nested table of msgs:
# tuple(brokername, acctid) ->
# (fqme: str ->
# `BrokerdPosition`)
relay = TradesRelay(
brokerd_stream=brokerd_trades_stream,
positions={},
accounts=accounts,
)
for msg in positions:
msg = BrokerdPosition(**msg)
@ -483,17 +486,10 @@ class Router(Struct):
account = msg.account
assert account in accounts
pps.setdefault(
relay.positions.setdefault(
(broker, account),
[],
).append(msg)
relay = TradesRelay(
brokerd_stream=brokerd_trades_stream,
positions=pps,
accounts=accounts,
consumers=1,
)
{},
)[msg.symbol] = msg
self.relays[broker] = relay
@ -521,8 +517,9 @@ class Router(Struct):
) -> tuple[TradesRelay, Feed]:
'''
Open and yield ``brokerd`` trades dialogue context-stream if
none already exists.
Maybe open a live feed to the target fqme, start `brokerd` order
msg relay and dark clearing tasks to run in the background
indefinitely.
'''
broker, symbol, suffix = unpack_fqme(fqme)
@ -701,13 +698,12 @@ async def translate_and_relay_brokerd_events(
f'Rx brokerd trade msg:\n'
f'{fmsg}'
)
status_msg: Optional[Status] = None
status_msg: Status | None = None
match brokerd_msg:
# BrokerdPosition
case {
'name': 'position',
'symbol': sym,
'broker': broker,
}:
pos_msg = BrokerdPosition(**brokerd_msg)
@ -718,9 +714,9 @@ async def translate_and_relay_brokerd_events(
relay.positions.setdefault(
# NOTE: translate to a FQSN!
(broker, sym),
[]
).append(pos_msg)
(broker, pos_msg.account),
{}
)[pos_msg.symbol] = pos_msg
# fan-out-relay position msgs immediately by
# broadcasting updates on all client streams
@ -787,12 +783,11 @@ async def translate_and_relay_brokerd_events(
# no msg to client necessary
continue
# BrokerdOrderError
# BrokerdError
case {
'name': 'error',
'oid': oid, # ems order-dialog id
'reqid': reqid, # brokerd generated order-request id
'symbol': sym,
}:
status_msg = book._active.get(oid)
msg = BrokerdError(**brokerd_msg)