From abbba1fa6e6d659770dace37d1815ff616b0aa57 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 8 Apr 2023 22:48:30 -0400 Subject: [PATCH] Pack startup pps into a table keyed by fqmes --- piker/clearing/_ems.py | 47 +++++++++++++++++++----------------------- 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 6d18b686..ad13e8f0 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) Tyler Goodlet (in stewardship for piker0) +# Copyright (C) Tyler Goodlet (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -315,9 +315,6 @@ class TradesRelay(Struct): # allowed account names accounts: tuple[str] - # count of connected ems clients for this ``brokerd`` - consumers: int = 0 - class Router(Struct): ''' @@ -468,9 +465,15 @@ class Router(Struct): # client set. # locally cache and track positions per account with - # a table of (brokername, acctid) -> `BrokerdPosition` - # msgs. - pps = {} + # a nested table of msgs: + # tuple(brokername, acctid) -> + # (fqme: str -> + # `BrokerdPosition`) + relay = TradesRelay( + brokerd_stream=brokerd_trades_stream, + positions={}, + accounts=accounts, + ) for msg in positions: msg = BrokerdPosition(**msg) @@ -483,17 +486,10 @@ class Router(Struct): account = msg.account assert account in accounts - pps.setdefault( + relay.positions.setdefault( (broker, account), - [], - ).append(msg) - - relay = TradesRelay( - brokerd_stream=brokerd_trades_stream, - positions=pps, - accounts=accounts, - consumers=1, - ) + {}, + )[msg.symbol] = msg self.relays[broker] = relay @@ -521,8 +517,9 @@ class Router(Struct): ) -> tuple[TradesRelay, Feed]: ''' - Open and yield ``brokerd`` trades dialogue context-stream if - none already exists. + Maybe open a live feed to the target fqme, start `brokerd` order + msg relay and dark clearing tasks to run in the background + indefinitely. ''' broker, symbol, suffix = unpack_fqme(fqme) @@ -701,13 +698,12 @@ async def translate_and_relay_brokerd_events( f'Rx brokerd trade msg:\n' f'{fmsg}' ) - status_msg: Optional[Status] = None + status_msg: Status | None = None match brokerd_msg: # BrokerdPosition case { 'name': 'position', - 'symbol': sym, 'broker': broker, }: pos_msg = BrokerdPosition(**brokerd_msg) @@ -718,9 +714,9 @@ async def translate_and_relay_brokerd_events( relay.positions.setdefault( # NOTE: translate to a FQSN! - (broker, sym), - [] - ).append(pos_msg) + (broker, pos_msg.account), + {} + )[pos_msg.symbol] = pos_msg # fan-out-relay position msgs immediately by # broadcasting updates on all client streams @@ -787,12 +783,11 @@ async def translate_and_relay_brokerd_events( # no msg to client necessary continue - # BrokerdOrderError + # BrokerdError case { 'name': 'error', 'oid': oid, # ems order-dialog id 'reqid': reqid, # brokerd generated order-request id - 'symbol': sym, }: status_msg = book._active.get(oid) msg = BrokerdError(**brokerd_msg)