diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index c1dd568b..c77f2140 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -795,7 +795,7 @@ async def handle_order_updates( # 'vol_exec': exec_vlm} # 0.0000 match update_msg: - # EMS-unknown live order that needs to be + # EMS-unknown LIVE order that needs to be # delivered and loaded on the client-side. case { 'userref': reqid, @@ -849,7 +849,7 @@ async def handle_order_updates( ), src='kraken', ) - apiflows[reqid].maps.append(status_msg) + apiflows[reqid].maps.append(status_msg.to_dict()) await ems_stream.send(status_msg) continue @@ -1104,7 +1104,6 @@ async def handle_order_updates( 'txid': [txid], }) case _: - log.warning(f'Unhandled trades update msg: {msg}') diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index 6cc1cb7f..49961b52 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -62,6 +62,7 @@ class Pair(Struct): lot: str # volume lot size cost_decimals: int + costmin: float pair_decimals: int # scaling decimal places for pair lot_decimals: int # scaling decimal places for volume diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 3bada0c3..1569429f 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -56,7 +56,6 @@ from . import _paper_engine as paper from ._messages import ( Order, Status, - # Cancel, BrokerdCancel, BrokerdOrder, # BrokerdOrderAck, @@ -581,6 +580,7 @@ class Router(Struct): notify_on_headless: bool = True, ) -> bool: + # print(f'SUBSCRIBERS: {self.subscribers}') to_remove: set[tractor.MsgStream] = set() if sub_key == 'all': @@ -611,7 +611,8 @@ class Router(Struct): and notify_on_headless ): log.info( - 'No clients attached, firing notification for {sub_key} msg:\n' + 'No clients attached, ' + f'firing notification for {sub_key} msg:\n' f'{msg}' ) await notify_from_ems_status_msg( @@ -741,12 +742,24 @@ async def translate_and_relay_brokerd_events( log.warning(f'Rx Ack for closed/unknown order?: {oid}') continue - req = status_msg.req - if req and req.action == 'cancel': + if status_msg.cancel_called: # assign newly providerd broker backend request id # and tell broker to cancel immediately status_msg.reqid = reqid - await brokerd_trades_stream.send(req) + + # NOTE: as per comment in cancel-request-block + # above: This is an ack to + # a client-already-cancelled order request so we + # must immediately send a cancel to the brokerd upon + # rx of this ACK. + await brokerd_trades_stream.send( + BrokerdCancel( + oid=oid, + reqid=reqid, + time_ns=time.time_ns(), + account=status_msg.req.account, + ) + ) # 2. the order is now active and will be mirrored in # our book -> registered as live flow @@ -763,8 +776,8 @@ async def translate_and_relay_brokerd_events( 'oid': oid, # ems order-dialog id 'reqid': reqid, # brokerd generated order-request id 'symbol': sym, - } if status_msg := book._active.get(oid): - + }: + status_msg = book._active.get(oid) msg = BrokerdError(**brokerd_msg) log.error(fmsg) # XXX make one when it's blank? @@ -776,11 +789,21 @@ async def translate_and_relay_brokerd_events( # about. In most default situations, with composed orders # (ex. brackets), most brokers seem to use a oca policy. - status_msg.resp = 'error' - status_msg.brokerd_msg = msg - book._active[oid] = status_msg + # only relay to client side if we have an active + # ongoing dialog + if status_msg: + status_msg.resp = 'error' + status_msg.brokerd_msg = msg + book._active[oid] = status_msg - await router.client_broadcast(sym, status_msg) + await router.client_broadcast( + status_msg.req.symbol, + status_msg, + ) + + else: + log.error(f'Error for unknown order flow:\n{msg}') + continue # BrokerdStatus case { @@ -1029,23 +1052,30 @@ async def process_client_order_cmds( status = dark_book._active.get(oid) match cmd: - # existing live-broker order cancel + # existing LIVE CANCEL case { 'action': 'cancel', 'oid': oid, } if ( status - and status.resp in ('open', 'pending') + and status.resp in ( + 'open', + 'pending', + ) ): reqid = status.reqid order = status.req - to_brokerd_msg = BrokerdCancel( - oid=oid, - reqid=reqid, - time_ns=time.time_ns(), - # account=live_entry.account, - account=order.account, - ) + + # XXX: cancelled-before-ack race case. + # This might be a cancel for an order that hasn't been + # acked yet by a brokerd (so it's in the midst of being + # ``BrokerdAck``ed for submission but we don't have that + # confirmation response back yet). Set this client-side + # msg state so when the ack does show up (later) + # logic in ``translate_and_relay_brokerd_events()`` can + # forward the cancel request to the `brokerd` side of + # the order flow ASAP. + status.cancel_called = True # NOTE: cancel response will be relayed back in messages # from corresponding broker @@ -1054,17 +1084,16 @@ async def process_client_order_cmds( log.info( f'Submitting cancel for live order {reqid}' ) - await brokerd_order_stream.send(to_brokerd_msg) + await brokerd_order_stream.send( + BrokerdCancel( + oid=oid, + reqid=reqid, + time_ns=time.time_ns(), + account=order.account, + ) + ) - else: - # this might be a cancel for an order that hasn't been - # acked yet by a brokerd, so register a cancel for when - # the order ack does show up later such that the brokerd - # order request can be cancelled at that time. - # special case for now.. - status.req = to_brokerd_msg - - # dark trigger cancel + # DARK trigger CANCEL case { 'action': 'cancel', 'oid': oid, @@ -1103,9 +1132,9 @@ async def process_client_order_cmds( # TODO: eventually we should be receiving # this struct on the wire unpacked in a scoped protocol - # setup with ``tractor``. + # setup with ``tractor`` using ``msgspec``. - # live order submission + # LIVE order REQUEST case { 'oid': oid, 'symbol': fqsn, @@ -1175,7 +1204,7 @@ async def process_client_order_cmds( # that live order asap. # dark_book._msgflows[oid].maps.insert(0, msg.to_dict()) - # dark-order / alert submission + # DARK-order / alert REQUEST case { 'oid': oid, 'symbol': fqsn, @@ -1265,6 +1294,9 @@ async def process_client_order_cmds( status, ) + case _: + log.warning(f'Rx UNHANDLED order request {cmd}') + @acm async def maybe_open_trade_relays( diff --git a/piker/clearing/_messages.py b/piker/clearing/_messages.py index af666f5a..2c258625 100644 --- a/piker/clearing/_messages.py +++ b/piker/clearing/_messages.py @@ -143,7 +143,7 @@ class Status(Struct): # (eg. the Order/Cancel which causes this msg) and # acts as a back-reference to the corresponding # request message which was the source of this msg. - req: Optional[Order | Cancel] = None + req: Order | None = None # XXX: better design/name here? # flag that can be set to indicate a message for an order @@ -152,6 +152,10 @@ class Status(Struct): # might want to "track" using piker UIs/systems). src: Optional[str] = None + # set when a cancel request msg was set for this order flow dialog + # but the brokerd dialog isn't yet in a cancelled state. + cancel_called: bool = False + # for relaying a boxed brokerd-dialog-side msg data "through" the # ems layer to clients. brokerd_msg: dict = {} diff --git a/piker/ui/_position.py b/piker/ui/_position.py index 9ed5b7ec..f2ec1466 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -293,7 +293,7 @@ class SettingsPane: # don't log account "change" case since it'll be submitted # on every mouse interaction. - log.info(f'settings change: {key}: {value}') + log.runtime(f'settings change: {key}: {value}') # TODO: maybe return a diff of settings so if we can an error we # can have general input handling code to report it through the diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 3cda56ff..0b9558d7 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -56,6 +56,7 @@ from ._position import ( from ._forms import FieldsForm from ._window import MultiStatus from ..clearing._messages import ( + # Cancel, Order, Status, # BrokerdOrder, @@ -961,15 +962,20 @@ async def process_trade_msg( dialog: Dialog = mode.dialogs.get(oid) match msg: - case Status(resp='dark_open' | 'open'): + case Status( + resp='dark_open' | 'open', + ) if msg.req['action'] != 'cancel': order = Order(**msg.req) - if dialog is not None: + if ( + dialog is not None + and order.action != 'cancel' + ): # show line label once order is live mode.on_submit(oid, order=order) - else: + elif order.action != 'cancel': log.warning( f'received msg for untracked dialog:\n{fmsg}' ) @@ -1005,12 +1011,12 @@ async def process_trade_msg( case Status(resp='canceled'): # delete level line from view mode.on_cancel(oid) - req = Order(**msg.req) - log.cancel(f'Canceled {req.action}:{oid}') + action = msg.req["action"] + log.cancel(f'Canceled {action}:{oid}') case Status( resp='triggered', - # req=Order(exec_mode='dark') # TODO: + # req=Order(exec_mode='dark') # TODO: msgspec req={'exec_mode': 'dark'}, ): # TODO: UX for a "pending" clear/live order