diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index b847333a..01cc25a2 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -501,14 +501,9 @@ async def open_brokerd_trades_dialogue( task_status.started(relay) - await translate_and_relay_brokerd_events( - broker, - brokerd_trades_stream, - _router, - ) - # this context should block here indefinitely until # the ``brokerd`` task either dies or is cancelled + await trio.sleep_forever() finally: # parent context must have been closed @@ -567,8 +562,8 @@ async def translate_and_relay_brokerd_events( {'presubmitted', 'submitted', 'cancelled', 'inactive'} ''' - book = router.get_dark_book(broker) - relay = router.relays[broker] + book: _DarkBook = router.get_dark_book(broker) + relay: TradesRelay = router.relays[broker] assert relay.brokerd_dialogue == brokerd_trades_stream @@ -731,14 +726,16 @@ async def translate_and_relay_brokerd_events( if status == 'submitted': msg = BrokerdStatus(**brokerd_msg) - log.info('Relaying existing open order:\n {brokerd_msg}') + log.info( + f'Relaying existing open order:\n {brokerd_msg}' + ) # use backend request id as our ems id though this # may end up with collisions? broker = details['name'] - # oid = f'{broker}-{reqid}' - oid = reqid + oid = str(reqid) book._ems_entries[oid] = msg + # attempt to avoid collisions msg.reqid = oid resp = 'broker_submitted' @@ -848,7 +845,9 @@ async def process_client_order_cmds( async for cmd in client_order_stream: log.info(f'Received order cmd:\n{pformat(cmd)}') - oid = cmd['oid'] + # CAWT DAMN we need struct support! + oid = str(cmd['oid']) + # register this stream as an active dialogue for this order id # such that translated message from the brokerd backend can be # routed (relayed) to **just** that client stream (and in theory @@ -892,24 +891,24 @@ async def process_client_order_cmds( 'action': 'cancel', 'oid': oid, } if not live_entry: - try: - # remove from dark book clearing - dark_book.orders[symbol].pop(oid, None) + # try: + # remove from dark book clearing + dark_book.orders[symbol].pop(oid, None) - # tell client side that we've cancelled the - # dark-trigger order - await client_order_stream.send( - Status( - resp='dark_cancelled', - oid=oid, - time_ns=time.time_ns(), - ) + # tell client side that we've cancelled the + # dark-trigger order + await client_order_stream.send( + Status( + resp='dark_cancelled', + oid=oid, + time_ns=time.time_ns(), ) - # de-register this client dialogue - router.dialogues.pop(oid) + ) + # de-register this client dialogue + router.dialogues.pop(oid) - except KeyError: - log.exception(f'No dark order for {symbol}?') + # except KeyError: + # log.exception(f'No dark order for {symbol}?') # live order submission case { @@ -932,8 +931,11 @@ async def process_client_order_cmds( sym = fqsn.replace(f'.{broker}', '') if live_entry is not None: - # sanity check on emsd id - assert live_entry.oid == oid + # sanity check on emsd id, but it won't work + # for pre-existing orders that we load since + # the only msg will be a ``BrokerdStatus`` + # assert live_entry.oid == oid + reqid = live_entry.reqid # if we already had a broker order id then # this is likely an order update commmand. @@ -1118,10 +1120,9 @@ async def _emsd_main( ): # XXX: this should be initial price quote from target provider - first_quote = feed.first_quotes[fqsn] - - book = _router.get_dark_book(broker) - book.lasts[fqsn] = first_quote['last'] + first_quote: dict = feed.first_quotes[fqsn] + book: _DarkBook = _router.get_dark_book(broker) + book.lasts[fqsn]: float = first_quote['last'] # open a stream with the brokerd backend for order # flow dialogue @@ -1148,12 +1149,25 @@ async def _emsd_main( await ems_ctx.started(( relay.positions, list(relay.accounts), + book._ems_entries, )) # establish 2-way stream with requesting order-client and # begin handling inbound order requests and updates async with ems_ctx.open_stream() as ems_client_order_stream: + # register the client side before startingn the + # brokerd-side relay task to ensure the client is + # delivered all exisiting open orders on startup. + _router.clients.add(ems_client_order_stream) + + n.start_soon( + translate_and_relay_brokerd_events, + broker, + brokerd_stream, + _router, + ) + # trigger scan and exec loop n.start_soon( clear_dark_triggers, @@ -1168,7 +1182,6 @@ async def _emsd_main( # start inbound (from attached client) order request processing try: - _router.clients.add(ems_client_order_stream) # main entrypoint, run here until cancelled. await process_client_order_cmds(