Better handle order-cancelled-but-not-yet-acked races
When the client is faster then a `brokerd` at submitting and cancelling an order we run into the case where we need to specify that the EMS cancels the order-flow as soon as the brokerd's ack arrives. Previously we were stashing a `BrokerdCancel` msg as the `Status.req` msg (to be both tested for as a "already cancelled" and sent immediately on ack arrival to the broker), but for such cases we can't use that msg to find the fqsn (since only the client side msgs have it defined) which is required by the new `Router.client_broadcast()`. So, Since `Status.req` is supposed to be a client-side flow msg anyway, and we need the fqsn for client broadcasting, we change this `.req` value to the client's submitted `Cancel` msg (thus rectifying the missing `Router.client_broadcast()` fqsn input issue) and build the `BrokerdCancel` request from that `Cancel` inline in the relay loop from the `.req: Cancel` status msg lookup. Further we allow `Cancel` msgs to define an `.account` and adjust the order mode loop to expect `Cancel` source requests in cancelled status updates.even_moar_kraken_order_fixes
parent
8a61211c8c
commit
30994dac10
|
@ -56,7 +56,7 @@ from . import _paper_engine as paper
|
||||||
from ._messages import (
|
from ._messages import (
|
||||||
Order,
|
Order,
|
||||||
Status,
|
Status,
|
||||||
# Cancel,
|
Cancel,
|
||||||
BrokerdCancel,
|
BrokerdCancel,
|
||||||
BrokerdOrder,
|
BrokerdOrder,
|
||||||
# BrokerdOrderAck,
|
# BrokerdOrderAck,
|
||||||
|
@ -748,7 +748,21 @@ async def translate_and_relay_brokerd_events(
|
||||||
# assign newly providerd broker backend request id
|
# assign newly providerd broker backend request id
|
||||||
# and tell broker to cancel immediately
|
# and tell broker to cancel immediately
|
||||||
status_msg.reqid = reqid
|
status_msg.reqid = reqid
|
||||||
await brokerd_trades_stream.send(req)
|
|
||||||
|
# NOTE: as per comment in cancel-request-block
|
||||||
|
# above: This is an ack to
|
||||||
|
# a client-already-cancelled order request so we
|
||||||
|
# must immediately send a cancel to the brokerd upon
|
||||||
|
# rx of this ACK.
|
||||||
|
cancel_req = status_msg.req
|
||||||
|
await brokerd_trades_stream.send(
|
||||||
|
BrokerdCancel(
|
||||||
|
oid=oid,
|
||||||
|
reqid=reqid,
|
||||||
|
time_ns=time.time_ns(),
|
||||||
|
account=cancel_req.account,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
# 2. the order is now active and will be mirrored in
|
# 2. the order is now active and will be mirrored in
|
||||||
# our book -> registered as live flow
|
# our book -> registered as live flow
|
||||||
|
@ -1051,13 +1065,8 @@ async def process_client_order_cmds(
|
||||||
):
|
):
|
||||||
reqid = status.reqid
|
reqid = status.reqid
|
||||||
order = status.req
|
order = status.req
|
||||||
to_brokerd_msg = BrokerdCancel(
|
cancreq = status.req = Cancel(**cmd)
|
||||||
oid=oid,
|
cancreq.account = order.account
|
||||||
reqid=reqid,
|
|
||||||
time_ns=time.time_ns(),
|
|
||||||
# account=live_entry.account,
|
|
||||||
account=order.account,
|
|
||||||
)
|
|
||||||
|
|
||||||
# NOTE: cancel response will be relayed back in messages
|
# NOTE: cancel response will be relayed back in messages
|
||||||
# from corresponding broker
|
# from corresponding broker
|
||||||
|
@ -1066,15 +1075,26 @@ async def process_client_order_cmds(
|
||||||
log.info(
|
log.info(
|
||||||
f'Submitting cancel for live order {reqid}'
|
f'Submitting cancel for live order {reqid}'
|
||||||
)
|
)
|
||||||
await brokerd_order_stream.send(to_brokerd_msg)
|
await brokerd_order_stream.send(
|
||||||
|
BrokerdCancel(
|
||||||
|
oid=oid,
|
||||||
|
reqid=reqid,
|
||||||
|
time_ns=time.time_ns(),
|
||||||
|
account=order.account,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
else:
|
# else:
|
||||||
# this might be a cancel for an order that hasn't been
|
# this might be a cancel for an order that hasn't been
|
||||||
# acked yet by a brokerd, so register a cancel for when
|
# acked yet by a brokerd (so it's in the midst of
|
||||||
# the order ack does show up later such that the brokerd
|
# being ACKed for submission but we don't have that
|
||||||
# order request can be cancelled at that time.
|
# confirmation yet). In this race case, save the
|
||||||
|
# client-side cancel request for when
|
||||||
|
# the ack does show up (later) such that the brokerd
|
||||||
|
# live-order can be cancelled immediately upon
|
||||||
|
# reception.
|
||||||
# special case for now..
|
# special case for now..
|
||||||
status.req = to_brokerd_msg
|
# status.req = to_brokerd_msg
|
||||||
|
|
||||||
# dark trigger cancel
|
# dark trigger cancel
|
||||||
case {
|
case {
|
||||||
|
|
|
@ -110,6 +110,7 @@ class Cancel(Struct):
|
||||||
action: str = 'cancel'
|
action: str = 'cancel'
|
||||||
oid: str # uuid4
|
oid: str # uuid4
|
||||||
symbol: str
|
symbol: str
|
||||||
|
account: str = ''
|
||||||
|
|
||||||
|
|
||||||
# --------------
|
# --------------
|
||||||
|
|
|
@ -56,6 +56,7 @@ from ._position import (
|
||||||
from ._forms import FieldsForm
|
from ._forms import FieldsForm
|
||||||
from ._window import MultiStatus
|
from ._window import MultiStatus
|
||||||
from ..clearing._messages import (
|
from ..clearing._messages import (
|
||||||
|
Cancel,
|
||||||
Order,
|
Order,
|
||||||
Status,
|
Status,
|
||||||
# BrokerdOrder,
|
# BrokerdOrder,
|
||||||
|
@ -961,15 +962,20 @@ async def process_trade_msg(
|
||||||
dialog: Dialog = mode.dialogs.get(oid)
|
dialog: Dialog = mode.dialogs.get(oid)
|
||||||
|
|
||||||
match msg:
|
match msg:
|
||||||
case Status(resp='dark_open' | 'open'):
|
case Status(
|
||||||
|
resp='dark_open' | 'open',
|
||||||
|
) if msg.req['action'] != 'cancel':
|
||||||
|
|
||||||
order = Order(**msg.req)
|
order = Order(**msg.req)
|
||||||
|
|
||||||
if dialog is not None:
|
if (
|
||||||
|
dialog is not None
|
||||||
|
and order.action != 'cancel'
|
||||||
|
):
|
||||||
# show line label once order is live
|
# show line label once order is live
|
||||||
mode.on_submit(oid, order=order)
|
mode.on_submit(oid, order=order)
|
||||||
|
|
||||||
else:
|
elif order.action != 'cancel':
|
||||||
log.warning(
|
log.warning(
|
||||||
f'received msg for untracked dialog:\n{fmsg}'
|
f'received msg for untracked dialog:\n{fmsg}'
|
||||||
)
|
)
|
||||||
|
@ -1005,7 +1011,7 @@ async def process_trade_msg(
|
||||||
case Status(resp='canceled'):
|
case Status(resp='canceled'):
|
||||||
# delete level line from view
|
# delete level line from view
|
||||||
mode.on_cancel(oid)
|
mode.on_cancel(oid)
|
||||||
req = Order(**msg.req)
|
req = Cancel(**msg.req)
|
||||||
log.cancel(f'Canceled {req.action}:{oid}')
|
log.cancel(f'Canceled {req.action}:{oid}')
|
||||||
|
|
||||||
case Status(
|
case Status(
|
||||||
|
|
Loading…
Reference in New Issue