Relay brokerd errors to client side, correctly..

Turns out we were expecting/processing `Status(resp='error')` msgs, not
`BrokerdError` msgs (I guess because the latter was only really being
used in initial `brokerd` msg responses and not for relaying actual
provider clearing engine failures?), and the case block match / logic
wasn't really correct. So this changes a few things:

- always do a reverse `oid` lookup from the `reqid`, when possible, in
  the error msg handling case (i.e. when the msg carries no `oid`).
- add a new `Error` client-dialog msg (derived from `Status`) which we
  now relay when `brokerd` sends a `BrokerdError` and no prior `Status`
  can be found (when one is found we still fill in the appropriate
  fields from the backend error and just send back the last status msg,
  like before).
- try hard to look up the original `Order.symbol: str` for client
  broadcasting: first try any `Status.req`, then fall back to lookups
  on the embedded `.brokerd_msg` fields.
- drop the `'error'` entry from the `BrokerdStatus` status literal def
  (backend errors are now expected as `BrokerdError` msgs instead).
account_tests
Tyler Goodlet 2023-08-09 21:43:38 -04:00
parent ff2bbd5aca
commit 562d027ee6
2 changed files with 88 additions and 83 deletions

View File

@ -27,7 +27,7 @@ from contextlib import asynccontextmanager as acm
from decimal import Decimal from decimal import Decimal
from math import isnan from math import isnan
from pprint import pformat from pprint import pformat
import time from time import time_ns
from types import ModuleType from types import ModuleType
from typing import ( from typing import (
AsyncIterator, AsyncIterator,
@ -57,6 +57,7 @@ from ..data import iterticks
from ._messages import ( from ._messages import (
Order, Order,
Status, Status,
Error,
BrokerdCancel, BrokerdCancel,
BrokerdOrder, BrokerdOrder,
# BrokerdOrderAck, # BrokerdOrderAck,
@ -255,7 +256,7 @@ async def clear_dark_triggers(
action=action, action=action,
oid=oid, oid=oid,
account=account, account=account,
time_ns=time.time_ns(), time_ns=time_ns(),
symbol=bfqme, symbol=bfqme,
price=submit_price, price=submit_price,
size=size, size=size,
@ -268,7 +269,7 @@ async def clear_dark_triggers(
# fallthrough logic # fallthrough logic
status = Status( status = Status(
oid=oid, # ems dialog id oid=oid, # ems dialog id
time_ns=time.time_ns(), time_ns=time_ns(),
resp=resp, resp=resp,
req=cmd, req=cmd,
brokerd_msg=brokerd_msg, brokerd_msg=brokerd_msg,
@ -826,8 +827,8 @@ async def translate_and_relay_brokerd_events(
# keep pps per account up to date locally in ``emsd`` mem # keep pps per account up to date locally in ``emsd`` mem
# sym, broker = pos_msg.symbol, pos_msg.broker # sym, broker = pos_msg.symbol, pos_msg.broker
# NOTE: translate to a FQME!
relay.positions.setdefault( relay.positions.setdefault(
# NOTE: translate to a FQSN!
(broker, pos_msg.account), (broker, pos_msg.account),
{} {}
)[pos_msg.symbol] = pos_msg )[pos_msg.symbol] = pos_msg
@ -883,7 +884,7 @@ async def translate_and_relay_brokerd_events(
BrokerdCancel( BrokerdCancel(
oid=oid, oid=oid,
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(), time_ns=time_ns(),
account=status_msg.req.account, account=status_msg.req.account,
) )
) )
@ -898,15 +899,6 @@ async def translate_and_relay_brokerd_events(
continue continue
# BrokerdError # BrokerdError
case {
'name': 'error',
'oid': oid, # ems order-dialog id
'reqid': reqid, # brokerd generated order-request id
}:
status_msg = book._active.get(oid)
msg = BrokerdError(**brokerd_msg)
log.error(fmsg) # XXX make one when it's blank?
# TODO: figure out how this will interact with EMS clients # TODO: figure out how this will interact with EMS clients
# for ex. on an error do we react with a dark orders # for ex. on an error do we react with a dark orders
# management response, like cancelling all dark orders? # management response, like cancelling all dark orders?
@ -914,22 +906,56 @@ async def translate_and_relay_brokerd_events(
# some unexpected failure - something we need to think more # some unexpected failure - something we need to think more
# about. In most default situations, with composed orders # about. In most default situations, with composed orders
# (ex. brackets), most brokers seem to use a oca policy. # (ex. brackets), most brokers seem to use a oca policy.
case {
'name': 'error',
'oid': oid, # ems order-dialog id
'reqid': reqid, # brokerd generated order-request id
}:
if (
not oid
):
oid: str = book._ems2brokerd_ids.inverse[reqid]
# only relay to client side if we have an active msg = BrokerdError(**brokerd_msg)
# ongoing dialog
if status_msg: # NOTE: retreive the last client-side response
# OR create an error when we have no last msg /dialog
# on record
status_msg: Status
if not (status_msg := book._active.get(oid)):
status_msg = Error(
time_ns=time_ns(),
oid=oid,
reqid=reqid,
brokerd_msg=msg,
)
else:
# only modify last status if we have an active
# ongoing dialog..
status_msg.resp = 'error' status_msg.resp = 'error'
status_msg.brokerd_msg = msg status_msg.brokerd_msg = msg
book._active[oid] = status_msg book._active[oid] = status_msg
await router.client_broadcast( log.error(
status_msg.req.symbol, 'Translating brokerd error to status:\n'
status_msg, f'{fmsg}'
f'{status_msg.to_dict()}'
)
if req := status_msg.req:
fqme: str = req.symbol
else:
bdmsg: Struct = status_msg.brokerd_msg
fqme: str = (
bdmsg.symbol # might be None
or
bdmsg.broker_details['flow']['symbol']
) )
else: await router.client_broadcast(
log.error(f'Error for unknown order flow:\n{msg}') fqme,
continue status_msg,
)
# BrokerdStatus # BrokerdStatus
case { case {
@ -1070,7 +1096,7 @@ async def translate_and_relay_brokerd_events(
status_msg.req = order status_msg.req = order
assert status_msg.src # source tag? assert status_msg.src # source tag?
oid = str(status_msg.reqid) oid: str = str(status_msg.reqid)
# attempt to avoid collisions # attempt to avoid collisions
status_msg.reqid = oid status_msg.reqid = oid
@ -1087,38 +1113,28 @@ async def translate_and_relay_brokerd_events(
status_msg, status_msg,
) )
# don't fall through
continue
# brokerd error
case {
'name': 'status',
'status': 'error',
}:
log.error(f'Broker error:\n{fmsg}')
# XXX: we presume the brokerd cancels its own order
continue
# TOO FAST ``BrokerdStatus`` that arrives # TOO FAST ``BrokerdStatus`` that arrives
# before the ``BrokerdAck``. # before the ``BrokerdAck``.
case { # NOTE XXX: sometimes there is a race with the backend (like
# XXX: sometimes there is a race with the backend (like # `ib` where the pending status will be relayed *before*
# `ib` where the pending stauts will be related before # the ack msg, in which case we just ignore the faster
# the ack, in which case we just ignore the faster
# pending msg and wait for our expected ack to arrive # pending msg and wait for our expected ack to arrive
# later (i.e. the first block below should enter). # later (i.e. the first block below should enter).
case {
'name': 'status', 'name': 'status',
'status': status, 'status': status,
'reqid': reqid, 'reqid': reqid,
}: }:
oid = book._ems2brokerd_ids.inverse.get(reqid) msg = (
msg = f'Unhandled broker status for dialog {reqid}:\n' f'Unhandled broker status for dialog {reqid}:\n'
if oid: f'{pformat(brokerd_msg)}'
status_msg = book._active.get(oid) )
# status msg may not have been set yet or popped? if (
oid := book._ems2brokerd_ids.inverse.get(reqid)
):
# NOTE: have seen a key error here on kraken # NOTE: have seen a key error here on kraken
# clearable limits.. # clearable limits..
if status_msg: if status_msg := book._active.get(oid):
msg += ( msg += (
f'last status msg: {pformat(status_msg)}\n\n' f'last status msg: {pformat(status_msg)}\n\n'
f'this msg:{fmsg}\n' f'this msg:{fmsg}\n'
@ -1214,7 +1230,7 @@ async def process_client_order_cmds(
BrokerdCancel( BrokerdCancel(
oid=oid, oid=oid,
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(), time_ns=time_ns(),
account=order.account, account=order.account,
) )
) )
@ -1289,7 +1305,7 @@ async def process_client_order_cmds(
msg = BrokerdOrder( msg = BrokerdOrder(
oid=oid, # no ib support for oids... oid=oid, # no ib support for oids...
time_ns=time.time_ns(), time_ns=time_ns(),
# if this is None, creates a new order # if this is None, creates a new order
# otherwise will modify any existing one # otherwise will modify any existing one
@ -1307,7 +1323,7 @@ async def process_client_order_cmds(
oid=oid, oid=oid,
reqid=reqid, reqid=reqid,
resp='pending', resp='pending',
time_ns=time.time_ns(), time_ns=time_ns(),
brokerd_msg=msg, brokerd_msg=msg,
req=req, req=req,
) )
@ -1424,7 +1440,7 @@ async def process_client_order_cmds(
status = Status( status = Status(
resp=resp, resp=resp,
oid=oid, oid=oid,
time_ns=time.time_ns(), time_ns=time_ns(),
req=req, req=req,
src='dark', src='dark',
) )

View File

@ -18,10 +18,7 @@
Clearing sub-system message and protocols. Clearing sub-system message and protocols.
""" """
# from collections import ( from __future__ import annotations
# ChainMap,
# deque,
# )
from typing import ( from typing import (
Literal, Literal,
) )
@ -31,28 +28,6 @@ from msgspec import field
from piker.types import Struct from piker.types import Struct
# TODO: a composite for tracking msg flow on 2-legged
# dialogs.
# class Dialog(ChainMap):
# '''
# Msg collection abstraction to easily track the state changes of
# a msg flow in one high level, query-able and immutable construct.
# The main use case is to query data from a (long-running)
# msg-transaction-sequence
# '''
# def update(
# self,
# msg,
# ) -> None:
# self.maps.insert(0, msg.to_dict())
# def flatten(self) -> dict:
# return dict(self)
# TODO: ``msgspec`` stuff worth paying attention to: # TODO: ``msgspec`` stuff worth paying attention to:
# - schema evolution: # - schema evolution:
# https://jcristharif.com/msgspec/usage.html#schema-evolution # https://jcristharif.com/msgspec/usage.html#schema-evolution
@ -163,6 +138,18 @@ class Status(Struct):
brokerd_msg: dict = {} brokerd_msg: dict = {}
class Error(Status):
resp: str = 'error'
# TODO: allow re-wrapping from existing (last) status?
@classmethod
def from_status(
cls,
msg: Status,
) -> Error:
...
# --------------- # ---------------
# emsd -> brokerd # emsd -> brokerd
# --------------- # ---------------
@ -226,6 +213,7 @@ class BrokerdOrderAck(Struct):
# emsd id originally sent in matching request msg # emsd id originally sent in matching request msg
oid: str oid: str
# TODO: do we need this?
account: str = '' account: str = ''
name: str = 'ack' name: str = 'ack'
@ -238,13 +226,14 @@ class BrokerdStatus(Struct):
'open', 'open',
'canceled', 'canceled',
'pending', 'pending',
'error', # 'error', # NOTE: use `BrokerdError`
'closed', 'closed',
] ]
name: str = 'status'
oid: str = ''
# TODO: do we need this? # TODO: do we need this?
account: str | None = None, account: str | None = None,
name: str = 'status'
filled: float = 0.0 filled: float = 0.0
reason: str = '' reason: str = ''
remaining: float = 0.0 remaining: float = 0.0
@ -287,15 +276,15 @@ class BrokerdError(Struct):
This is still a TODO thing since we're not sure how to employ it yet. This is still a TODO thing since we're not sure how to employ it yet.
''' '''
oid: str
reason: str reason: str
# TODO: drop this right? # TODO: drop this right?
symbol: str | None = None symbol: str | None = None
oid: str | None = None
# if no brokerd order request was actually submitted (eg. we errored # if no brokerd order request was actually submitted (eg. we errored
# at the ``pikerd`` layer) then there will be ``reqid`` allocated. # at the ``pikerd`` layer) then there will be ``reqid`` allocated.
reqid: int | str | None = None reqid: str | None = None
name: str = 'error' name: str = 'error'
broker_details: dict = {} broker_details: dict = {}