Only emit ems fill msgs for 'status' events from ib

Fills seems to be dual emitted from both the `status` and `fill` events
in `ib_insync` internals and more or less contain the same data nested
inside their `Trade` type. We started handling the 'fill' case to deal
with a race issue in commissions/cost report tracking but we don't
really want to leak that same race to incremental fills vs.
order-"closed" tracking.. So go back to only emitting the fill msgs
on statuses and a "closed" on `.remaining == 0`.
open_order_loading
Tyler Goodlet 2022-08-16 09:21:47 -04:00
parent ee8c00684b
commit be8fd32e7d
1 changed files with 28 additions and 25 deletions

View File

@ -807,8 +807,9 @@ async def deliver_trade_events(
log.info(f'ib sending {event_name}:\n{pformat(item)}') log.info(f'ib sending {event_name}:\n{pformat(item)}')
match event_name: match event_name:
# TODO: templating the ib statuses in comparison with other # NOTE: we remap statuses to the ems set via the
# brokers is likely the way to go: # ``_statuses: dict`` above.
# https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313
# short list: # short list:
# - PendingSubmit # - PendingSubmit
@ -839,7 +840,6 @@ async def deliver_trade_events(
trade: Trade = item trade: Trade = item
status: OrderStatus = trade.orderStatus status: OrderStatus = trade.orderStatus
ib_status_key = status.status.lower() ib_status_key = status.status.lower()
acctid = accounts_def.inverse[trade.order.account] acctid = accounts_def.inverse[trade.order.account]
# double check there is no error when # double check there is no error when
@ -851,9 +851,9 @@ async def deliver_trade_events(
and 'Error' not in last_log.message and 'Error' not in last_log.message
): ):
ib_status_key = trade.log[-2].status ib_status_key = trade.log[-2].status
print(ib_status_key)
elif ib_status_key == 'inactive': elif ib_status_key == 'inactive':
async def sched_cancel(): async def sched_cancel():
log.warning( log.warning(
'OH GAWD an inactive order..scheduling a cancel\n' 'OH GAWD an inactive order..scheduling a cancel\n'
@ -874,14 +874,34 @@ async def deliver_trade_events(
remaining = status.remaining remaining = status.remaining
if ( if (
status_key == 'filled' status_key == 'filled'
and remaining == 0
): ):
fill: Fill = trade.fills[-1]
execu: Execution = fill.execution
# execdict = asdict(execu)
# execdict.pop('acctNumber')
msg = BrokerdFill(
# should match the value returned from
# `.submit_limit()`
reqid=execu.orderId,
time_ns=time.time_ns(), # cuz why not
action=action_map[execu.side],
size=execu.shares,
price=execu.price,
# broker_details=execdict,
# XXX: required by order mode currently
broker_time=execu.time,
)
await ems_stream.send(msg)
if remaining == 0:
# emit a closed status on filled statuses where
# all units were cleared.
status_key = 'closed' status_key = 'closed'
# skip duplicate filled updates - we get the deats # skip duplicate filled updates - we get the deats
# from the execution details event # from the execution details event
msg = BrokerdStatus( msg = BrokerdStatus(
reqid=trade.order.orderId, reqid=trade.order.orderId,
time_ns=time.time_ns(), # cuz why not time_ns=time.time_ns(), # cuz why not
account=accounts_def.inverse[trade.order.account], account=accounts_def.inverse[trade.order.account],
@ -899,6 +919,7 @@ async def deliver_trade_events(
broker_details={'name': 'ib'}, broker_details={'name': 'ib'},
) )
await ems_stream.send(msg) await ems_stream.send(msg)
continue
case 'fill': case 'fill':
@ -914,8 +935,6 @@ async def deliver_trade_events(
# https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations # https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations
trade: Trade trade: Trade
fill: Fill fill: Fill
# TODO: maybe we can use matching to better handle these cases.
trade, fill = item trade, fill = item
execu: Execution = fill.execution execu: Execution = fill.execution
execid = execu.execId execid = execu.execId
@ -944,22 +963,6 @@ async def deliver_trade_events(
} }
) )
msg = BrokerdFill(
# should match the value returned from `.submit_limit()`
reqid=execu.orderId,
time_ns=time.time_ns(), # cuz why not
action=action_map[execu.side],
size=execu.shares,
price=execu.price,
broker_details=trade_entry,
# XXX: required by order mode currently
broker_time=trade_entry['broker_time'],
)
await ems_stream.send(msg)
# 2 cases: # 2 cases:
# - fill comes first or # - fill comes first or
# - comms report comes first # - comms report comes first