From 700dbf0e2b679dff14ac8c1f6ea29344bce96a57 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 10 Aug 2022 17:17:47 -0400 Subject: [PATCH] Handle 'closed' vs. 'fill` race case.. `ib` is super good not being reliable with order event sequence order and duplication of fill info. This adds some guards to try and avoid popping the last status status too early if we end up receiving a `'closed'` before the expected `'fill`' event(s). Further delete the `status_msg` ref on each iteration to avoid stale reference lookups in the relay task/loop. --- piker/clearing/_ems.py | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 0411f026..24a78420 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -667,19 +667,19 @@ async def translate_and_relay_brokerd_events( # received this ack, in which case we relay that cancel # signal **asap** to the backend broker # status = book._active.get(oid) - status = book._active[oid] - req = status.req + status_msg = book._active[oid] + req = status_msg.req if req and req.action == 'cancel': # assign newly providerd broker backend request id # and tell broker to cancel immediately - status.reqid = reqid + status_msg.reqid = reqid await brokerd_trades_stream.send(req) # 2. the order is now active and will be mirrored in # our book -> registered as live flow else: # TODO: should we relay this ack state? - status.resp = 'pending' + status_msg.resp = 'pending' # no msg to client necessary continue @@ -729,6 +729,7 @@ async def translate_and_relay_brokerd_events( # msg-chain/dialog. ems_client_order_stream = router.dialogues[oid] status_msg = book._active[oid] + old_resp = status_msg.resp status_msg.resp = status # retrieve existing live flow @@ -746,7 +747,11 @@ async def translate_and_relay_brokerd_events( if status == 'closed': log.info(f'Execution for {oid} is complete!') - status_msg = book._active.pop(oid) + + # only if we already rxed a fill then probably + # this clear is fully complete? (frickin ib..) + if old_resp == 'fill': + status_msg = book._active.pop(oid) elif status == 'canceled': log.info(f'Cancellation for {oid} is complete!') @@ -823,6 +828,7 @@ async def translate_and_relay_brokerd_events( 'status': status, 'reqid': reqid, }: + status_msg = book._active[oid] log.warning( 'Unhandled broker status:\n' f'{pformat(brokerd_msg)}\n' @@ -837,10 +843,19 @@ async def translate_and_relay_brokerd_events( oid := book._ems2brokerd_ids.inverse.get(reqid) ): # proxy through the "fill" result(s) - log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}') msg = BrokerdFill(**brokerd_msg) + log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}') + ems_client_order_stream = router.dialogues[oid] + + # wtf a fill can come after 'closed' from ib? status_msg = book._active[oid] + + # only if we already rxed a 'closed' + # this clear is fully complete? (frickin ib..) + if status_msg.resp == 'closed': + status_msg = book._active.pop(oid) + status_msg.resp = 'fill' status_msg.reqid = reqid status_msg.brokerd_msg = msg @@ -849,6 +864,10 @@ async def translate_and_relay_brokerd_events( case _: raise ValueError(f'Brokerd message {brokerd_msg} is invalid') + # XXX: ugh sometimes we don't access it? + if status_msg: + del status_msg + # TODO: do we want this to keep things cleaned up? # it might require a special status from brokerd to affirm the # flow is complete?