Use new `Status.cancel_called` in EMS msg loops

even_moar_kraken_order_fixes
Tyler Goodlet 2022-10-28 15:37:24 -04:00
parent 71a11a23bd
commit 8537a4091b
2 changed files with 24 additions and 26 deletions

View File

@ -56,7 +56,6 @@ from . import _paper_engine as paper
from ._messages import ( from ._messages import (
Order, Order,
Status, Status,
Cancel,
BrokerdCancel, BrokerdCancel,
BrokerdOrder, BrokerdOrder,
# BrokerdOrderAck, # BrokerdOrderAck,
@ -743,8 +742,7 @@ async def translate_and_relay_brokerd_events(
log.warning(f'Rx Ack for closed/unknown order?: {oid}') log.warning(f'Rx Ack for closed/unknown order?: {oid}')
continue continue
req = status_msg.req if status_msg.cancel_called:
if req and req.action == 'cancel':
# assign newly providerd broker backend request id # assign newly providerd broker backend request id
# and tell broker to cancel immediately # and tell broker to cancel immediately
status_msg.reqid = reqid status_msg.reqid = reqid
@ -754,13 +752,12 @@ async def translate_and_relay_brokerd_events(
# a client-already-cancelled order request so we # a client-already-cancelled order request so we
# must immediately send a cancel to the brokerd upon # must immediately send a cancel to the brokerd upon
# rx of this ACK. # rx of this ACK.
cancel_req = status_msg.req
await brokerd_trades_stream.send( await brokerd_trades_stream.send(
BrokerdCancel( BrokerdCancel(
oid=oid, oid=oid,
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
account=cancel_req.account, account=status_msg.req.account,
) )
) )
@ -1055,18 +1052,30 @@ async def process_client_order_cmds(
status = dark_book._active.get(oid) status = dark_book._active.get(oid)
match cmd: match cmd:
# existing live-broker order cancel # existing LIVE CANCEL
case { case {
'action': 'cancel', 'action': 'cancel',
'oid': oid, 'oid': oid,
} if ( } if (
status status
and status.resp in ('open', 'pending') and status.resp in (
'open',
'pending',
)
): ):
reqid = status.reqid reqid = status.reqid
order = status.req order = status.req
cancreq = status.req = Cancel(**cmd)
cancreq.account = order.account # XXX: cancelled-before-ack race case.
# This might be a cancel for an order that hasn't been
# acked yet by a brokerd (so it's in the midst of being
# ``BrokerdAck``ed for submission but we don't have that
# confirmation response back yet). Set this client-side
# msg state so when the ack does show up (later)
# logic in ``translate_and_relay_brokerd_events()`` can
# forward the cancel request to the `brokerd` side of
# the order flow ASAP.
status.cancel_called = True
# NOTE: cancel response will be relayed back in messages # NOTE: cancel response will be relayed back in messages
# from corresponding broker # from corresponding broker
@ -1084,19 +1093,7 @@ async def process_client_order_cmds(
) )
) )
# else: # DARK trigger CANCEL
# this might be a cancel for an order that hasn't been
# acked yet by a brokerd (so it's in the midst of
# being ACKed for submission but we don't have that
# confirmation yet). In this race case, save the
# client-side cancel request for when
# the ack does show up (later) such that the brokerd
# live-order can be cancelled immediately upon
# reception.
# special case for now..
# status.req = to_brokerd_msg
# dark trigger cancel
case { case {
'action': 'cancel', 'action': 'cancel',
'oid': oid, 'oid': oid,
@ -1135,9 +1132,9 @@ async def process_client_order_cmds(
# TODO: eventually we should be receiving # TODO: eventually we should be receiving
# this struct on the wire unpacked in a scoped protocol # this struct on the wire unpacked in a scoped protocol
# setup with ``tractor``. # setup with ``tractor`` using ``msgspec``.
# live order submission # LIVE order REQUEST
case { case {
'oid': oid, 'oid': oid,
'symbol': fqsn, 'symbol': fqsn,
@ -1207,7 +1204,7 @@ async def process_client_order_cmds(
# that live order asap. # that live order asap.
# dark_book._msgflows[oid].maps.insert(0, msg.to_dict()) # dark_book._msgflows[oid].maps.insert(0, msg.to_dict())
# dark-order / alert submission # DARK-order / alert REQUEST
case { case {
'oid': oid, 'oid': oid,
'symbol': fqsn, 'symbol': fqsn,

View File

@ -1011,7 +1011,8 @@ async def process_trade_msg(
case Status(resp='canceled'): case Status(resp='canceled'):
# delete level line from view # delete level line from view
mode.on_cancel(oid) mode.on_cancel(oid)
log.cancel(f'Canceled {msg.req["action"]}:{oid}') action = msg.req["action"]
log.cancel(f'Canceled {action}:{oid}')
case Status( case Status(
resp='triggered', resp='triggered',