From c43f7eb656ed6b86db05a64ddc23f8f2584ae0bf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 27 Oct 2022 10:25:13 -0400 Subject: [PATCH 1/8] Fix missing `costmin: float` field in pair msgs --- piker/brokers/kraken/feed.py | 1 + 1 file changed, 1 insertion(+) diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index 6cc1cb7f..49961b52 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -62,6 +62,7 @@ class Pair(Struct): lot: str # volume lot size cost_decimals: int + costmin: float pair_decimals: int # scaling decimal places for pair lot_decimals: int # scaling decimal places for volume From 8a61211c8cf691748b7ae89771ba8c359a99b3d0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 27 Oct 2022 10:51:14 -0400 Subject: [PATCH 2/8] Handle brokerd errors even when no client-side-status found --- piker/clearing/_ems.py | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 3bada0c3..d2febe4f 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -581,6 +581,7 @@ class Router(Struct): notify_on_headless: bool = True, ) -> bool: + # print(f'SUBSCRIBERS: {self.subscribers}') to_remove: set[tractor.MsgStream] = set() if sub_key == 'all': @@ -611,7 +612,8 @@ class Router(Struct): and notify_on_headless ): log.info( - 'No clients attached, firing notification for {sub_key} msg:\n' + 'No clients attached, ' + f'firing notification for {sub_key} msg:\n' f'{msg}' ) await notify_from_ems_status_msg( @@ -763,8 +765,8 @@ async def translate_and_relay_brokerd_events( 'oid': oid, # ems order-dialog id 'reqid': reqid, # brokerd generated order-request id 'symbol': sym, - } if status_msg := book._active.get(oid): - + }: + status_msg = book._active.get(oid) msg = BrokerdError(**brokerd_msg) log.error(fmsg) # XXX make one when it's blank? @@ -776,11 +778,21 @@ async def translate_and_relay_brokerd_events( # about. In most default situations, with composed orders # (ex. brackets), most brokers seem to use a oca policy. - status_msg.resp = 'error' - status_msg.brokerd_msg = msg - book._active[oid] = status_msg + # only relay to client side if we have an active + # ongoing dialog + if status_msg: + status_msg.resp = 'error' + status_msg.brokerd_msg = msg + book._active[oid] = status_msg - await router.client_broadcast(sym, status_msg) + await router.client_broadcast( + status_msg.req.symbol, + status_msg, + ) + + else: + log.error(f'Error for unknown order flow:\n{msg}') + continue # BrokerdStatus case { @@ -1265,6 +1277,9 @@ async def process_client_order_cmds( status, ) + case _: + log.warning(f'Rx UNHANDLED order request {cmd}') + @acm async def maybe_open_trade_relays( From 30994dac106bb819c7e6fb20f08f408e43ce2d94 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 27 Oct 2022 14:44:59 -0400 Subject: [PATCH 3/8] Better handle order-cancelled-but-not-yet-acked races When the client is faster then a `brokerd` at submitting and cancelling an order we run into the case where we need to specify that the EMS cancels the order-flow as soon as the brokerd's ack arrives. Previously we were stashing a `BrokerdCancel` msg as the `Status.req` msg (to be both tested for as a "already cancelled" and sent immediately on ack arrival to the broker), but for such cases we can't use that msg to find the fqsn (since only the client side msgs have it defined) which is required by the new `Router.client_broadcast()`. So, Since `Status.req` is supposed to be a client-side flow msg anyway, and we need the fqsn for client broadcasting, we change this `.req` value to the client's submitted `Cancel` msg (thus rectifying the missing `Router.client_broadcast()` fqsn input issue) and build the `BrokerdCancel` request from that `Cancel` inline in the relay loop from the `.req: Cancel` status msg lookup. Further we allow `Cancel` msgs to define an `.account` and adjust the order mode loop to expect `Cancel` source requests in cancelled status updates. --- piker/clearing/_ems.py | 50 ++++++++++++++++++++++++++----------- piker/clearing/_messages.py | 1 + piker/ui/order_mode.py | 14 ++++++++--- 3 files changed, 46 insertions(+), 19 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index d2febe4f..136fbed5 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -56,7 +56,7 @@ from . import _paper_engine as paper from ._messages import ( Order, Status, - # Cancel, + Cancel, BrokerdCancel, BrokerdOrder, # BrokerdOrderAck, @@ -748,7 +748,21 @@ async def translate_and_relay_brokerd_events( # assign newly providerd broker backend request id # and tell broker to cancel immediately status_msg.reqid = reqid - await brokerd_trades_stream.send(req) + + # NOTE: as per comment in cancel-request-block + # above: This is an ack to + # a client-already-cancelled order request so we + # must immediately send a cancel to the brokerd upon + # rx of this ACK. + cancel_req = status_msg.req + await brokerd_trades_stream.send( + BrokerdCancel( + oid=oid, + reqid=reqid, + time_ns=time.time_ns(), + account=cancel_req.account, + ) + ) # 2. the order is now active and will be mirrored in # our book -> registered as live flow @@ -1051,13 +1065,8 @@ async def process_client_order_cmds( ): reqid = status.reqid order = status.req - to_brokerd_msg = BrokerdCancel( - oid=oid, - reqid=reqid, - time_ns=time.time_ns(), - # account=live_entry.account, - account=order.account, - ) + cancreq = status.req = Cancel(**cmd) + cancreq.account = order.account # NOTE: cancel response will be relayed back in messages # from corresponding broker @@ -1066,15 +1075,26 @@ async def process_client_order_cmds( log.info( f'Submitting cancel for live order {reqid}' ) - await brokerd_order_stream.send(to_brokerd_msg) + await brokerd_order_stream.send( + BrokerdCancel( + oid=oid, + reqid=reqid, + time_ns=time.time_ns(), + account=order.account, + ) + ) - else: + # else: # this might be a cancel for an order that hasn't been - # acked yet by a brokerd, so register a cancel for when - # the order ack does show up later such that the brokerd - # order request can be cancelled at that time. + # acked yet by a brokerd (so it's in the midst of + # being ACKed for submission but we don't have that + # confirmation yet). In this race case, save the + # client-side cancel request for when + # the ack does show up (later) such that the brokerd + # live-order can be cancelled immediately upon + # reception. # special case for now.. - status.req = to_brokerd_msg + # status.req = to_brokerd_msg # dark trigger cancel case { diff --git a/piker/clearing/_messages.py b/piker/clearing/_messages.py index af666f5a..fc3bd4d1 100644 --- a/piker/clearing/_messages.py +++ b/piker/clearing/_messages.py @@ -110,6 +110,7 @@ class Cancel(Struct): action: str = 'cancel' oid: str # uuid4 symbol: str + account: str = '' # -------------- diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 3cda56ff..1ea10e07 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -56,6 +56,7 @@ from ._position import ( from ._forms import FieldsForm from ._window import MultiStatus from ..clearing._messages import ( + Cancel, Order, Status, # BrokerdOrder, @@ -961,15 +962,20 @@ async def process_trade_msg( dialog: Dialog = mode.dialogs.get(oid) match msg: - case Status(resp='dark_open' | 'open'): + case Status( + resp='dark_open' | 'open', + ) if msg.req['action'] != 'cancel': order = Order(**msg.req) - if dialog is not None: + if ( + dialog is not None + and order.action != 'cancel' + ): # show line label once order is live mode.on_submit(oid, order=order) - else: + elif order.action != 'cancel': log.warning( f'received msg for untracked dialog:\n{fmsg}' ) @@ -1005,7 +1011,7 @@ async def process_trade_msg( case Status(resp='canceled'): # delete level line from view mode.on_cancel(oid) - req = Order(**msg.req) + req = Cancel(**msg.req) log.cancel(f'Canceled {req.action}:{oid}') case Status( From 9486d993cef7d1ea0e150d0dfe26954afd3c853d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 27 Oct 2022 14:59:36 -0400 Subject: [PATCH 4/8] Drop order mode settings change logmsgs to `.runtime` again --- piker/ui/_position.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/ui/_position.py b/piker/ui/_position.py index 9ed5b7ec..f2ec1466 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -293,7 +293,7 @@ class SettingsPane: # don't log account "change" case since it'll be submitted # on every mouse interaction. - log.info(f'settings change: {key}: {value}') + log.runtime(f'settings change: {key}: {value}') # TODO: maybe return a diff of settings so if we can an error we # can have general input handling code to report it through the From e6dd1458f891ddde4a4f74101c2db35b8b79534f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 27 Oct 2022 15:00:23 -0400 Subject: [PATCH 5/8] `kraken`: the apiflows chain map needs a `dict` --- piker/brokers/kraken/broker.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index c1dd568b..c77f2140 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -795,7 +795,7 @@ async def handle_order_updates( # 'vol_exec': exec_vlm} # 0.0000 match update_msg: - # EMS-unknown live order that needs to be + # EMS-unknown LIVE order that needs to be # delivered and loaded on the client-side. case { 'userref': reqid, @@ -849,7 +849,7 @@ async def handle_order_updates( ), src='kraken', ) - apiflows[reqid].maps.append(status_msg) + apiflows[reqid].maps.append(status_msg.to_dict()) await ems_stream.send(status_msg) continue @@ -1104,7 +1104,6 @@ async def handle_order_updates( 'txid': [txid], }) case _: - log.warning(f'Unhandled trades update msg: {msg}') From fa368b1263cd3cd96040ec7ea43dfb69ea66052e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 28 Oct 2022 15:10:48 -0400 Subject: [PATCH 6/8] 'Just getitem access the 'action' from req msg' --- piker/ui/order_mode.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 1ea10e07..f6fd7cb3 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -56,7 +56,7 @@ from ._position import ( from ._forms import FieldsForm from ._window import MultiStatus from ..clearing._messages import ( - Cancel, + # Cancel, Order, Status, # BrokerdOrder, @@ -1011,12 +1011,11 @@ async def process_trade_msg( case Status(resp='canceled'): # delete level line from view mode.on_cancel(oid) - req = Cancel(**msg.req) - log.cancel(f'Canceled {req.action}:{oid}') + log.cancel(f'Canceled {msg.req["action"]}:{oid}') case Status( resp='triggered', - # req=Order(exec_mode='dark') # TODO: + # req=Order(exec_mode='dark') # TODO: msgspec req={'exec_mode': 'dark'}, ): # TODO: UX for a "pending" clear/live order From 71a11a23bd17034315b8e8c59909e46e59d8abe5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 28 Oct 2022 15:11:04 -0400 Subject: [PATCH 7/8] Add `Status.cancel_called: bool` This is a simpler (and oddly more `trio`-nic and/or SC) way to handle the cancelled-before-acked race for order dialogs. Will allow keeping the `.req` field as solely an `Order` msg. --- piker/clearing/_messages.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/piker/clearing/_messages.py b/piker/clearing/_messages.py index fc3bd4d1..2c258625 100644 --- a/piker/clearing/_messages.py +++ b/piker/clearing/_messages.py @@ -110,7 +110,6 @@ class Cancel(Struct): action: str = 'cancel' oid: str # uuid4 symbol: str - account: str = '' # -------------- @@ -144,7 +143,7 @@ class Status(Struct): # (eg. the Order/Cancel which causes this msg) and # acts as a back-reference to the corresponding # request message which was the source of this msg. - req: Optional[Order | Cancel] = None + req: Order | None = None # XXX: better design/name here? # flag that can be set to indicate a message for an order @@ -153,6 +152,10 @@ class Status(Struct): # might want to "track" using piker UIs/systems). src: Optional[str] = None + # set when a cancel request msg was set for this order flow dialog + # but the brokerd dialog isn't yet in a cancelled state. + cancel_called: bool = False + # for relaying a boxed brokerd-dialog-side msg data "through" the # ems layer to clients. brokerd_msg: dict = {} From 8537a4091b3fc467c16f5416bc00cb91f13d7ded Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 28 Oct 2022 15:37:24 -0400 Subject: [PATCH 8/8] Use new `Status.cancel_called` in EMS msg loops --- piker/clearing/_ems.py | 47 ++++++++++++++++++++---------------------- piker/ui/order_mode.py | 3 ++- 2 files changed, 24 insertions(+), 26 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 136fbed5..1569429f 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -56,7 +56,6 @@ from . import _paper_engine as paper from ._messages import ( Order, Status, - Cancel, BrokerdCancel, BrokerdOrder, # BrokerdOrderAck, @@ -743,8 +742,7 @@ async def translate_and_relay_brokerd_events( log.warning(f'Rx Ack for closed/unknown order?: {oid}') continue - req = status_msg.req - if req and req.action == 'cancel': + if status_msg.cancel_called: # assign newly providerd broker backend request id # and tell broker to cancel immediately status_msg.reqid = reqid @@ -754,13 +752,12 @@ async def translate_and_relay_brokerd_events( # a client-already-cancelled order request so we # must immediately send a cancel to the brokerd upon # rx of this ACK. - cancel_req = status_msg.req await brokerd_trades_stream.send( BrokerdCancel( oid=oid, reqid=reqid, time_ns=time.time_ns(), - account=cancel_req.account, + account=status_msg.req.account, ) ) @@ -1055,18 +1052,30 @@ async def process_client_order_cmds( status = dark_book._active.get(oid) match cmd: - # existing live-broker order cancel + # existing LIVE CANCEL case { 'action': 'cancel', 'oid': oid, } if ( status - and status.resp in ('open', 'pending') + and status.resp in ( + 'open', + 'pending', + ) ): reqid = status.reqid order = status.req - cancreq = status.req = Cancel(**cmd) - cancreq.account = order.account + + # XXX: cancelled-before-ack race case. + # This might be a cancel for an order that hasn't been + # acked yet by a brokerd (so it's in the midst of being + # ``BrokerdAck``ed for submission but we don't have that + # confirmation response back yet). Set this client-side + # msg state so when the ack does show up (later) + # logic in ``translate_and_relay_brokerd_events()`` can + # forward the cancel request to the `brokerd` side of + # the order flow ASAP. + status.cancel_called = True # NOTE: cancel response will be relayed back in messages # from corresponding broker @@ -1084,19 +1093,7 @@ async def process_client_order_cmds( ) ) - # else: - # this might be a cancel for an order that hasn't been - # acked yet by a brokerd (so it's in the midst of - # being ACKed for submission but we don't have that - # confirmation yet). In this race case, save the - # client-side cancel request for when - # the ack does show up (later) such that the brokerd - # live-order can be cancelled immediately upon - # reception. - # special case for now.. - # status.req = to_brokerd_msg - - # dark trigger cancel + # DARK trigger CANCEL case { 'action': 'cancel', 'oid': oid, @@ -1135,9 +1132,9 @@ async def process_client_order_cmds( # TODO: eventually we should be receiving # this struct on the wire unpacked in a scoped protocol - # setup with ``tractor``. + # setup with ``tractor`` using ``msgspec``. - # live order submission + # LIVE order REQUEST case { 'oid': oid, 'symbol': fqsn, @@ -1207,7 +1204,7 @@ async def process_client_order_cmds( # that live order asap. # dark_book._msgflows[oid].maps.insert(0, msg.to_dict()) - # dark-order / alert submission + # DARK-order / alert REQUEST case { 'oid': oid, 'symbol': fqsn, diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index f6fd7cb3..0b9558d7 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -1011,7 +1011,8 @@ async def process_trade_msg( case Status(resp='canceled'): # delete level line from view mode.on_cancel(oid) - log.cancel(f'Canceled {msg.req["action"]}:{oid}') + action = msg.req["action"] + log.cancel(f'Canceled {action}:{oid}') case Status( resp='triggered',