Pack startup pps into a table keyed by fqmes

rekt_pps
Tyler Goodlet 2023-04-08 22:48:30 -04:00
parent 484565988d
commit abbba1fa6e
1 changed files with 21 additions and 26 deletions

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0) # Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -315,9 +315,6 @@ class TradesRelay(Struct):
# allowed account names # allowed account names
accounts: tuple[str] accounts: tuple[str]
# count of connected ems clients for this ``brokerd``
consumers: int = 0
class Router(Struct): class Router(Struct):
''' '''
@ -468,9 +465,15 @@ class Router(Struct):
# client set. # client set.
# locally cache and track positions per account with # locally cache and track positions per account with
# a table of (brokername, acctid) -> `BrokerdPosition` # a nested table of msgs:
# msgs. # tuple(brokername, acctid) ->
pps = {} # (fqme: str ->
# `BrokerdPosition`)
relay = TradesRelay(
brokerd_stream=brokerd_trades_stream,
positions={},
accounts=accounts,
)
for msg in positions: for msg in positions:
msg = BrokerdPosition(**msg) msg = BrokerdPosition(**msg)
@ -483,17 +486,10 @@ class Router(Struct):
account = msg.account account = msg.account
assert account in accounts assert account in accounts
pps.setdefault( relay.positions.setdefault(
(broker, account), (broker, account),
[], {},
).append(msg) )[msg.symbol] = msg
relay = TradesRelay(
brokerd_stream=brokerd_trades_stream,
positions=pps,
accounts=accounts,
consumers=1,
)
self.relays[broker] = relay self.relays[broker] = relay
@ -521,8 +517,9 @@ class Router(Struct):
) -> tuple[TradesRelay, Feed]: ) -> tuple[TradesRelay, Feed]:
''' '''
Open and yield ``brokerd`` trades dialogue context-stream if Maybe open a live feed to the target fqme, start `brokerd` order
none already exists. msg relay and dark clearing tasks to run in the background
indefinitely.
''' '''
broker, symbol, suffix = unpack_fqme(fqme) broker, symbol, suffix = unpack_fqme(fqme)
@ -701,13 +698,12 @@ async def translate_and_relay_brokerd_events(
f'Rx brokerd trade msg:\n' f'Rx brokerd trade msg:\n'
f'{fmsg}' f'{fmsg}'
) )
status_msg: Optional[Status] = None status_msg: Status | None = None
match brokerd_msg: match brokerd_msg:
# BrokerdPosition # BrokerdPosition
case { case {
'name': 'position', 'name': 'position',
'symbol': sym,
'broker': broker, 'broker': broker,
}: }:
pos_msg = BrokerdPosition(**brokerd_msg) pos_msg = BrokerdPosition(**brokerd_msg)
@ -718,9 +714,9 @@ async def translate_and_relay_brokerd_events(
relay.positions.setdefault( relay.positions.setdefault(
# NOTE: translate to a FQSN! # NOTE: translate to a FQSN!
(broker, sym), (broker, pos_msg.account),
[] {}
).append(pos_msg) )[pos_msg.symbol] = pos_msg
# fan-out-relay position msgs immediately by # fan-out-relay position msgs immediately by
# broadcasting updates on all client streams # broadcasting updates on all client streams
@ -787,12 +783,11 @@ async def translate_and_relay_brokerd_events(
# no msg to client necessary # no msg to client necessary
continue continue
# BrokerdOrderError # BrokerdError
case { case {
'name': 'error', 'name': 'error',
'oid': oid, # ems order-dialog id 'oid': oid, # ems order-dialog id
'reqid': reqid, # brokerd generated order-request id 'reqid': reqid, # brokerd generated order-request id
'symbol': sym,
}: }:
status_msg = book._active.get(oid) status_msg = book._active.get(oid)
msg = BrokerdError(**brokerd_msg) msg = BrokerdError(**brokerd_msg)