diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 0411f026..24a78420 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -667,19 +667,19 @@ async def translate_and_relay_brokerd_events( # received this ack, in which case we relay that cancel # signal **asap** to the backend broker # status = book._active.get(oid) - status = book._active[oid] - req = status.req + status_msg = book._active[oid] + req = status_msg.req if req and req.action == 'cancel': # assign newly providerd broker backend request id # and tell broker to cancel immediately - status.reqid = reqid + status_msg.reqid = reqid await brokerd_trades_stream.send(req) # 2. the order is now active and will be mirrored in # our book -> registered as live flow else: # TODO: should we relay this ack state? - status.resp = 'pending' + status_msg.resp = 'pending' # no msg to client necessary continue @@ -729,6 +729,7 @@ async def translate_and_relay_brokerd_events( # msg-chain/dialog. ems_client_order_stream = router.dialogues[oid] status_msg = book._active[oid] + old_resp = status_msg.resp status_msg.resp = status # retrieve existing live flow @@ -746,7 +747,11 @@ async def translate_and_relay_brokerd_events( if status == 'closed': log.info(f'Execution for {oid} is complete!') - status_msg = book._active.pop(oid) + + # only if we already rxed a fill then probably + # this clear is fully complete? (frickin ib..) + if old_resp == 'fill': + status_msg = book._active.pop(oid) elif status == 'canceled': log.info(f'Cancellation for {oid} is complete!') @@ -823,6 +828,7 @@ async def translate_and_relay_brokerd_events( 'status': status, 'reqid': reqid, }: + status_msg = book._active[oid] log.warning( 'Unhandled broker status:\n' f'{pformat(brokerd_msg)}\n' @@ -837,10 +843,19 @@ async def translate_and_relay_brokerd_events( oid := book._ems2brokerd_ids.inverse.get(reqid) ): # proxy through the "fill" result(s) - log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}') msg = BrokerdFill(**brokerd_msg) + log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}') + ems_client_order_stream = router.dialogues[oid] + + # wtf a fill can come after 'closed' from ib? status_msg = book._active[oid] + + # only if we already rxed a 'closed' + # this clear is fully complete? (frickin ib..) + if status_msg.resp == 'closed': + status_msg = book._active.pop(oid) + status_msg.resp = 'fill' status_msg.reqid = reqid status_msg.brokerd_msg = msg @@ -849,6 +864,10 @@ async def translate_and_relay_brokerd_events( case _: raise ValueError(f'Brokerd message {brokerd_msg} is invalid') + # XXX: ugh sometimes we don't access it? + if status_msg: + del status_msg + # TODO: do we want this to keep things cleaned up? # it might require a special status from brokerd to affirm the # flow is complete?