From e34ea94f9f7430e99e66e4b6c0b4702b618481ae Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 5 Aug 2022 18:29:40 -0400 Subject: [PATCH] Start brokerd relay loop after opening client stream In order to avoid missed existing order message emissions on startup we need to be sure the client side stream is registered with the router first. So break out the starting of the `translate_and_relay_brokerd_events()` task until inside the client stream block and start the task using the dark clearing loop nursery. Also, ensure `oid` (and thus for `ib` the equivalent re-used `reqid`) are cast to `str` before registering the dark book. Deliver the dark book entries as part of the `_emsd_main()` context `.started()` values. --- piker/clearing/_ems.py | 81 ++++++++++++++++++++++++------------------ 1 file changed, 47 insertions(+), 34 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index b847333a..01cc25a2 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -501,14 +501,9 @@ async def open_brokerd_trades_dialogue( task_status.started(relay) - await translate_and_relay_brokerd_events( - broker, - brokerd_trades_stream, - _router, - ) - # this context should block here indefinitely until # the ``brokerd`` task either dies or is cancelled + await trio.sleep_forever() finally: # parent context must have been closed @@ -567,8 +562,8 @@ async def translate_and_relay_brokerd_events( {'presubmitted', 'submitted', 'cancelled', 'inactive'} ''' - book = router.get_dark_book(broker) - relay = router.relays[broker] + book: _DarkBook = router.get_dark_book(broker) + relay: TradesRelay = router.relays[broker] assert relay.brokerd_dialogue == brokerd_trades_stream @@ -731,14 +726,16 @@ async def translate_and_relay_brokerd_events( if status == 'submitted': msg = BrokerdStatus(**brokerd_msg) - log.info('Relaying existing open order:\n {brokerd_msg}') + log.info( + f'Relaying existing open order:\n {brokerd_msg}' + ) # use backend request id as our ems id though this # may end up with collisions? broker = details['name'] - # oid = f'{broker}-{reqid}' - oid = reqid + oid = str(reqid) book._ems_entries[oid] = msg + # attempt to avoid collisions msg.reqid = oid resp = 'broker_submitted' @@ -848,7 +845,9 @@ async def process_client_order_cmds( async for cmd in client_order_stream: log.info(f'Received order cmd:\n{pformat(cmd)}') - oid = cmd['oid'] + # CAWT DAMN we need struct support! + oid = str(cmd['oid']) + # register this stream as an active dialogue for this order id # such that translated message from the brokerd backend can be # routed (relayed) to **just** that client stream (and in theory @@ -892,24 +891,24 @@ async def process_client_order_cmds( 'action': 'cancel', 'oid': oid, } if not live_entry: - try: - # remove from dark book clearing - dark_book.orders[symbol].pop(oid, None) + # try: + # remove from dark book clearing + dark_book.orders[symbol].pop(oid, None) - # tell client side that we've cancelled the - # dark-trigger order - await client_order_stream.send( - Status( - resp='dark_cancelled', - oid=oid, - time_ns=time.time_ns(), - ) + # tell client side that we've cancelled the + # dark-trigger order + await client_order_stream.send( + Status( + resp='dark_cancelled', + oid=oid, + time_ns=time.time_ns(), ) - # de-register this client dialogue - router.dialogues.pop(oid) + ) + # de-register this client dialogue + router.dialogues.pop(oid) - except KeyError: - log.exception(f'No dark order for {symbol}?') + # except KeyError: + # log.exception(f'No dark order for {symbol}?') # live order submission case { @@ -932,8 +931,11 @@ async def process_client_order_cmds( sym = fqsn.replace(f'.{broker}', '') if live_entry is not None: - # sanity check on emsd id - assert live_entry.oid == oid + # sanity check on emsd id, but it won't work + # for pre-existing orders that we load since + # the only msg will be a ``BrokerdStatus`` + # assert live_entry.oid == oid + reqid = live_entry.reqid # if we already had a broker order id then # this is likely an order update commmand. @@ -1118,10 +1120,9 @@ async def _emsd_main( ): # XXX: this should be initial price quote from target provider - first_quote = feed.first_quotes[fqsn] - - book = _router.get_dark_book(broker) - book.lasts[fqsn] = first_quote['last'] + first_quote: dict = feed.first_quotes[fqsn] + book: _DarkBook = _router.get_dark_book(broker) + book.lasts[fqsn]: float = first_quote['last'] # open a stream with the brokerd backend for order # flow dialogue @@ -1148,12 +1149,25 @@ async def _emsd_main( await ems_ctx.started(( relay.positions, list(relay.accounts), + book._ems_entries, )) # establish 2-way stream with requesting order-client and # begin handling inbound order requests and updates async with ems_ctx.open_stream() as ems_client_order_stream: + # register the client side before startingn the + # brokerd-side relay task to ensure the client is + # delivered all exisiting open orders on startup. + _router.clients.add(ems_client_order_stream) + + n.start_soon( + translate_and_relay_brokerd_events, + broker, + brokerd_stream, + _router, + ) + # trigger scan and exec loop n.start_soon( clear_dark_triggers, @@ -1168,7 +1182,6 @@ async def _emsd_main( # start inbound (from attached client) order request processing try: - _router.clients.add(ems_client_order_stream) # main entrypoint, run here until cancelled. await process_client_order_cmds(