From 1edccf37d95c9baf02b5eb96bfe3a7800123f657 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 26 Jun 2021 16:00:04 -0400 Subject: [PATCH] Support multiple client dialogues active on one brokerd trades dialogue --- piker/clearing/_ems.py | 86 +++++++++++++++++++++++++----------------- 1 file changed, 52 insertions(+), 34 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 22226055..c98c4069 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -282,6 +282,7 @@ class _Router(BaseModel): books: dict[str, _DarkBook] = {} # order id to client stream map + clients: set[tractor.MsgStream] = set() dialogues: dict[str, list[tractor.MsgStream]] = {} # brokername to trades-dialogues streams with ``brokerd`` actors @@ -425,8 +426,6 @@ async def open_brokerd_trades_dialogue( # normalizing them to EMS messages and relaying back to # the piker order client set. - # with brokerd_trades_stream.shield(): - relay = TradesRelay( brokerd_dialogue=brokerd_trades_stream, positions=positions, @@ -450,12 +449,8 @@ async def open_brokerd_trades_dialogue( # the ``brokerd`` task either dies or is cancelled finally: - # context must have been closed + # parent context must have been closed # remove from cache so next client will respawn if needed - # print('BROKERD DIALOGUE KILLED!!?!?!') - # with trio.CancelScope(shield=True): - # await tractor.breakpoint() - # raise _router.relays.pop(broker) @@ -521,13 +516,12 @@ async def translate_and_relay_brokerd_events( pos_msg = BrokerdPosition(**brokerd_msg).dict() # keep up to date locally in ``emsd`` - relay.positions.update(pos_msg) + relay.positions.setdefault(pos_msg['symbol'], {}).update(pos_msg) # relay through position msgs immediately by # broadcasting updates on all client streams - for oid, ems_client_order_stream in router.dialogues.items(): - - await ems_client_order_stream.send(pos_msg) + for client_stream in router.clients: + await client_stream.send(pos_msg) continue @@ -682,16 +676,20 @@ async def translate_and_relay_brokerd_events( # Create and relay response status message # to requesting EMS client - ems_client_order_stream = router.dialogues[oid] - await ems_client_order_stream.send( - Status( - oid=oid, - resp=resp, - time_ns=time.time_ns(), - broker_reqid=reqid, - brokerd_msg=broker_details, - ).dict() - ) + try: + ems_client_order_stream = router.dialogues[oid] + await ems_client_order_stream.send( + Status( + oid=oid, + resp=resp, + time_ns=time.time_ns(), + broker_reqid=reqid, + brokerd_msg=broker_details, + ).dict() + ) + except KeyError: + log.error( + f'Received `brokerd` msg for unknown client with oid: {oid}') # TODO: do we want this to keep things cleaned up? # it might require a special status from brokerd to affirm the @@ -712,6 +710,8 @@ async def process_client_order_cmds( ) -> None: + client_dialogues = router.dialogues + # cmd: dict async for cmd in client_order_stream: @@ -720,14 +720,16 @@ async def process_client_order_cmds( action = cmd['action'] oid = cmd['oid'] + # TODO: make ``tractor.MsgStream`` a frozen type again such that it + # can be stored in sets like the old context was. + # wait, maybe this **is** already working thanks to our parent + # `trio` type? + # register this stream as an active dialogue for this order id # such that translated message from the brokerd backend can be # routed (relayed) to **just** that client stream (and in theory # others who are registered for such order affiliated msgs). - - # TODO: make ``tractor.MsgStream`` a frozen type again such that it - # can be stored in sets like the old context was. - router.dialogues[oid] = client_order_stream + client_dialogues[oid] = client_order_stream reqid = dark_book._ems2brokerd_ids.inverse.get(oid) live_entry = dark_book._ems_entries.get(oid) @@ -1000,7 +1002,7 @@ async def _emsd_main( trio.open_nursery() as n, ): - brokerd_stream = relay.brokerd_dialogue #.clone() + brokerd_stream = relay.brokerd_dialogue # .clone() # signal to client that we're started # TODO: we could eventually send back **all** brokerd @@ -1027,6 +1029,8 @@ async def _emsd_main( # start inbound (from attached client) order request processing try: + _router.clients.add(ems_client_order_stream) + await process_client_order_cmds( ems_client_order_stream, @@ -1039,12 +1043,26 @@ async def _emsd_main( dark_book, _router, ) + finally: - pass - # for oid, client_stream in _router.dialogs.copy().items(): - # if client_stream is ems_client_order_stream: - # # TODO: we need a placeholder for sending - # # the updates to an alert system inside - # # ``emsd`` ?? - # print(f'popping order for stream {oid}') - # _router.dialogs.pop(oid) + # remove client from "registry" + _router.clients.remove(ems_client_order_stream) + + dialogues = _router.dialogues + + for oid, client_stream in dialogues.items(): + + if client_stream == ems_client_order_stream: + + log.warning( + f'client dialogue is being abandoned:\n' + f'{oid} ->\n{client_stream._ctx.chan.uid}' + ) + dialogues.pop(oid) + + # TODO: for order dialogues left "alive" in + # the ems this is where we should allow some + # system to take over management. Likely we + # want to allow the user to choose what kind + # of policy to use (eg. cancel all orders + # from client, run some algo, etc.).