From 909e06812143acdac3c1761833957ab72631e8a2 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Fri, 30 Sep 2022 16:28:50 -0400
Subject: [PATCH] Support multi-client order-dialog management

This patch was originally to fix a bug where new clients who re-connected
to an `emsd` that was running a paper engine were not getting updates from
new fills and/or cancels. It turns out the solution is more general: now,
any client that creates an order dialog is subscribed to updates on the
order-flow set mapped for that symbol/instrument, as long as the client
has registered for that particular fqsn with the EMS. This means
re-connecting clients as well as "monitoring" clients can see the same
orders, alerts, fills and clears.

Impl details:
- change all var names spelled as `dialogues` -> `dialogs` to be murican.
- make `Router.dialogs: defaultdict[str, list[tractor.MsgStream]]` so that
  each dialog id (oid) maps to the set of potentially subscribing ems
  clients.
- add `Router.fqsn2dialogs: defaultdict[str, list[str]]`, a map of fqsn
  entries to sets of oids.
- adjust all core task code to make appropriate lookups into these 2 new
  tables instead of being handed specific client streams as input.
- start the `translate_and_relay_brokerd_events` task as a daemon task
  that lives with the particular `TradesRelay` such that dialogs cleared
  while no client is connected are still processed.
- rename `TradesRelay.brokerd_dialogue` -> `.brokerd_stream`.
- broadcast all status msgs to all subscribed clients in the relay loop.
- always de-reg each client stream from the `Router.dialogs` table on
  close.
---
 piker/clearing/_ems.py | 190 ++++++++++++++++++++++-------------------
 1 file changed, 102 insertions(+), 88 deletions(-)

diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py
index 9e6be589..70eece72 100644
--- a/piker/clearing/_ems.py
+++ b/piker/clearing/_ems.py
@@ -18,7 +18,11 @@ In da suit parlances: "Execution management systems"
 
 """
-from collections import defaultdict, ChainMap
+from __future__ import annotations
+from collections import (
+    defaultdict,
+    ChainMap,
+)
 from contextlib import asynccontextmanager
 from math import isnan
 from pprint import pformat
@@ -134,12 +138,6 @@ class _DarkBook(Struct):
     # _ems_entries: dict[str, str] = {}
     _active: dict = {}
 
-    # mapping of ems dialog ids to msg flow history
-    _msgflows: defaultdict[
-        int,
-        ChainMap[dict[str, dict]],
-    ] = defaultdict(ChainMap)
-
     _ems2brokerd_ids: dict[str, str] = bidict()
 
 
@@ -152,8 +150,8 @@ _DEFAULT_SIZE: float = 1.0
 
 async def clear_dark_triggers(
 
+    router: Router,
     brokerd_orders_stream: tractor.MsgStream,
-    ems_client_order_stream: tractor.MsgStream,
     quote_stream: tractor.ReceiveMsgStream,  # noqa
     broker: str,
     fqsn: str,
@@ -288,15 +286,16 @@ async def clear_dark_triggers(
                     book._active[oid] = status
 
                     # send response to client-side
-                    try:
-                        await ems_client_order_stream.send(status)
-                    except (
-                        trio.ClosedResourceError,
-                    ):
-                        log.warning(
-                            f'{ems_client_order_stream} stream broke?'
-                        )
-                        break
+                    for client_stream in router.dialogs[oid]:
+                        try:
+                            await client_stream.send(status)
+                        except (
+                            trio.ClosedResourceError,
+                        ):
+                            log.warning(
+                                f'{client_stream} stream broke?'
+                            )
+                            break
 
             else:  # condition scan loop complete
                 log.debug(f'execs are {execs}')
@@ -310,7 +309,7 @@ class TradesRelay(Struct):
 
     # for now we keep only a single connection open with
    # each ``brokerd`` for simplicity.
- brokerd_dialogue: tractor.MsgStream + brokerd_stream: tractor.MsgStream # map of symbols to dicts of accounts to pp msgs positions: dict[ @@ -342,13 +341,28 @@ class Router(Struct): # order id to client stream map clients: set[tractor.MsgStream] = set() - dialogues: dict[ - str, - list[tractor.MsgStream] - ] = {} - # brokername to trades-dialogues streams with ``brokerd`` actors - relays: dict[str, TradesRelay] = {} + fqsn2dialogs: defaultdict[ + str, # fqsn + list[str], # oids + ] = defaultdict(list) + + dialogs: defaultdict[ + str, # ems uuid (oid) + list[tractor.MsgStream] # client side msg stream + ] = defaultdict(list) + + # mapping of ems dialog ids to msg flow history + msgflows: defaultdict[ + str, + ChainMap[dict[str, dict]], + ] = defaultdict(ChainMap) + + # brokername to trades-dialogs streams with ``brokerd`` actors + relays: dict[ + str, # broker name + TradesRelay, + ] = {} def get_dark_book( self, @@ -373,7 +387,8 @@ class Router(Struct): none already exists. ''' - relay: TradesRelay = self.relays.get(feed.mod.name) + broker = feed.mod.name + relay: TradesRelay = self.relays.get(broker) if ( relay is None @@ -387,7 +402,7 @@ class Router(Struct): ): relay = await self.nursery.start( - open_brokerd_trades_dialogue, + open_brokerd_trades_dialog, self, feed, symbol, @@ -395,18 +410,23 @@ class Router(Struct): loglevel, ) + self.nursery.start_soon( + translate_and_relay_brokerd_events, + broker, + relay.brokerd_stream, + self, + ) + relay.consumers += 1 # TODO: get updated positions here? - assert relay.brokerd_dialogue + assert relay.brokerd_stream try: yield relay - finally: # TODO: what exactly needs to be torn down here or # are we just consumer tracking? - relay.consumers -= 1 async def client_broadcast( @@ -429,7 +449,7 @@ class Router(Struct): _router: Router = None -async def open_brokerd_trades_dialogue( +async def open_brokerd_trades_dialog( router: Router, feed: Feed, @@ -505,7 +525,7 @@ async def open_brokerd_trades_dialogue( # we cache the relay task and instead of running multiple # tasks (which will result in multiples of the same msg being # relayed for each EMS client) we just register each client - # stream to this single relay loop using _router.dialogues + # stream to this single relay loop in the dialog table. # begin processing order events from the target brokerd backend # by receiving order submission response messages, @@ -532,7 +552,7 @@ async def open_brokerd_trades_dialogue( ).append(msg) relay = TradesRelay( - brokerd_dialogue=brokerd_trades_stream, + brokerd_stream=brokerd_trades_stream, positions=pps, accounts=accounts, consumers=1, @@ -550,8 +570,8 @@ async def open_brokerd_trades_dialogue( await trio.sleep_forever() finally: - # parent context must have been closed - # remove from cache so next client will respawn if needed + # parent context must have been closed remove from cache so + # next client will respawn if needed relay = _router.relays.pop(broker, None) if not relay: log.warning(f'Relay for {broker} was already removed!?') @@ -608,7 +628,7 @@ async def translate_and_relay_brokerd_events( book: _DarkBook = router.get_dark_book(broker) relay: TradesRelay = router.relays[broker] - assert relay.brokerd_dialogue == brokerd_trades_stream + assert relay.brokerd_stream == brokerd_trades_stream brokerd_msg: dict[str, Any] async for brokerd_msg in brokerd_trades_stream: @@ -707,11 +727,14 @@ async def translate_and_relay_brokerd_events( # some unexpected failure - something we need to think more # about. 
In most default situations, with composed orders # (ex. brackets), most brokers seem to use a oca policy. - ems_client_order_stream = router.dialogues[oid] + ems_client_order_streams = router.dialogs[oid] + status_msg.resp = 'error' status_msg.brokerd_msg = msg book._active[oid] = status_msg - await ems_client_order_stream.send(status_msg) + + for stream in ems_client_order_streams: + await stream.send(status_msg) # BrokerdStatus case { @@ -732,15 +755,15 @@ async def translate_and_relay_brokerd_events( # TODO: maybe pack this into a composite type that # contains both the IPC stream as well the # msg-chain/dialog. - ems_client_order_stream = router.dialogues.get(oid) + ems_client_order_streams = router.dialogs[oid] status_msg = book._active.get(oid) if ( - not ems_client_order_stream + not ems_client_order_streams or not status_msg ): log.warning( - f'Received status for unknown dialog {oid}:\n' + f'Received status for untracked dialog {oid}:\n' f'{fmsg}' ) continue @@ -759,7 +782,9 @@ async def translate_and_relay_brokerd_events( status_msg.reqid = reqid # THIS LINE IS CRITICAL! status_msg.brokerd_msg = msg status_msg.src = msg.broker_details['name'] - await ems_client_order_stream.send(status_msg) + + for stream in ems_client_order_streams: + await stream.send(status_msg) if status == 'closed': log.info(f'Execution for {oid} is complete!') @@ -793,7 +818,7 @@ async def translate_and_relay_brokerd_events( msg = BrokerdFill(**brokerd_msg) log.info(f'Fill for {oid} cleared with:\n{fmsg}') - ems_client_order_stream = router.dialogues[oid] + ems_client_order_streams = router.dialogs[oid] # XXX: bleh, a fill can come after 'closed' from `ib`? # only send a late fill event we haven't already closed @@ -803,7 +828,10 @@ async def translate_and_relay_brokerd_events( status_msg.resp = 'fill' status_msg.reqid = reqid status_msg.brokerd_msg = msg - await ems_client_order_stream.send(status_msg) + + for stream in ems_client_order_streams: + await stream.send(status_msg) + # await ems_client_order_stream.send(status_msg) # ``Status`` containing an embedded order msg which # should be loaded as a "pre-existing open order" from the @@ -903,11 +931,6 @@ async def translate_and_relay_brokerd_events( # if status_msg is not None: # del status_msg - # TODO: do we want this to keep things cleaned up? - # it might require a special status from brokerd to affirm the - # flow is complete? - # router.dialogues.pop(oid) - async def process_client_order_cmds( @@ -921,7 +944,7 @@ async def process_client_order_cmds( ) -> None: - client_dialogues = router.dialogues + client_dialogs = router.dialogs # cmd: dict async for cmd in client_order_stream: @@ -934,7 +957,11 @@ async def process_client_order_cmds( # such that translated message from the brokerd backend can be # routed (relayed) to **just** that client stream (and in theory # others who are registered for such order affiliated msgs). 
- client_dialogues[oid] = client_order_stream + subs = client_dialogs[oid] + if client_order_stream not in subs: + subs.append(client_order_stream) + + router.fqsn2dialogs[symbol].append(oid) reqid = dark_book._ems2brokerd_ids.inverse.get(oid) # any dark/live status which is current @@ -1002,8 +1029,9 @@ async def process_client_order_cmds( status.req = cmd await client_order_stream.send(status) - # de-register this client dialogue - router.dialogues.pop(oid) + # de-register this order dialogue from all clients + router.dialogs[oid].clear() + router.dialogs.pop(oid) dark_book._active.pop(oid) else: @@ -1034,8 +1062,8 @@ async def process_client_order_cmds( if status is not None: # if we already had a broker order id then # this is likely an order update commmand. - log.info(f"Modifying live {broker} order: {reqid}") reqid = status.reqid + log.info(f"Modifying live {broker} order: {reqid}") status.req = req status.resp = 'pending' @@ -1252,11 +1280,10 @@ async def _emsd_main( loglevel, ) as relay, - trio.open_nursery() as n, ): - brokerd_stream = relay.brokerd_dialogue # .clone() + brokerd_stream = relay.brokerd_stream # signal to client that we're started and deliver # all known pps and accounts for this ``brokerd``. @@ -1268,26 +1295,20 @@ async def _emsd_main( # establish 2-way stream with requesting order-client and # begin handling inbound order requests and updates - async with ems_ctx.open_stream() as ems_client_order_stream: + async with ems_ctx.open_stream() as client_stream: - # register the client side before startingn the + # register the client side before starting the # brokerd-side relay task to ensure the client is # delivered all exisiting open orders on startup. - _router.clients.add(ems_client_order_stream) - - n.start_soon( - translate_and_relay_brokerd_events, - broker, - brokerd_stream, - _router, - ) + _router.clients.add(client_stream) + for oid in _router.fqsn2dialogs[fqsn]: + _router.dialogs[oid].append(client_stream) # trigger scan and exec loop n.start_soon( clear_dark_triggers, - + _router, brokerd_stream, - ems_client_order_stream, quote_stream, broker, fqsn, # form: ... @@ -1295,16 +1316,11 @@ async def _emsd_main( ) # start inbound (from attached client) order request processing + # main entrypoint, run here until cancelled. try: - - # main entrypoint, run here until cancelled. await process_client_order_cmds( - - ems_client_order_stream, - - # relay.brokerd_dialogue, + client_stream, brokerd_stream, - fqsn, feed, dark_book, @@ -1314,28 +1330,26 @@ async def _emsd_main( finally: # try to remove client from "registry" try: - _router.clients.remove(ems_client_order_stream) + _router.clients.remove(client_stream) except KeyError: log.warning( - f'Stream {ems_client_order_stream._ctx.chan.uid}' + f'Stream {client_stream._ctx.chan.uid}' ' was already dropped?' ) - dialogues = _router.dialogues + dialogs = _router.dialogs + for oid, client_streams in dialogs.items(): + if client_stream in client_streams: + client_streams.remove(client_stream) - for oid, client_stream in dialogues.copy().items(): - - if client_stream == ems_client_order_stream: - - log.warning( - f'client dialogue is being abandoned:\n' - f'{oid} ->\n{client_stream._ctx.chan.uid}' - ) - dialogues.pop(oid) - - # TODO: for order dialogues left "alive" in + # TODO: for order dialogs left "alive" in # the ems this is where we should allow some # system to take over management. Likely we # want to allow the user to choose what kind # of policy to use (eg. 
cancel all orders # from client, run some algo, etc.) + if not client_streams: + log.warning( + f'Order dialog is being unmonitored:\n' + f'{oid} ->\n{client_stream._ctx.chan.uid}' + )
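
For readers skimming the diff, the bookkeeping the commit message describes
boils down to two defaultdict tables plus a broadcast loop. Below is a
minimal, self-contained sketch of that pattern. It is illustrative only and
not part of the patch: it is plain stdlib Python (3.9+), `MiniRouter` and
`FakeStream` are hypothetical stand-ins for piker's `Router` and
`tractor.MsgStream`, and the fqsn string is made up.

# NOT piker code: a toy model of the oid -> subscribers and
# fqsn -> oids tables introduced by this patch.
from collections import defaultdict


class FakeStream:
    '''Stand-in for a per-client ``tractor.MsgStream``.'''
    def __init__(self, name: str) -> None:
        self.name = name
        self.sent: list[dict] = []

    def send(self, msg: dict) -> None:
        self.sent.append(msg)


class MiniRouter:
    def __init__(self) -> None:
        # oid (dialog id) -> list of subscribed client streams
        self.dialogs: defaultdict[str, list[FakeStream]] = defaultdict(list)
        # fqsn -> list of oids active for that instrument
        self.fqsn2dialogs: defaultdict[str, list[str]] = defaultdict(list)

    def register(self, fqsn: str, oid: str, stream: FakeStream) -> None:
        '''Subscribe ``stream`` to updates for dialog ``oid`` on ``fqsn``.'''
        subs = self.dialogs[oid]
        if stream not in subs:
            subs.append(stream)
        if oid not in self.fqsn2dialogs[fqsn]:
            self.fqsn2dialogs[fqsn].append(oid)

    def resubscribe(self, fqsn: str, stream: FakeStream) -> None:
        '''A (re-)connecting client picks up every live dialog for ``fqsn``.'''
        for oid in self.fqsn2dialogs[fqsn]:
            self.register(fqsn, oid, stream)

    def broadcast(self, oid: str, msg: dict) -> None:
        '''Relay a status msg to every client subscribed to ``oid``.'''
        for stream in self.dialogs[oid]:
            stream.send(msg)

    def drop_client(self, stream: FakeStream) -> None:
        '''De-register a disconnecting client from every dialog.'''
        for oid, subs in self.dialogs.items():
            if stream in subs:
                subs.remove(stream)


if __name__ == '__main__':
    router = MiniRouter()
    gui = FakeStream('gui-client')
    router.register('mnq.globex.20220930.ib', 'oid-1', gui)

    # a second ("monitoring") client attaches later and still sees updates
    monitor = FakeStream('monitor-client')
    router.resubscribe('mnq.globex.20220930.ib', monitor)

    router.broadcast('oid-1', {'resp': 'fill', 'oid': 'oid-1'})
    assert gui.sent == monitor.sent == [{'resp': 'fill', 'oid': 'oid-1'}]

    # dropping one client leaves the dialog live for the other
    router.drop_client(gui)
    router.broadcast('oid-1', {'resp': 'canceled', 'oid': 'oid-1'})
    assert len(monitor.sent) == 2 and len(gui.sent) == 1

The real EMS does this across IPC streams inside trio tasks and keeps the
relay loop alive as a daemon task per `TradesRelay`; the sketch only shows
the table management that lets a late-joining or re-connecting client
observe an already-open order dialog.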