From 2fbfe583dd90e52702d012c4af6925e3ab17be89 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 5 Oct 2022 13:08:53 -0400 Subject: [PATCH] Drop the `Router.clients: set`, `.subscribers` is enough --- piker/clearing/_ems.py | 47 ++++++++++++++---------------------------- 1 file changed, 15 insertions(+), 32 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 1906d1e5..e26c27fb 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -248,8 +248,6 @@ async def clear_dark_triggers( await brokerd_orders_stream.send(brokerd_msg) - # book._msgflows[oid].maps.insert(0, live_req) - case _: raise ValueError(f'Invalid dark book entry: {cmd}') @@ -333,9 +331,6 @@ class Router(Struct): # broker to book map books: dict[str, _DarkBook] = {} - # order id to client stream map - clients: set[tractor.MsgStream] = set() - # sets of clients mapped from subscription keys subscribers: defaultdict[ str, # sub key, default fqsn @@ -577,7 +572,6 @@ class Router(Struct): trio.BrokenResourceError, ): to_remove.add(client_stream) - self.clients.remove(client_stream) log.warning( f'client for {client_stream} was already closed?') @@ -1324,8 +1318,6 @@ async def _emsd_main( # register the client side before starting the # brokerd-side relay task to ensure the client is # delivered all exisiting open orders on startup. - _router.clients.add(client_stream) - # TODO: instead of by fqsn we need a subscription # system/schema here to limit what each new client is # allowed to see in terms of broadcasted order flow @@ -1345,29 +1337,20 @@ async def _emsd_main( ) finally: - # try to remove client from "registry" - try: - _router.clients.remove(client_stream) - except KeyError: - log.warning( - f'Stream {client_stream._ctx.chan.uid}' - ' was already dropped?' - ) - + # try to remove client from subscription registry _router.subscribers[fqsn].remove(client_stream) - dialogs = _router.dialogs - for oid, client_streams in dialogs.items(): - if client_stream in client_streams: - client_streams.remove(client_stream) - # TODO: for order dialogs left "alive" in - # the ems this is where we should allow some - # system to take over management. Likely we - # want to allow the user to choose what kind - # of policy to use (eg. cancel all orders - # from client, run some algo, etc.) - if not client_streams: - log.warning( - f'Order dialog is being unmonitored:\n' - f'{oid} ->\n{client_stream._ctx.chan.uid}' - ) + for oid, client_streams in _router.dialogs.items(): + client_streams.discard(client_stream) + + # TODO: for order dialogs left "alive" in + # the ems this is where we should allow some + # system to take over management. Likely we + # want to allow the user to choose what kind + # of policy to use (eg. cancel all orders + # from client, run some algo, etc.) + if not client_streams: + log.warning( + f'Order dialog is not being monitored:\n' + f'{oid} ->\n{client_stream._ctx.chan.uid}' + )