Merge pull request #410 from pikers/even_moar_kraken_order_fixes

Even moar `kraken` order fixes
no_signal_pi_overlays
goodboy 2022-10-28 19:52:20 -04:00 committed by GitHub
commit 2dac531729
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 86 additions and 44 deletions

View File

@ -795,7 +795,7 @@ async def handle_order_updates(
# 'vol_exec': exec_vlm} # 0.0000 # 'vol_exec': exec_vlm} # 0.0000
match update_msg: match update_msg:
# EMS-unknown live order that needs to be # EMS-unknown LIVE order that needs to be
# delivered and loaded on the client-side. # delivered and loaded on the client-side.
case { case {
'userref': reqid, 'userref': reqid,
@ -849,7 +849,7 @@ async def handle_order_updates(
), ),
src='kraken', src='kraken',
) )
apiflows[reqid].maps.append(status_msg) apiflows[reqid].maps.append(status_msg.to_dict())
await ems_stream.send(status_msg) await ems_stream.send(status_msg)
continue continue
@ -1104,7 +1104,6 @@ async def handle_order_updates(
'txid': [txid], 'txid': [txid],
}) })
case _: case _:
log.warning(f'Unhandled trades update msg: {msg}') log.warning(f'Unhandled trades update msg: {msg}')

View File

@ -62,6 +62,7 @@ class Pair(Struct):
lot: str # volume lot size lot: str # volume lot size
cost_decimals: int cost_decimals: int
costmin: float
pair_decimals: int # scaling decimal places for pair pair_decimals: int # scaling decimal places for pair
lot_decimals: int # scaling decimal places for volume lot_decimals: int # scaling decimal places for volume

View File

@ -56,7 +56,6 @@ from . import _paper_engine as paper
from ._messages import ( from ._messages import (
Order, Order,
Status, Status,
# Cancel,
BrokerdCancel, BrokerdCancel,
BrokerdOrder, BrokerdOrder,
# BrokerdOrderAck, # BrokerdOrderAck,
@ -581,6 +580,7 @@ class Router(Struct):
notify_on_headless: bool = True, notify_on_headless: bool = True,
) -> bool: ) -> bool:
# print(f'SUBSCRIBERS: {self.subscribers}')
to_remove: set[tractor.MsgStream] = set() to_remove: set[tractor.MsgStream] = set()
if sub_key == 'all': if sub_key == 'all':
@ -611,7 +611,8 @@ class Router(Struct):
and notify_on_headless and notify_on_headless
): ):
log.info( log.info(
'No clients attached, firing notification for {sub_key} msg:\n' 'No clients attached, '
f'firing notification for {sub_key} msg:\n'
f'{msg}' f'{msg}'
) )
await notify_from_ems_status_msg( await notify_from_ems_status_msg(
@ -741,12 +742,24 @@ async def translate_and_relay_brokerd_events(
log.warning(f'Rx Ack for closed/unknown order?: {oid}') log.warning(f'Rx Ack for closed/unknown order?: {oid}')
continue continue
req = status_msg.req if status_msg.cancel_called:
if req and req.action == 'cancel':
# assign newly providerd broker backend request id # assign newly providerd broker backend request id
# and tell broker to cancel immediately # and tell broker to cancel immediately
status_msg.reqid = reqid status_msg.reqid = reqid
await brokerd_trades_stream.send(req)
# NOTE: as per comment in cancel-request-block
# above: This is an ack to
# a client-already-cancelled order request so we
# must immediately send a cancel to the brokerd upon
# rx of this ACK.
await brokerd_trades_stream.send(
BrokerdCancel(
oid=oid,
reqid=reqid,
time_ns=time.time_ns(),
account=status_msg.req.account,
)
)
# 2. the order is now active and will be mirrored in # 2. the order is now active and will be mirrored in
# our book -> registered as live flow # our book -> registered as live flow
@ -763,8 +776,8 @@ async def translate_and_relay_brokerd_events(
'oid': oid, # ems order-dialog id 'oid': oid, # ems order-dialog id
'reqid': reqid, # brokerd generated order-request id 'reqid': reqid, # brokerd generated order-request id
'symbol': sym, 'symbol': sym,
} if status_msg := book._active.get(oid): }:
status_msg = book._active.get(oid)
msg = BrokerdError(**brokerd_msg) msg = BrokerdError(**brokerd_msg)
log.error(fmsg) # XXX make one when it's blank? log.error(fmsg) # XXX make one when it's blank?
@ -776,11 +789,21 @@ async def translate_and_relay_brokerd_events(
# about. In most default situations, with composed orders # about. In most default situations, with composed orders
# (ex. brackets), most brokers seem to use a oca policy. # (ex. brackets), most brokers seem to use a oca policy.
# only relay to client side if we have an active
# ongoing dialog
if status_msg:
status_msg.resp = 'error' status_msg.resp = 'error'
status_msg.brokerd_msg = msg status_msg.brokerd_msg = msg
book._active[oid] = status_msg book._active[oid] = status_msg
await router.client_broadcast(sym, status_msg) await router.client_broadcast(
status_msg.req.symbol,
status_msg,
)
else:
log.error(f'Error for unknown order flow:\n{msg}')
continue
# BrokerdStatus # BrokerdStatus
case { case {
@ -1029,23 +1052,30 @@ async def process_client_order_cmds(
status = dark_book._active.get(oid) status = dark_book._active.get(oid)
match cmd: match cmd:
# existing live-broker order cancel # existing LIVE CANCEL
case { case {
'action': 'cancel', 'action': 'cancel',
'oid': oid, 'oid': oid,
} if ( } if (
status status
and status.resp in ('open', 'pending') and status.resp in (
'open',
'pending',
)
): ):
reqid = status.reqid reqid = status.reqid
order = status.req order = status.req
to_brokerd_msg = BrokerdCancel(
oid=oid, # XXX: cancelled-before-ack race case.
reqid=reqid, # This might be a cancel for an order that hasn't been
time_ns=time.time_ns(), # acked yet by a brokerd (so it's in the midst of being
# account=live_entry.account, # ``BrokerdAck``ed for submission but we don't have that
account=order.account, # confirmation response back yet). Set this client-side
) # msg state so when the ack does show up (later)
# logic in ``translate_and_relay_brokerd_events()`` can
# forward the cancel request to the `brokerd` side of
# the order flow ASAP.
status.cancel_called = True
# NOTE: cancel response will be relayed back in messages # NOTE: cancel response will be relayed back in messages
# from corresponding broker # from corresponding broker
@ -1054,17 +1084,16 @@ async def process_client_order_cmds(
log.info( log.info(
f'Submitting cancel for live order {reqid}' f'Submitting cancel for live order {reqid}'
) )
await brokerd_order_stream.send(to_brokerd_msg) await brokerd_order_stream.send(
BrokerdCancel(
oid=oid,
reqid=reqid,
time_ns=time.time_ns(),
account=order.account,
)
)
else: # DARK trigger CANCEL
# this might be a cancel for an order that hasn't been
# acked yet by a brokerd, so register a cancel for when
# the order ack does show up later such that the brokerd
# order request can be cancelled at that time.
# special case for now..
status.req = to_brokerd_msg
# dark trigger cancel
case { case {
'action': 'cancel', 'action': 'cancel',
'oid': oid, 'oid': oid,
@ -1103,9 +1132,9 @@ async def process_client_order_cmds(
# TODO: eventually we should be receiving # TODO: eventually we should be receiving
# this struct on the wire unpacked in a scoped protocol # this struct on the wire unpacked in a scoped protocol
# setup with ``tractor``. # setup with ``tractor`` using ``msgspec``.
# live order submission # LIVE order REQUEST
case { case {
'oid': oid, 'oid': oid,
'symbol': fqsn, 'symbol': fqsn,
@ -1175,7 +1204,7 @@ async def process_client_order_cmds(
# that live order asap. # that live order asap.
# dark_book._msgflows[oid].maps.insert(0, msg.to_dict()) # dark_book._msgflows[oid].maps.insert(0, msg.to_dict())
# dark-order / alert submission # DARK-order / alert REQUEST
case { case {
'oid': oid, 'oid': oid,
'symbol': fqsn, 'symbol': fqsn,
@ -1265,6 +1294,9 @@ async def process_client_order_cmds(
status, status,
) )
case _:
log.warning(f'Rx UNHANDLED order request {cmd}')
@acm @acm
async def maybe_open_trade_relays( async def maybe_open_trade_relays(

View File

@ -143,7 +143,7 @@ class Status(Struct):
# (eg. the Order/Cancel which causes this msg) and # (eg. the Order/Cancel which causes this msg) and
# acts as a back-reference to the corresponding # acts as a back-reference to the corresponding
# request message which was the source of this msg. # request message which was the source of this msg.
req: Optional[Order | Cancel] = None req: Order | None = None
# XXX: better design/name here? # XXX: better design/name here?
# flag that can be set to indicate a message for an order # flag that can be set to indicate a message for an order
@ -152,6 +152,10 @@ class Status(Struct):
# might want to "track" using piker UIs/systems). # might want to "track" using piker UIs/systems).
src: Optional[str] = None src: Optional[str] = None
# set when a cancel request msg was set for this order flow dialog
# but the brokerd dialog isn't yet in a cancelled state.
cancel_called: bool = False
# for relaying a boxed brokerd-dialog-side msg data "through" the # for relaying a boxed brokerd-dialog-side msg data "through" the
# ems layer to clients. # ems layer to clients.
brokerd_msg: dict = {} brokerd_msg: dict = {}

View File

@ -293,7 +293,7 @@ class SettingsPane:
# don't log account "change" case since it'll be submitted # don't log account "change" case since it'll be submitted
# on every mouse interaction. # on every mouse interaction.
log.info(f'settings change: {key}: {value}') log.runtime(f'settings change: {key}: {value}')
# TODO: maybe return a diff of settings so if we can an error we # TODO: maybe return a diff of settings so if we can an error we
# can have general input handling code to report it through the # can have general input handling code to report it through the

View File

@ -56,6 +56,7 @@ from ._position import (
from ._forms import FieldsForm from ._forms import FieldsForm
from ._window import MultiStatus from ._window import MultiStatus
from ..clearing._messages import ( from ..clearing._messages import (
# Cancel,
Order, Order,
Status, Status,
# BrokerdOrder, # BrokerdOrder,
@ -961,15 +962,20 @@ async def process_trade_msg(
dialog: Dialog = mode.dialogs.get(oid) dialog: Dialog = mode.dialogs.get(oid)
match msg: match msg:
case Status(resp='dark_open' | 'open'): case Status(
resp='dark_open' | 'open',
) if msg.req['action'] != 'cancel':
order = Order(**msg.req) order = Order(**msg.req)
if dialog is not None: if (
dialog is not None
and order.action != 'cancel'
):
# show line label once order is live # show line label once order is live
mode.on_submit(oid, order=order) mode.on_submit(oid, order=order)
else: elif order.action != 'cancel':
log.warning( log.warning(
f'received msg for untracked dialog:\n{fmsg}' f'received msg for untracked dialog:\n{fmsg}'
) )
@ -1005,12 +1011,12 @@ async def process_trade_msg(
case Status(resp='canceled'): case Status(resp='canceled'):
# delete level line from view # delete level line from view
mode.on_cancel(oid) mode.on_cancel(oid)
req = Order(**msg.req) action = msg.req["action"]
log.cancel(f'Canceled {req.action}:{oid}') log.cancel(f'Canceled {action}:{oid}')
case Status( case Status(
resp='triggered', resp='triggered',
# req=Order(exec_mode='dark') # TODO: # req=Order(exec_mode='dark') # TODO: msgspec
req={'exec_mode': 'dark'}, req={'exec_mode': 'dark'},
): ):
# TODO: UX for a "pending" clear/live order # TODO: UX for a "pending" clear/live order