diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 17f9be1a..e0917c85 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -289,7 +289,11 @@ class TradesRelay: brokerd_dialogue: tractor.MsgStream # map of symbols to dicts of accounts to pp msgs - positions: dict[str, dict[str, BrokerdPosition]] + positions: dict[ + # brokername, acctid + tuple[str, str], + list[BrokerdPosition], + ] # allowed account names accounts: tuple[str] @@ -461,18 +465,24 @@ async def open_brokerd_trades_dialogue( # normalizing them to EMS messages and relaying back to # the piker order client set. - # locally cache and track positions per account. + # locally cache and track positions per account with + # a table of (brokername, acctid) -> `BrokerdPosition` + # msgs. pps = {} for msg in positions: log.info(f'loading pp: {msg}') account = msg['account'] + + # TODO: better value error for this which + # dumps the account and message and states the + # mismatch.. assert account in accounts pps.setdefault( - f'{msg["symbol"]}.{broker}', - {} - )[account] = msg + (broker, account), + [], + ).append(msg) relay = TradesRelay( brokerd_dialogue=brokerd_trades_stream, @@ -578,11 +588,9 @@ async def translate_and_relay_brokerd_events( relay.positions.setdefault( # NOTE: translate to a FQSN! - f'{sym}.{broker}', - {} - ).setdefault( - pos_msg['account'], {} - ).update(pos_msg) + (broker, sym), + [] + ).append(pos_msg) # fan-out-relay position msgs immediately by # broadcasting updates on all client streams @@ -635,8 +643,8 @@ async def translate_and_relay_brokerd_events( # something is out of order, we don't have an oid for # this broker-side message. log.error( - 'Unknown oid:{oid} for msg:\n' - f'{pformat(brokerd_msg)}' + f'Unknown oid: {oid} for msg:\n' + f'{pformat(brokerd_msg)}\n' 'Unable to relay message to client side!?' ) @@ -1088,15 +1096,12 @@ async def _emsd_main( brokerd_stream = relay.brokerd_dialogue # .clone() - # flatten out collected pps from brokerd for delivery - pp_msgs = { - fqsn: list(pps.values()) - for fqsn, pps in relay.positions.items() - } - # signal to client that we're started and deliver # all known pps and accounts for this ``brokerd``. - await ems_ctx.started((pp_msgs, list(relay.accounts))) + await ems_ctx.started(( + relay.positions, + list(relay.accounts), + )) # establish 2-way stream with requesting order-client and # begin handling inbound order requests and updates diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index f5a85d64..e9ed3499 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -579,9 +579,9 @@ async def open_order_mode( providers=symbol.brokers ) - # XXX: ``brokerd`` delivers a set of account names that it allows - # use of but the user also can define the accounts they'd like - # to use, in order, in their `brokers.toml` file. + # XXX: ``brokerd`` delivers a set of account names that it + # allows use of but the user also can define the accounts they'd + # like to use, in order, in their `brokers.toml` file. accounts = {} for name in brokerd_accounts: # ensure name is in ``brokers.toml`` @@ -594,10 +594,21 @@ async def open_order_mode( iter(accounts.keys()) ) if accounts else 'paper' + # Pack position messages by account, should only be one-to-one. # NOTE: requires the backend exactly specifies # the expected symbol key in its positions msg. - pp_msgs = position_msgs.get(symkey, ()) - pps_by_account = {msg['account']: msg for msg in pp_msgs} + pps_by_account = {} + for (broker, acctid), msgs in position_msgs.items(): + for msg in msgs: + + sym = msg['symbol'] + if ( + sym == symkey or + # mega-UGH, i think we need to fix the FQSN stuff sooner + # then later.. + sym == symkey.removesuffix(f'.{broker}') + ): + pps_by_account[acctid] = msg # update pp trackers with data relayed from ``brokerd``. for account_name in accounts: