Handle 'closed' vs. 'fill` race case..

`ib` is super good not being reliable with order event sequence order
and duplication of fill info. This adds some guards to try and avoid
popping the last status status too early if we end up receiving
a `'closed'` before the expected `'fill`' event(s). Further delete the
`status_msg` ref on each iteration to avoid stale reference lookups in
the relay task/loop.
open_order_loading
Tyler Goodlet 2022-08-10 17:17:47 -04:00
parent 176b230a46
commit 2b61672723
1 changed files with 25 additions and 6 deletions

View File

@ -667,19 +667,19 @@ async def translate_and_relay_brokerd_events(
# received this ack, in which case we relay that cancel
# signal **asap** to the backend broker
# status = book._active.get(oid)
status = book._active[oid]
req = status.req
status_msg = book._active[oid]
req = status_msg.req
if req and req.action == 'cancel':
# assign newly providerd broker backend request id
# and tell broker to cancel immediately
status.reqid = reqid
status_msg.reqid = reqid
await brokerd_trades_stream.send(req)
# 2. the order is now active and will be mirrored in
# our book -> registered as live flow
else:
# TODO: should we relay this ack state?
status.resp = 'pending'
status_msg.resp = 'pending'
# no msg to client necessary
continue
@ -729,6 +729,7 @@ async def translate_and_relay_brokerd_events(
# msg-chain/dialog.
ems_client_order_stream = router.dialogues[oid]
status_msg = book._active[oid]
old_resp = status_msg.resp
status_msg.resp = status
# retrieve existing live flow
@ -746,7 +747,11 @@ async def translate_and_relay_brokerd_events(
if status == 'closed':
log.info(f'Execution for {oid} is complete!')
status_msg = book._active.pop(oid)
# only if we already rxed a fill then probably
# this clear is fully complete? (frickin ib..)
if old_resp == 'fill':
status_msg = book._active.pop(oid)
elif status == 'canceled':
log.info(f'Cancellation for {oid} is complete!')
@ -823,6 +828,7 @@ async def translate_and_relay_brokerd_events(
'status': status,
'reqid': reqid,
}:
status_msg = book._active[oid]
log.warning(
'Unhandled broker status:\n'
f'{pformat(brokerd_msg)}\n'
@ -837,10 +843,19 @@ async def translate_and_relay_brokerd_events(
oid := book._ems2brokerd_ids.inverse.get(reqid)
):
# proxy through the "fill" result(s)
log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}')
msg = BrokerdFill(**brokerd_msg)
log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}')
ems_client_order_stream = router.dialogues[oid]
# wtf a fill can come after 'closed' from ib?
status_msg = book._active[oid]
# only if we already rxed a 'closed'
# this clear is fully complete? (frickin ib..)
if status_msg.resp == 'closed':
status_msg = book._active.pop(oid)
status_msg.resp = 'fill'
status_msg.reqid = reqid
status_msg.brokerd_msg = msg
@ -849,6 +864,10 @@ async def translate_and_relay_brokerd_events(
case _:
raise ValueError(f'Brokerd message {brokerd_msg} is invalid')
# XXX: ugh sometimes we don't access it?
if status_msg:
del status_msg
# TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the
# flow is complete?