diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index ae54615b..473a9e95 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -499,7 +499,7 @@ async def open_brokerd_trades_dialogue( ): # XXX: really we only want one stream per `emsd` actor # to relay global `brokerd` order events unless we're - # doing to expect each backend to relay only orders + # going to expect each backend to relay only orders # affiliated with a particular ``trades_dialogue()`` # session (seems annoying for implementers). So, here # we cache the relay task and instead of running multiple @@ -612,9 +612,10 @@ async def translate_and_relay_brokerd_events( brokerd_msg: dict[str, Any] async for brokerd_msg in brokerd_trades_stream: + fmsg = pformat(brokerd_msg) log.info( f'Received broker trade event:\n' - f'{pformat(brokerd_msg)}' + f'{fmsg}' ) match brokerd_msg: @@ -666,7 +667,11 @@ async def translate_and_relay_brokerd_events( # cancelled by the ems controlling client before we # received this ack, in which case we relay that cancel # signal **asap** to the backend broker - status_msg = book._active[oid] + status_msg = book._active.get(oid) + if not status_msg: + log.warning(f'Rx Ack for closed/unknown order?: {oid}') + continue + req = status_msg.req if req and req.action == 'cancel': # assign newly providerd broker backend request id @@ -692,7 +697,7 @@ async def translate_and_relay_brokerd_events( } if status_msg := book._active.get(oid): msg = BrokerdError(**brokerd_msg) - log.error(pformat(msg)) # XXX make one when it's blank? + log.error(fmsg) # XXX make one when it's blank? # TODO: figure out how this will interact with EMS clients # for ex. on an error do we react with a dark orders @@ -726,8 +731,19 @@ async def translate_and_relay_brokerd_events( # TODO: maybe pack this into a composite type that # contains both the IPC stream as well the # msg-chain/dialog. - ems_client_order_stream = router.dialogues[oid] - status_msg = book._active[oid] + ems_client_order_stream = router.dialogues.get(oid) + status_msg = book._active.get(oid) + + if ( + not ems_client_order_stream + or not status_msg + ): + log.warning( + 'Received status for unknown dialog {oid}:\n' + '{fmsg}' + ) + continue + status_msg.resp = status # retrieve existing live flow @@ -762,12 +778,19 @@ async def translate_and_relay_brokerd_events( 'name': 'fill', 'reqid': reqid, # brokerd generated order-request id # 'symbol': sym, # paper engine doesn't have this, nbd? - } if ( - oid := book._ems2brokerd_ids.inverse.get(reqid) - ): + }: + oid = book._ems2brokerd_ids.inverse.get(reqid) + if not oid: + # TODO: maybe we could optionally check for an + # ``.oid`` in the msg since we're planning to + # maybe-kinda offer that via using ``Status`` + # in the longer run anyway? + log.warning(f'Unkown fill for {fmsg}') + continue + # proxy through the "fill" result(s) msg = BrokerdFill(**brokerd_msg) - log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}') + log.info(f'Fill for {oid} cleared with:\n{fmsg}') ems_client_order_stream = router.dialogues[oid] @@ -796,7 +819,7 @@ async def translate_and_relay_brokerd_events( # registered from a previous order/status load? log.error( f'Unknown/transient status msg:\n' - f'{pformat(brokerd_msg)}\n' + f'{fmsg}\n' 'Unable to relay message to client side!?' ) @@ -841,7 +864,7 @@ async def translate_and_relay_brokerd_events( 'name': 'status', 'status': 'error', }: - log.error(f'Broker error:\n{pformat(brokerd_msg)}') + log.error(f'Broker error:\n{fmsg}') # XXX: we presume the brokerd cancels its own order # TOO FAST ``BrokerdStatus`` that arrives @@ -862,7 +885,7 @@ async def translate_and_relay_brokerd_events( status_msg = book._active[oid] msg += ( f'last status msg: {pformat(status_msg)}\n\n' - f'this msg:{pformat(brokerd_msg)}\n' + f'this msg:{fmsg}\n' ) log.warning(msg)