From 4877af9bc30352aee4d8671a5ce3cd8900977e5e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 3 Oct 2022 12:54:10 -0400 Subject: [PATCH] Add pub-sub broadcasting Establishes a more formalized subscription based fan out pattern to ems clients who subscribe for order flow for a particular symbol (the fqsn is the default subscription key for now). Make `Router.client_broadcast()` take a `sub_key: str` value which determines the set of clients to forward a message to and drop all such manually defined broadcast loops from task (func) code. Also add `.get_subs()` which (hackily) allows getting the set of clients for a given sub key where any stream that is detected as "closed" is discarded in the output. Further we simplify to `Router.dialogs: defaultdict[str, set[tractor.MsgStream]]` and `.subscriptions` as maps to sets of streams for much easier broadcast management/logic using set operations inside `.client_broadcast()`. --- piker/clearing/_ems.py | 180 ++++++++++++++++++++++++++--------------- 1 file changed, 113 insertions(+), 67 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 70eece72..23b50ddf 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -21,7 +21,7 @@ In da suit parlances: "Execution management systems" from __future__ import annotations from collections import ( defaultdict, - ChainMap, + # ChainMap, ) from contextlib import asynccontextmanager from math import isnan @@ -286,16 +286,10 @@ async def clear_dark_triggers( book._active[oid] = status # send response to client-side - for client_stream in router.dialogs[oid]: - try: - await client_stream.send(status) - except ( - trio.ClosedResourceError, - ): - log.warning( - f'{client_stream} stream broke?' - ) - break + await router.client_broadcast( + fqsn, + status, + ) else: # condition scan loop complete log.debug(f'execs are {execs}') @@ -342,21 +336,24 @@ class Router(Struct): # order id to client stream map clients: set[tractor.MsgStream] = set() - fqsn2dialogs: defaultdict[ - str, # fqsn - list[str], # oids - ] = defaultdict(list) + # sets of clients mapped from subscription keys + subscribers: defaultdict[ + str, # sub key, default fqsn + set[tractor.MsgStream], # unique client streams + ] = defaultdict(set) + # sets of clients dynamically registered for specific + # order flows based on subscription config. dialogs: defaultdict[ str, # ems uuid (oid) - list[tractor.MsgStream] # client side msg stream - ] = defaultdict(list) + set[tractor.MsgStream] # client side msg stream + ] = defaultdict(set) - # mapping of ems dialog ids to msg flow history - msgflows: defaultdict[ - str, - ChainMap[dict[str, dict]], - ] = defaultdict(ChainMap) + # TODO: mapping of ems dialog ids to msg flow history + # msgflows: defaultdict[ + # str, + # ChainMap[dict[str, dict]], + # ] = defaultdict(ChainMap) # brokername to trades-dialogs streams with ``brokerd`` actors relays: dict[ @@ -372,6 +369,20 @@ class Router(Struct): return self.books.setdefault(brokername, _DarkBook(brokername)) + def get_subs( + self, + oid: str, + + ) -> set[tractor.MsgStream]: + ''' + Deliver list of non-closed subscriber client msg streams. + + ''' + return set( + stream for stream in self.dialogs[oid] + if not stream._closed + ) + @asynccontextmanager async def maybe_open_brokerd_trades_dialogue( self, @@ -431,20 +442,27 @@ class Router(Struct): async def client_broadcast( self, + sub_key: str, msg: dict, ) -> None: - for client_stream in self.clients.copy(): + to_remove: set[tractor.MsgStream] = set() + subs = self.subscribers[sub_key] + for client_stream in subs: try: await client_stream.send(msg) except ( trio.ClosedResourceError, trio.BrokenResourceError, ): + to_remove.add(client_stream) self.clients.remove(client_stream) log.warning( f'client for {client_stream} was already closed?') + if to_remove: + subs.difference_update(to_remove) + _router: Router = None @@ -558,7 +576,7 @@ async def open_brokerd_trades_dialog( consumers=1, ) - _router.relays[broker] = relay + router.relays[broker] = relay # the ems scan loop may be cancelled by the client but we # want to keep the ``brokerd`` dialogue up regardless @@ -572,7 +590,7 @@ async def open_brokerd_trades_dialog( finally: # parent context must have been closed remove from cache so # next client will respawn if needed - relay = _router.relays.pop(broker, None) + relay = router.relays.pop(broker, None) if not relay: log.warning(f'Relay for {broker} was already removed!?') @@ -627,7 +645,6 @@ async def translate_and_relay_brokerd_events( ''' book: _DarkBook = router.get_dark_book(broker) relay: TradesRelay = router.relays[broker] - assert relay.brokerd_stream == brokerd_trades_stream brokerd_msg: dict[str, Any] @@ -660,7 +677,7 @@ async def translate_and_relay_brokerd_events( # fan-out-relay position msgs immediately by # broadcasting updates on all client streams - await router.client_broadcast(pos_msg) + await router.client_broadcast(sym, pos_msg) continue # BrokerdOrderAck @@ -727,21 +744,18 @@ async def translate_and_relay_brokerd_events( # some unexpected failure - something we need to think more # about. In most default situations, with composed orders # (ex. brackets), most brokers seem to use a oca policy. - ems_client_order_streams = router.dialogs[oid] status_msg.resp = 'error' status_msg.brokerd_msg = msg book._active[oid] = status_msg - for stream in ems_client_order_streams: - await stream.send(status_msg) + await router.client_broadcast(sym, status_msg) # BrokerdStatus case { 'name': 'status', 'status': status, 'reqid': reqid, # brokerd generated order-request id - } if ( (oid := book._ems2brokerd_ids.inverse.get(reqid)) and status in ( @@ -755,7 +769,7 @@ async def translate_and_relay_brokerd_events( # TODO: maybe pack this into a composite type that # contains both the IPC stream as well the # msg-chain/dialog. - ems_client_order_streams = router.dialogs[oid] + ems_client_order_streams = router.get_subs(oid) status_msg = book._active.get(oid) if ( @@ -783,8 +797,10 @@ async def translate_and_relay_brokerd_events( status_msg.brokerd_msg = msg status_msg.src = msg.broker_details['name'] - for stream in ems_client_order_streams: - await stream.send(status_msg) + await router.client_broadcast( + status_msg.req.symbol, + status_msg, + ) if status == 'closed': log.info(f'Execution for {oid} is complete!') @@ -818,8 +834,6 @@ async def translate_and_relay_brokerd_events( msg = BrokerdFill(**brokerd_msg) log.info(f'Fill for {oid} cleared with:\n{fmsg}') - ems_client_order_streams = router.dialogs[oid] - # XXX: bleh, a fill can come after 'closed' from `ib`? # only send a late fill event we haven't already closed # out the dialog status locally. @@ -829,9 +843,10 @@ async def translate_and_relay_brokerd_events( status_msg.reqid = reqid status_msg.brokerd_msg = msg - for stream in ems_client_order_streams: - await stream.send(status_msg) - # await ems_client_order_stream.send(status_msg) + await router.client_broadcast( + status_msg.req.symbol, + status_msg, + ) # ``Status`` containing an embedded order msg which # should be loaded as a "pre-existing open order" from the @@ -883,7 +898,10 @@ async def translate_and_relay_brokerd_events( # fan-out-relay position msgs immediately by # broadcasting updates on all client streams - await router.client_broadcast(status_msg) + await router.client_broadcast( + order.symbol, + status_msg, + ) # don't fall through continue @@ -937,15 +955,21 @@ async def process_client_order_cmds( client_order_stream: tractor.MsgStream, brokerd_order_stream: tractor.MsgStream, - symbol: str, + fqsn: str, feed: Feed, dark_book: _DarkBook, router: Router, ) -> None: + ''' + Client-dialog request loop: accept order requests and deliver + initial status msg responses to subscribed clients. - client_dialogs = router.dialogs + This task-loop handles both management of dark triggered orders and + alerts by inserting them into the "dark book"-table as well as + submitting live orders immediately if requested by the client. + ''' # cmd: dict async for cmd in client_order_stream: log.info(f'Received order cmd:\n{pformat(cmd)}') @@ -953,15 +977,17 @@ async def process_client_order_cmds( # CAWT DAMN we need struct support! oid = str(cmd['oid']) - # register this stream as an active dialogue for this order id - # such that translated message from the brokerd backend can be - # routed (relayed) to **just** that client stream (and in theory - # others who are registered for such order affiliated msgs). - subs = client_dialogs[oid] - if client_order_stream not in subs: - subs.append(client_order_stream) + # register this stream as an active order dialog (msg flow) for + # this order id such that translated message from the brokerd + # backend can be routed and relayed to subscribed clients. + subs = router.dialogs[oid] + + # add all subscribed clients for this fqsn (should eventually be + # a more generalize subscription system) to received order msg + # updates (and thus show stuff in the UI). + subs.add(client_order_stream) + subs.update(router.subscribers[fqsn]) - router.fqsn2dialogs[symbol].append(oid) reqid = dark_book._ems2brokerd_ids.inverse.get(oid) # any dark/live status which is current @@ -973,7 +999,7 @@ async def process_client_order_cmds( 'action': 'cancel', 'oid': oid, } if ( - (status := dark_book._active.get(oid)) + status and status.resp in ('open', 'pending') ): reqid = status.reqid @@ -1009,11 +1035,11 @@ async def process_client_order_cmds( 'action': 'cancel', 'oid': oid, } if ( - status and status.resp == 'dark_open' - # or status and status.req + status + and status.resp == 'dark_open' ): # remove from dark book clearing - entry = dark_book.orders[symbol].pop(oid, None) + entry = dark_book.orders[fqsn].pop(oid, None) if entry: ( pred, @@ -1028,14 +1054,18 @@ async def process_client_order_cmds( status.resp = 'canceled' status.req = cmd - await client_order_stream.send(status) + await router.client_broadcast( + fqsn, + status, + ) + # de-register this order dialogue from all clients router.dialogs[oid].clear() router.dialogs.pop(oid) dark_book._active.pop(oid) else: - log.exception(f'No dark order for {symbol}?') + log.exception(f'No dark order for {fqsn}?') # TODO: eventually we should be receiving # this struct on the wire unpacked in a scoped protocol @@ -1194,7 +1224,12 @@ async def process_client_order_cmds( src='dark', ) dark_book._active[oid] = status - await client_order_stream.send(status) + + # broadcast status to all subscribed clients + await router.client_broadcast( + fqsn, + status, + ) @tractor.context @@ -1220,20 +1255,26 @@ async def _emsd_main( received in a stream from that client actor and then responses are streamed back up to the original calling task in the same client. - The primary ``emsd`` task tree is: + The primary ``emsd`` task trees are: - - ``_emsd_main()``: - sets up brokerd feed, order feed with ems client, trades dialogue with - brokderd trading api. - | - - ``clear_dark_triggers()``: - run (dark order) conditions on inputs and trigger brokerd "live" - order submissions. + - ``_setup_persistent_emsd()``: + is the ``emsd`` actor's primary root task which sets up an + actor-global ``Router`` instance and starts a relay loop task + which lives until the backend broker is shutdown or the ems is + terminated. | - (maybe) ``translate_and_relay_brokerd_events()``: accept normalized trades responses from brokerd, process and relay to ems client(s); this is a effectively a "trade event reponse" proxy-broker. + + - ``_emsd_main()``: + attaches a brokerd real-time quote feed and trades dialogue with + brokderd trading api for every connecting client. + | + - ``clear_dark_triggers()``: + run (dark order) conditions on inputs and trigger brokerd "live" + order submissions. | - ``process_client_order_cmds()``: accepts order cmds from requesting clients, registers dark orders and @@ -1301,8 +1342,12 @@ async def _emsd_main( # brokerd-side relay task to ensure the client is # delivered all exisiting open orders on startup. _router.clients.add(client_stream) - for oid in _router.fqsn2dialogs[fqsn]: - _router.dialogs[oid].append(client_stream) + + # TODO: instead of by fqsn we need a subscription + # system/schema here to limit what each new client is + # allowed to see in terms of broadcasted order flow + # updates per dialog. + _router.subscribers[fqsn].add(client_stream) # trigger scan and exec loop n.start_soon( @@ -1337,6 +1382,7 @@ async def _emsd_main( ' was already dropped?' ) + _router.subscribers[fqsn].remove(client_stream) dialogs = _router.dialogs for oid, client_streams in dialogs.items(): if client_stream in client_streams: