Handle paper-engine too-fast clearing race cases

When the paper engine is used we can hit races where order ack msgs
arrive close enough to status messages that `trio` schedules the status
processing before the acks. In such cases we want to be tolerant: rather
than crashing, log a warning that we received an unknown/out-of-order
msg.
size_in_shm_token
Tyler Goodlet 2022-08-23 16:48:33 -04:00
parent ecd93cb05a
commit 430d065da6
1 changed files with 36 additions and 13 deletions

View File

@ -499,7 +499,7 @@ async def open_brokerd_trades_dialogue(
): ):
# XXX: really we only want one stream per `emsd` actor # XXX: really we only want one stream per `emsd` actor
# to relay global `brokerd` order events unless we're # to relay global `brokerd` order events unless we're
# doing to expect each backend to relay only orders # going to expect each backend to relay only orders
# affiliated with a particular ``trades_dialogue()`` # affiliated with a particular ``trades_dialogue()``
# session (seems annoying for implementers). So, here # session (seems annoying for implementers). So, here
# we cache the relay task and instead of running multiple # we cache the relay task and instead of running multiple
@ -612,9 +612,10 @@ async def translate_and_relay_brokerd_events(
brokerd_msg: dict[str, Any] brokerd_msg: dict[str, Any]
async for brokerd_msg in brokerd_trades_stream: async for brokerd_msg in brokerd_trades_stream:
fmsg = pformat(brokerd_msg)
log.info( log.info(
f'Received broker trade event:\n' f'Received broker trade event:\n'
f'{pformat(brokerd_msg)}' f'{fmsg}'
) )
match brokerd_msg: match brokerd_msg:
@ -666,7 +667,11 @@ async def translate_and_relay_brokerd_events(
# cancelled by the ems controlling client before we # cancelled by the ems controlling client before we
# received this ack, in which case we relay that cancel # received this ack, in which case we relay that cancel
# signal **asap** to the backend broker # signal **asap** to the backend broker
status_msg = book._active[oid] status_msg = book._active.get(oid)
if not status_msg:
log.warning(f'Rx Ack for closed/unknown order?: {oid}')
continue
req = status_msg.req req = status_msg.req
if req and req.action == 'cancel': if req and req.action == 'cancel':
# assign newly providerd broker backend request id # assign newly providerd broker backend request id
@ -692,7 +697,7 @@ async def translate_and_relay_brokerd_events(
} if status_msg := book._active.get(oid): } if status_msg := book._active.get(oid):
msg = BrokerdError(**brokerd_msg) msg = BrokerdError(**brokerd_msg)
log.error(pformat(msg)) # XXX make one when it's blank? log.error(fmsg) # XXX make one when it's blank?
# TODO: figure out how this will interact with EMS clients # TODO: figure out how this will interact with EMS clients
# for ex. on an error do we react with a dark orders # for ex. on an error do we react with a dark orders
@ -726,8 +731,19 @@ async def translate_and_relay_brokerd_events(
# TODO: maybe pack this into a composite type that # TODO: maybe pack this into a composite type that
# contains both the IPC stream as well the # contains both the IPC stream as well the
# msg-chain/dialog. # msg-chain/dialog.
ems_client_order_stream = router.dialogues[oid] ems_client_order_stream = router.dialogues.get(oid)
status_msg = book._active[oid] status_msg = book._active.get(oid)
if (
not ems_client_order_stream
or not status_msg
):
log.warning(
'Received status for unknown dialog {oid}:\n'
'{fmsg}'
)
continue
status_msg.resp = status status_msg.resp = status
# retrieve existing live flow # retrieve existing live flow
@ -762,12 +778,19 @@ async def translate_and_relay_brokerd_events(
'name': 'fill', 'name': 'fill',
'reqid': reqid, # brokerd generated order-request id 'reqid': reqid, # brokerd generated order-request id
# 'symbol': sym, # paper engine doesn't have this, nbd? # 'symbol': sym, # paper engine doesn't have this, nbd?
} if ( }:
oid := book._ems2brokerd_ids.inverse.get(reqid) oid = book._ems2brokerd_ids.inverse.get(reqid)
): if not oid:
# TODO: maybe we could optionally check for an
# ``.oid`` in the msg since we're planning to
# maybe-kinda offer that via using ``Status``
# in the longer run anyway?
log.warning(f'Unkown fill for {fmsg}')
continue
# proxy through the "fill" result(s) # proxy through the "fill" result(s)
msg = BrokerdFill(**brokerd_msg) msg = BrokerdFill(**brokerd_msg)
log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}') log.info(f'Fill for {oid} cleared with:\n{fmsg}')
ems_client_order_stream = router.dialogues[oid] ems_client_order_stream = router.dialogues[oid]
@ -796,7 +819,7 @@ async def translate_and_relay_brokerd_events(
# registered from a previous order/status load? # registered from a previous order/status load?
log.error( log.error(
f'Unknown/transient status msg:\n' f'Unknown/transient status msg:\n'
f'{pformat(brokerd_msg)}\n' f'{fmsg}\n'
'Unable to relay message to client side!?' 'Unable to relay message to client side!?'
) )
@ -841,7 +864,7 @@ async def translate_and_relay_brokerd_events(
'name': 'status', 'name': 'status',
'status': 'error', 'status': 'error',
}: }:
log.error(f'Broker error:\n{pformat(brokerd_msg)}') log.error(f'Broker error:\n{fmsg}')
# XXX: we presume the brokerd cancels its own order # XXX: we presume the brokerd cancels its own order
# TOO FAST ``BrokerdStatus`` that arrives # TOO FAST ``BrokerdStatus`` that arrives
@ -862,7 +885,7 @@ async def translate_and_relay_brokerd_events(
status_msg = book._active[oid] status_msg = book._active[oid]
msg += ( msg += (
f'last status msg: {pformat(status_msg)}\n\n' f'last status msg: {pformat(status_msg)}\n\n'
f'this msg:{pformat(brokerd_msg)}\n' f'this msg:{fmsg}\n'
) )
log.warning(msg) log.warning(msg)