Handle 'closed' vs. 'fill` race case..

`ib` is super good not being reliable with order event sequence order
and duplication of fill info. This adds some guards to try and avoid
popping the last status status too early if we end up receiving
a `'closed'` before the expected `'fill`' event(s). Further delete the
`status_msg` ref on each iteration to avoid stale reference lookups in
the relay task/loop.
dict_differ
Tyler Goodlet 2022-08-10 17:17:47 -04:00
parent b52c4092f3
commit 700dbf0e2b
1 changed files with 25 additions and 6 deletions

View File

@ -667,19 +667,19 @@ async def translate_and_relay_brokerd_events(
# received this ack, in which case we relay that cancel # received this ack, in which case we relay that cancel
# signal **asap** to the backend broker # signal **asap** to the backend broker
# status = book._active.get(oid) # status = book._active.get(oid)
status = book._active[oid] status_msg = book._active[oid]
req = status.req req = status_msg.req
if req and req.action == 'cancel': if req and req.action == 'cancel':
# assign newly providerd broker backend request id # assign newly providerd broker backend request id
# and tell broker to cancel immediately # and tell broker to cancel immediately
status.reqid = reqid status_msg.reqid = reqid
await brokerd_trades_stream.send(req) await brokerd_trades_stream.send(req)
# 2. the order is now active and will be mirrored in # 2. the order is now active and will be mirrored in
# our book -> registered as live flow # our book -> registered as live flow
else: else:
# TODO: should we relay this ack state? # TODO: should we relay this ack state?
status.resp = 'pending' status_msg.resp = 'pending'
# no msg to client necessary # no msg to client necessary
continue continue
@ -729,6 +729,7 @@ async def translate_and_relay_brokerd_events(
# msg-chain/dialog. # msg-chain/dialog.
ems_client_order_stream = router.dialogues[oid] ems_client_order_stream = router.dialogues[oid]
status_msg = book._active[oid] status_msg = book._active[oid]
old_resp = status_msg.resp
status_msg.resp = status status_msg.resp = status
# retrieve existing live flow # retrieve existing live flow
@ -746,7 +747,11 @@ async def translate_and_relay_brokerd_events(
if status == 'closed': if status == 'closed':
log.info(f'Execution for {oid} is complete!') log.info(f'Execution for {oid} is complete!')
status_msg = book._active.pop(oid)
# only if we already rxed a fill then probably
# this clear is fully complete? (frickin ib..)
if old_resp == 'fill':
status_msg = book._active.pop(oid)
elif status == 'canceled': elif status == 'canceled':
log.info(f'Cancellation for {oid} is complete!') log.info(f'Cancellation for {oid} is complete!')
@ -823,6 +828,7 @@ async def translate_and_relay_brokerd_events(
'status': status, 'status': status,
'reqid': reqid, 'reqid': reqid,
}: }:
status_msg = book._active[oid]
log.warning( log.warning(
'Unhandled broker status:\n' 'Unhandled broker status:\n'
f'{pformat(brokerd_msg)}\n' f'{pformat(brokerd_msg)}\n'
@ -837,10 +843,19 @@ async def translate_and_relay_brokerd_events(
oid := book._ems2brokerd_ids.inverse.get(reqid) oid := book._ems2brokerd_ids.inverse.get(reqid)
): ):
# proxy through the "fill" result(s) # proxy through the "fill" result(s)
log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}')
msg = BrokerdFill(**brokerd_msg) msg = BrokerdFill(**brokerd_msg)
log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}')
ems_client_order_stream = router.dialogues[oid] ems_client_order_stream = router.dialogues[oid]
# wtf a fill can come after 'closed' from ib?
status_msg = book._active[oid] status_msg = book._active[oid]
# only if we already rxed a 'closed'
# this clear is fully complete? (frickin ib..)
if status_msg.resp == 'closed':
status_msg = book._active.pop(oid)
status_msg.resp = 'fill' status_msg.resp = 'fill'
status_msg.reqid = reqid status_msg.reqid = reqid
status_msg.brokerd_msg = msg status_msg.brokerd_msg = msg
@ -849,6 +864,10 @@ async def translate_and_relay_brokerd_events(
case _: case _:
raise ValueError(f'Brokerd message {brokerd_msg} is invalid') raise ValueError(f'Brokerd message {brokerd_msg} is invalid')
# XXX: ugh sometimes we don't access it?
if status_msg:
del status_msg
# TODO: do we want this to keep things cleaned up? # TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the # it might require a special status from brokerd to affirm the
# flow is complete? # flow is complete?