From 562d027ee6adc1d8f2b09ca6a2d8d23076eee142 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 9 Aug 2023 21:43:38 -0400 Subject: [PATCH] Relay brokerd errors to client side, correctly.. Turns out we were expecting/processing `Status(resp='error')` msgs not `BrokerdError` (i guess bc latter was only really being used in initial `brokerd` msg responses and not for relay of actual provider clearing engine failures?) and the case block match / logic wasn't really correct. So this changes a few things: - always do reverse `oid` lookups from `reqid`s if possible in error msg handling case. - add a new `Error` client-dialog msg (derived from `Status`) which we now relay when `brokerd` sends a `BrokerdError` and no prior `Status` can be found (when it is we still fill in appropriate fields from the backend-error and just send back the last status msg like before). - try hard to look up the original `Order.symbol: str` for client broadcasting trying first using any `Status.req` and failing over to embedded `.brokerd_msg` field lookups. - drop the `Status.name = 'error'` from literal def. --- piker/clearing/_ems.py | 122 ++++++++++++++++++++---------------- piker/clearing/_messages.py | 49 ++++++--------- 2 files changed, 88 insertions(+), 83 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 9e30351e..2cb3f9d3 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -27,7 +27,7 @@ from contextlib import asynccontextmanager as acm from decimal import Decimal from math import isnan from pprint import pformat -import time +from time import time_ns from types import ModuleType from typing import ( AsyncIterator, @@ -57,6 +57,7 @@ from ..data import iterticks from ._messages import ( Order, Status, + Error, BrokerdCancel, BrokerdOrder, # BrokerdOrderAck, @@ -255,7 +256,7 @@ async def clear_dark_triggers( action=action, oid=oid, account=account, - time_ns=time.time_ns(), + time_ns=time_ns(), symbol=bfqme, price=submit_price, size=size, @@ -268,7 +269,7 @@ async def clear_dark_triggers( # fallthrough logic status = Status( oid=oid, # ems dialog id - time_ns=time.time_ns(), + time_ns=time_ns(), resp=resp, req=cmd, brokerd_msg=brokerd_msg, @@ -826,8 +827,8 @@ async def translate_and_relay_brokerd_events( # keep pps per account up to date locally in ``emsd`` mem # sym, broker = pos_msg.symbol, pos_msg.broker + # NOTE: translate to a FQME! relay.positions.setdefault( - # NOTE: translate to a FQSN! (broker, pos_msg.account), {} )[pos_msg.symbol] = pos_msg @@ -883,7 +884,7 @@ async def translate_and_relay_brokerd_events( BrokerdCancel( oid=oid, reqid=reqid, - time_ns=time.time_ns(), + time_ns=time_ns(), account=status_msg.req.account, ) ) @@ -898,38 +899,63 @@ async def translate_and_relay_brokerd_events( continue # BrokerdError + # TODO: figure out how this will interact with EMS clients + # for ex. on an error do we react with a dark orders + # management response, like cancelling all dark orders? + # This looks like a supervision policy for pending orders on + # some unexpected failure - something we need to think more + # about. In most default situations, with composed orders + # (ex. brackets), most brokers seem to use a oca policy. case { 'name': 'error', 'oid': oid, # ems order-dialog id 'reqid': reqid, # brokerd generated order-request id }: - status_msg = book._active.get(oid) + if ( + not oid + ): + oid: str = book._ems2brokerd_ids.inverse[reqid] + msg = BrokerdError(**brokerd_msg) - log.error(fmsg) # XXX make one when it's blank? - # TODO: figure out how this will interact with EMS clients - # for ex. on an error do we react with a dark orders - # management response, like cancelling all dark orders? - # This looks like a supervision policy for pending orders on - # some unexpected failure - something we need to think more - # about. In most default situations, with composed orders - # (ex. brackets), most brokers seem to use a oca policy. - - # only relay to client side if we have an active - # ongoing dialog - if status_msg: + # NOTE: retreive the last client-side response + # OR create an error when we have no last msg /dialog + # on record + status_msg: Status + if not (status_msg := book._active.get(oid)): + status_msg = Error( + time_ns=time_ns(), + oid=oid, + reqid=reqid, + brokerd_msg=msg, + ) + else: + # only modify last status if we have an active + # ongoing dialog.. status_msg.resp = 'error' status_msg.brokerd_msg = msg - book._active[oid] = status_msg - await router.client_broadcast( - status_msg.req.symbol, - status_msg, + book._active[oid] = status_msg + + log.error( + 'Translating brokerd error to status:\n' + f'{fmsg}' + f'{status_msg.to_dict()}' + ) + if req := status_msg.req: + fqme: str = req.symbol + else: + bdmsg: Struct = status_msg.brokerd_msg + fqme: str = ( + bdmsg.symbol # might be None + or + bdmsg.broker_details['flow']['symbol'] ) - else: - log.error(f'Error for unknown order flow:\n{msg}') - continue + await router.client_broadcast( + fqme, + status_msg, + ) # BrokerdStatus case { @@ -1070,7 +1096,7 @@ async def translate_and_relay_brokerd_events( status_msg.req = order assert status_msg.src # source tag? - oid = str(status_msg.reqid) + oid: str = str(status_msg.reqid) # attempt to avoid collisions status_msg.reqid = oid @@ -1087,38 +1113,28 @@ async def translate_and_relay_brokerd_events( status_msg, ) - # don't fall through - continue - - # brokerd error - case { - 'name': 'status', - 'status': 'error', - }: - log.error(f'Broker error:\n{fmsg}') - # XXX: we presume the brokerd cancels its own order - continue - # TOO FAST ``BrokerdStatus`` that arrives # before the ``BrokerdAck``. + # NOTE XXX: sometimes there is a race with the backend (like + # `ib` where the pending status will be relayed *before* + # the ack msg, in which case we just ignore the faster + # pending msg and wait for our expected ack to arrive + # later (i.e. the first block below should enter). case { - # XXX: sometimes there is a race with the backend (like - # `ib` where the pending stauts will be related before - # the ack, in which case we just ignore the faster - # pending msg and wait for our expected ack to arrive - # later (i.e. the first block below should enter). 'name': 'status', 'status': status, 'reqid': reqid, }: - oid = book._ems2brokerd_ids.inverse.get(reqid) - msg = f'Unhandled broker status for dialog {reqid}:\n' - if oid: - status_msg = book._active.get(oid) - # status msg may not have been set yet or popped? + msg = ( + f'Unhandled broker status for dialog {reqid}:\n' + f'{pformat(brokerd_msg)}' + ) + if ( + oid := book._ems2brokerd_ids.inverse.get(reqid) + ): # NOTE: have seen a key error here on kraken # clearable limits.. - if status_msg: + if status_msg := book._active.get(oid): msg += ( f'last status msg: {pformat(status_msg)}\n\n' f'this msg:{fmsg}\n' @@ -1214,7 +1230,7 @@ async def process_client_order_cmds( BrokerdCancel( oid=oid, reqid=reqid, - time_ns=time.time_ns(), + time_ns=time_ns(), account=order.account, ) ) @@ -1289,7 +1305,7 @@ async def process_client_order_cmds( msg = BrokerdOrder( oid=oid, # no ib support for oids... - time_ns=time.time_ns(), + time_ns=time_ns(), # if this is None, creates a new order # otherwise will modify any existing one @@ -1307,7 +1323,7 @@ async def process_client_order_cmds( oid=oid, reqid=reqid, resp='pending', - time_ns=time.time_ns(), + time_ns=time_ns(), brokerd_msg=msg, req=req, ) @@ -1424,7 +1440,7 @@ async def process_client_order_cmds( status = Status( resp=resp, oid=oid, - time_ns=time.time_ns(), + time_ns=time_ns(), req=req, src='dark', ) diff --git a/piker/clearing/_messages.py b/piker/clearing/_messages.py index 6e44969e..51a3860c 100644 --- a/piker/clearing/_messages.py +++ b/piker/clearing/_messages.py @@ -18,10 +18,7 @@ Clearing sub-system message and protocols. """ -# from collections import ( -# ChainMap, -# deque, -# ) +from __future__ import annotations from typing import ( Literal, ) @@ -31,28 +28,6 @@ from msgspec import field from piker.types import Struct -# TODO: a composite for tracking msg flow on 2-legged -# dialogs. -# class Dialog(ChainMap): -# ''' -# Msg collection abstraction to easily track the state changes of -# a msg flow in one high level, query-able and immutable construct. - -# The main use case is to query data from a (long-running) -# msg-transaction-sequence - - -# ''' -# def update( -# self, -# msg, -# ) -> None: -# self.maps.insert(0, msg.to_dict()) - -# def flatten(self) -> dict: -# return dict(self) - - # TODO: ``msgspec`` stuff worth paying attention to: # - schema evolution: # https://jcristharif.com/msgspec/usage.html#schema-evolution @@ -163,6 +138,18 @@ class Status(Struct): brokerd_msg: dict = {} +class Error(Status): + resp: str = 'error' + + # TODO: allow re-wrapping from existing (last) status? + @classmethod + def from_status( + cls, + msg: Status, + ) -> Error: + ... + + # --------------- # emsd -> brokerd # --------------- @@ -226,6 +213,7 @@ class BrokerdOrderAck(Struct): # emsd id originally sent in matching request msg oid: str + # TODO: do we need this? account: str = '' name: str = 'ack' @@ -238,13 +226,14 @@ class BrokerdStatus(Struct): 'open', 'canceled', 'pending', - 'error', + # 'error', # NOTE: use `BrokerdError` 'closed', ] + name: str = 'status' + oid: str = '' # TODO: do we need this? account: str | None = None, - name: str = 'status' filled: float = 0.0 reason: str = '' remaining: float = 0.0 @@ -287,15 +276,15 @@ class BrokerdError(Struct): This is still a TODO thing since we're not sure how to employ it yet. ''' - oid: str reason: str # TODO: drop this right? symbol: str | None = None + oid: str | None = None # if no brokerd order request was actually submitted (eg. we errored # at the ``pikerd`` layer) then there will be ``reqid`` allocated. - reqid: int | str | None = None + reqid: str | None = None name: str = 'error' broker_details: dict = {}