Add full EMS order-dialog (re-)load support!

This includes darks, lives and alerts with all connecting clients
being broadcast all existing order-flow dialog states. Obviously
for now darks and alerts only live as long as the `emsd` actor lifetime
(though we will store these in local state eventually) and "live" orders
have lifetimes managed by their respective backend broker.

The details of this change-set is extensive, so here we go..

Messaging schema:
- change the messaging `Status` status-key set to:
  `resp: Literal['pending', 'open', 'dark_open', 'triggered',
                'closed',  'fill', 'canceled', 'error']`

  which better reflects the semantics of order lifetimes and was
  partially inspired by the status keys `kraken` provides for their
  order-entry API. The prior key set was based on `ib`'s horrible
  semantics which sound like they're right out of the 80s..
  Also, we reflect this same set in the `BrokerdStatus` msg and likely
  we'll just get rid of the separate brokerd-dialog side type
  eventually.
- use `Literal` type annots for statuses where applicable and as they
  are supported by `msgspec`.
- add additional optional `Status` fields:
  -`req: Order` to allow each status msg to optionally ref its
    commanding order-request msg allowing at least a request-response
    style implicit tracing in all response msgs.
  -`src: str` tag string to show the source of the msg.
  -`reqid: str | int` such that the ems can relay the `brokerd`
    request id both to the client side and have one spot to look
    up prior status msgs and
- draft a (unused/commented) `Dialog` type which can be eventually used
  at all EMS endpoints to track msg-flow states

EMS engine adjustments/rework:
- use the new status key set throughout and expect `BrokerdStatus` msgs
  to use the same new schema as `Status`.
- add a `_DarkBook._active: dict[str, Status]` table which is now used for
  all per-leg-dialog associations and order flow state tracking
  allowing for the both the brokerd-relay and client-request handler loops
  to read/write the same msg-table and provides for delivering
  the overall EMS-active-orders state to newly/re-connecting clients
  with minimal processing; this table replaces what the `._ems_entries`
  table from prior.
- add `Router.client_broadcast()` to send a msg to all currently
  connected peers.
- a variety of msg handler block logic tweaks including more `case:`
  blocks to be both flatter and improve explicitness:
  - for the relay loop move all `Status` msg update and sending to
    within each block instead of a fallthrough case plus hard-to-follow
    state logic.
  - add a specific case for unhandled backend status keys and just log
    them.
  - pop alerts from `._active` immediately once triggered.
  - where possible mutate status msgs fields over instantiating new
    ones.
- insert and expect `Order` instances in the dark clearing loop and
  adjust `case:` blocks accordingly.
- tag `dark_open` and `triggered` statuses as sourced from the ems.
- drop all the `ChainMap` stuff for now; we're going to make our own
  `Dialog` type for this purpose..

Order mode rework:
- always parse the `Status` msg and use match syntax cases with object
  patterns, hackily assign the `.req` in many blocks to work around not
  yet having proper on-the-wire decoding yet.
- make `.load_unknown_dialog_from_msg()` expect a `Status` with boxed
  `.req: Order` as input.
- change `OrderDialog` -> `Dialog` in prep for a general purpose type
  of the same name.

`ib` backend order loading support:
- do "closed" status detection inside the msg-relay loop instead
  of expecting the ems to do this..
- add an attempt to cancel inactive orders by scheduling cancel
  submissions continually (no idea if this works).
- add a status map to go from the 80s keys to our new set.
- deliver `Status` msgs with an embedded `Order` for existing live order
  loading and make sure to try an get the source exchange info (instead
  of SMART).

Paper engine ported to match:
- use new status keys in `BrokerdStatus` msgs
- use `match:` syntax in request handler loop
open_order_loading
Tyler Goodlet 2022-08-10 00:16:08 -04:00
parent 87ed9abefa
commit 7fa9dbf869
5 changed files with 605 additions and 554 deletions

View File

@ -60,6 +60,8 @@ from piker.pp import (
)
from piker.log import get_console_log
from piker.clearing._messages import (
Order,
Status,
BrokerdOrder,
BrokerdOrderAck,
BrokerdStatus,
@ -184,7 +186,7 @@ async def handle_order_requests(
)
)
elif action == 'cancel':
if action == 'cancel':
msg = BrokerdCancel(**request_msg)
client.submit_cancel(reqid=int(msg.reqid))
@ -491,43 +493,43 @@ async def trades_dialogue(
order = trade.order
quant = trade.order.totalQuantity
action = order.action.lower()
size = {
'SELL': -1,
'BUY': 1,
}[order.action] * quant
fqsn, _ = con2fqsn(trade.contract)
'sell': -1,
'buy': 1,
}[action] * quant
con = trade.contract
# TODO: in the case of the SMART venue (aka ib's
# router-clearing sys) we probably should handle
# showing such orders overtop of the fqsn for the
# primary exchange, how to map this easily is going
# to be a bit tricky though?
deats = await proxy.con_deats(contracts=[con])
fqsn = list(deats)[0]
reqid = order.orderId
# TODO: maybe embed a ``BrokerdOrder`` instead
# since then we can directly load it on the client
# side in the order mode loop?
msg = BrokerdStatus(
msg = Status(
time_ns=time.time_ns(),
resp='open',
oid=str(reqid),
reqid=reqid,
time_ns=(ts := time.time_ns()),
status='submitted',
account=accounts_def.inverse[order.account],
filled=0,
reason='Existing live order',
# this seems to not be necessarily up to date in
# the execDetails event.. so we have to send it
# here I guess?
remaining=quant,
broker_details={
'name': 'ib',
'fqsn': fqsn,
# this is a embedded/boxed order
# msg that can be loaded by the ems
# and for relay to clients.
'order': BrokerdOrder(
# embedded order info
req=Order(
action=action,
exec_mode='live',
oid=str(reqid),
symbol=fqsn,
account=accounts_def.inverse[order.account],
oid=reqid,
time_ns=ts,
size=size,
price=order.lmtPrice,
size=size,
),
},
src='ib',
)
order_msgs.append(msg)
@ -686,6 +688,7 @@ async def trades_dialogue(
# allocate event relay tasks for each client connection
n.start_soon(
deliver_trade_events,
n,
trade_event_stream,
ems_stream,
accounts_def,
@ -779,6 +782,7 @@ _statuses: dict[str, str] = {
async def deliver_trade_events(
nurse: trio.Nursery,
trade_event_stream: trio.MemoryReceiveChannel,
ems_stream: tractor.MsgStream,
accounts_def: dict[str, str], # eg. `'ib.main'` -> `'DU999999'`
@ -834,14 +838,35 @@ async def deliver_trade_events(
# unwrap needed data from ib_insync internal types
trade: Trade = item
status: OrderStatus = trade.orderStatus
status_key = status.status.lower()
ib_status_key = status.status.lower()
acctid = accounts_def.inverse[trade.order.account]
# double check there is no error when
# cancelling.. gawwwd
if status_key == 'cancelled':
if ib_status_key == 'cancelled':
last_log = trade.log[-1]
if last_log.message:
status_key = trade.log[-2].status
ib_status_key = trade.log[-2].status
elif ib_status_key == 'inactive':
async def sched_cancel():
log.warning(
'OH GAWD an inactive order..scheduling a cancel\n'
f'{pformat(item)}'
)
proxy = proxies[acctid]
await proxy.submit_cancel(reqid=trade.order.orderId)
await trio.sleep(1)
nurse.start_soon(sched_cancel)
nurse.start_soon(sched_cancel)
status_key = _statuses.get(ib_status_key) or ib_status_key
remaining = status.remaining
if remaining == 0:
status_key = 'closed'
# skip duplicate filled updates - we get the deats
# from the execution details event
@ -859,7 +884,7 @@ async def deliver_trade_events(
# this seems to not be necessarily up to date in the
# execDetails event.. so we have to send it here I guess?
remaining=status.remaining,
remaining=remaining,
broker_details={'name': 'ib'},
)
@ -1002,9 +1027,8 @@ async def deliver_trade_events(
cid, msg = pack_position(item)
log.info(f'New IB position msg: {msg}')
# acctid = msg.account = accounts_def.inverse[msg.account]
# cuck ib and it's shitty fifo sys for pps!
# await ems_stream.send(msg)
continue
case 'event':

View File

@ -27,6 +27,7 @@ from typing import (
AsyncIterator,
Any,
Callable,
Optional,
)
from bidict import bidict
@ -43,9 +44,10 @@ from . import _paper_engine as paper
from ._messages import (
Order,
Status,
# Cancel,
BrokerdCancel,
BrokerdOrder,
BrokerdOrderAck,
# BrokerdOrderAck,
BrokerdStatus,
BrokerdFill,
BrokerdError,
@ -130,6 +132,7 @@ class _DarkBook(Struct):
] = {}
# _ems_entries: dict[str, str] = {}
_active: dict = {}
# mapping of ems dialog ids to msg flow history
_msgflows: defaultdict[
@ -192,6 +195,7 @@ async def clear_dark_triggers(
for oid, (
pred,
tf,
# TODO: send this msg instead?
cmd,
percent_away,
abs_diff_away
@ -211,30 +215,29 @@ async def clear_dark_triggers(
# majority of iterations will be non-matches
continue
brokerd_msg: Optional[BrokerdOrder] = None
match cmd:
# alert: nothing to do but relay a status
# back to the requesting ems client
case {
'action': 'alert',
}:
resp = 'alert_triggered'
case Order(action='alert'):
resp = 'triggered'
# executable order submission
case {
'action': action,
'symbol': symbol,
'account': account,
'size': size,
}:
case Order(
action=action,
symbol=symbol,
account=account,
size=size,
):
bfqsn: str = symbol.replace(f'.{broker}', '')
submit_price = price + abs_diff_away
resp = 'dark_triggered' # hidden on client-side
resp = 'triggered' # hidden on client-side
log.info(
f'Dark order triggered for price {price}\n'
f'Submitting order @ price {submit_price}')
live_req = BrokerdOrder(
brokerd_msg = BrokerdOrder(
action=action,
oid=oid,
account=account,
@ -243,7 +246,8 @@ async def clear_dark_triggers(
price=submit_price,
size=size,
)
await brokerd_orders_stream.send(live_req)
await brokerd_orders_stream.send(brokerd_msg)
# mark this entry as having sent an order
# request. the entry will be replaced once the
@ -252,18 +256,18 @@ async def clear_dark_triggers(
# allocated unique ``BrokerdOrderAck.reqid`` key
# generated by the broker's own systems.
# book._ems_entries[oid] = live_req
book._msgflows[oid].append(live_req)
# book._msgflows[oid].maps.insert(0, live_req)
case _:
raise ValueError(f'Invalid dark book entry: {cmd}')
# fallthrough logic
resp = Status(
status = Status(
oid=oid, # ems dialog id
time_ns=time.time_ns(),
resp=resp,
trigger_price=price,
brokerd_msg=cmd,
req=cmd,
brokerd_msg=brokerd_msg,
)
# remove exec-condition from set
@ -274,9 +278,18 @@ async def clear_dark_triggers(
f'pred for {oid} was already removed!?'
)
# update actives
if cmd.action == 'alert':
# don't register the alert status (so it won't
# be reloaded by clients) since it's now
# complete / closed.
book._active.pop(oid)
else:
book._active[oid] = status
# send response to client-side
try:
await ems_client_order_stream.send(resp)
await ems_client_order_stream.send(status)
except (
trio.ClosedResourceError,
):
@ -396,6 +409,22 @@ class Router(Struct):
relay.consumers -= 1
async def client_broadcast(
self,
msg: dict,
) -> None:
for client_stream in self.clients.copy():
try:
await client_stream.send(msg)
except(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
self.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
_router: Router = None
@ -570,8 +599,7 @@ async def translate_and_relay_brokerd_events(
broker ems
'error' -> log it locally (for now)
'status' -> relabel as 'broker_<status>', if complete send 'executed'
'fill' -> 'broker_filled'
('status' | 'fill'} -> relayed through see ``Status`` msg type.
Currently handled status values from IB:
{'presubmitted', 'submitted', 'cancelled', 'inactive'}
@ -610,31 +638,16 @@ async def translate_and_relay_brokerd_events(
# fan-out-relay position msgs immediately by
# broadcasting updates on all client streams
for client_stream in router.clients.copy():
try:
await client_stream.send(pos_msg)
except(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
router.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
await router.client_broadcast(pos_msg)
continue
# BrokerdOrderAck
# initial response to brokerd order request
case {
'name': 'ack',
'reqid': reqid, # brokerd generated order-request id
'oid': oid, # ems order-dialog id
} if (
# entry := book._ems_entries.get(oid)
flow := book._msgflows.get(oid)
):
# initial response to brokerd order request
# if name == 'ack':
}:
# register the brokerd request id (that was generated
# / created internally by the broker backend) with our
# local ems order id for reverse lookup later.
@ -649,31 +662,24 @@ async def translate_and_relay_brokerd_events(
# new order which has not yet be registered into the
# local ems book, insert it now and handle 2 cases:
# - the order has previously been requested to be
# 1. the order has previously been requested to be
# cancelled by the ems controlling client before we
# received this ack, in which case we relay that cancel
# signal **asap** to the backend broker
action = flow.get('action')
# action = getattr(entry, 'action', None)
if action and action == 'cancel':
# status = book._active.get(oid)
status = book._active[oid]
req = status.req
if req and req.action == 'cancel':
# assign newly providerd broker backend request id
flow['reqid'] = reqid
# entry.reqid = reqid
# and tell broker to cancel immediately
status.reqid = reqid
await brokerd_trades_stream.send(req)
entry = flow.maps[0]
# tell broker to cancel immediately
await brokerd_trades_stream.send(entry)
# - the order is now active and will be mirrored in
# 2. the order is now active and will be mirrored in
# our book -> registered as live flow
else:
# update the flow with the ack msg
# book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg)
flow.maps.insert(
0,
BrokerdOrderAck(**brokerd_msg).to_dict()
)
# TODO: should we relay this ack state?
status.resp = 'pending'
# no msg to client necessary
continue
@ -684,13 +690,10 @@ async def translate_and_relay_brokerd_events(
'oid': oid, # ems order-dialog id
'reqid': reqid, # brokerd generated order-request id
'symbol': sym,
'broker_details': details,
# 'reason': reason,
}:
} if status_msg := book._active.get(oid):
msg = BrokerdError(**brokerd_msg)
resp = 'broker_errored'
log.error(pformat(msg)) # XXX make one when it's blank?
book._msgflows[oid].maps.insert(0, msg.to_dict())
# TODO: figure out how this will interact with EMS clients
# for ex. on an error do we react with a dark orders
@ -699,141 +702,132 @@ async def translate_and_relay_brokerd_events(
# some unexpected failure - something we need to think more
# about. In most default situations, with composed orders
# (ex. brackets), most brokers seem to use a oca policy.
ems_client_order_stream = router.dialogues[oid]
status_msg.resp = 'error'
status_msg.brokerd_msg = msg
book._active[oid] = status_msg
await ems_client_order_stream.send(status_msg)
# BrokerdStatus
case {
'name': 'status',
'status': status,
'reqid': reqid, # brokerd generated order-request id
# TODO: feels like the wrong msg for this field?
'remaining': remaining,
} if (
oid := book._ems2brokerd_ids.inverse.get(reqid)
(oid := book._ems2brokerd_ids.inverse.get(reqid))
and status in (
'canceled',
'open',
'closed',
)
):
# ack = book._ems_entries[oid]
# ack = book._msgflows[oid].maps[0]
msg = BrokerdStatus(**brokerd_msg)
# TODO: should we flatten out these cases and/or should
# they maybe even eventually be separate messages?
if status == 'cancelled':
# TODO: maybe pack this into a composite type that
# contains both the IPC stream as well the
# msg-chain/dialog.
ems_client_order_stream = router.dialogues[oid]
status_msg = book._active[oid]
status_msg.resp = status
# retrieve existing live flow
old_reqid = status_msg.reqid
if old_reqid and old_reqid != reqid:
log.warning(
f'Brokerd order id change for {oid}:\n'
f'{old_reqid} -> {reqid}'
)
status_msg.reqid = reqid # THIS LINE IS CRITICAL!
status_msg.brokerd_msg = msg
status_msg.src = msg.broker_details['name']
await ems_client_order_stream.send(status_msg)
if status == 'closed':
log.info(f'Execution for {oid} is complete!')
status_msg = book._active.pop(oid)
elif status == 'canceled':
log.info(f'Cancellation for {oid} is complete!')
if status == 'filled':
# conditional execution is fully complete, no more
# fills for the noted order
if not remaining:
resp = 'broker_executed'
# be sure to pop this stream from our dialogue set
# since the order dialogue should be done.
log.info(f'Execution for {oid} is complete!')
# remove from active flows
book._msgflows.pop(oid)
else: # open
# relayed from backend but probably not handled so
# just log it
else:
log.info(f'{broker} filled {msg}')
log.info(f'{broker} opened order {msg}')
else:
# one of {submitted, cancelled}
resp = 'broker_' + msg.status
# book._ems_entries[oid] = msg
book._msgflows[oid].maps.insert(0, msg.to_dict())
# TODO: i wonder if we should just support receiving an
# actual ``BrokerdOrder`` msg here? Is it a bad idea to
# presume that inbound orders on the backend dialog can be
# used to drive order tracking/tracing in the EMS *over*
# a set of backends from some other non-ems owner?
# this will likely feel better once we get open_msg_scope()
# or wtv finished.
# BrokerdStatus containing an embedded order msg which
# ``Status`` containing an embedded order msg which
# should be loaded as a "pre-existing open order" from the
# brokerd backend.
case {
'name': 'status',
'status': status,
'resp': status,
'reqid': reqid, # brokerd generated order-request id
'broker_details': details,
}:
# TODO: we probably want some kind of "tagging" system
# for external order submissions like this eventually
# to be able to more formally handle multi-player
# trading...
if status != 'submitted':
if (
status != 'open'
):
# TODO: check for an oid we might know since it was
# registered from a previous order/status load?
log.error(
f'Unknown status msg:\n'
f'Unknown/transient status msg:\n'
f'{pformat(brokerd_msg)}\n'
'Unable to relay message to client side!?'
)
# TODO: we probably want some kind of "tagging" system
# for external order submissions like this eventually
# to be able to more formally handle multi-player
# trading...
else:
# existing open backend order which we broadcast to
# all currently connected clients.
order_dict = brokerd_msg['broker_details'].pop('order')
order = BrokerdOrder(**order_dict)
msg = BrokerdStatus(**brokerd_msg)
log.info(
f'Relaying existing open order:\n {brokerd_msg}'
)
# use backend request id as our ems id though this
# may end up with collisions?
broker = details['name']
oid = str(reqid)
# attempt to avoid collisions
msg.reqid = oid
status_msg = Status(**brokerd_msg)
order = Order(**status_msg.req)
assert order.price and order.size
status_msg.req = order
# XXX: MEGA HACK ALERT FOR the dialog entries delivery
# on client connect...
# TODO: fix this garbage..
msg.broker_details['resp'] = resp = 'broker_submitted'
assert status_msg.src # source tag?
oid = str(status_msg.reqid)
# attempt to avoid collisions
status_msg.reqid = oid
assert status_msg.resp == 'open'
# register this existing broker-side dialog
book._ems2brokerd_ids[oid] = reqid
# book._ems_entries[oid] = msg
# fill in approximate msg flow history
flow = book._msgflows[oid]
flow.maps.insert(0, order.to_dict())
flow.maps.insert(0, msg.to_dict())
flow.maps.insert(0, details)
flattened = dict(flow)
# await tractor.breakpoint()
book._active[oid] = status_msg
# fan-out-relay position msgs immediately by
# broadcasting updates on all client streams
for client_stream in router.clients.copy():
try:
await client_stream.send(flattened)
# Status(
# oid=oid,
# resp=resp,
# time_ns=time.time_ns(),
# broker_reqid=reqid,
# brokerd_msg=flattened,
# )
# )
except(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
router.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
await router.client_broadcast(status_msg)
# don't fall through
continue
# TOO FAST ``BrokerdStatus`` that arrives
# before the ``BrokerdAck``.
case {
# XXX: sometimes there is a race with the backend (like
# `ib` where the pending stauts will be related before
# the ack, in which case we just ignore the faster
# pending msg and wait for our expected ack to arrive
# later (i.e. the first block below should enter).
'name': 'status',
'status': status,
'reqid': reqid,
}:
log.warning(
'Unhandled broker status:\n'
f'{pformat(brokerd_msg)}\n'
)
# BrokerdFill
case {
'name': 'fill',
@ -843,40 +837,18 @@ async def translate_and_relay_brokerd_events(
oid := book._ems2brokerd_ids.inverse.get(reqid)
):
# proxy through the "fill" result(s)
log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}')
msg = BrokerdFill(**brokerd_msg)
resp = 'broker_filled'
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
ems_client_order_stream = router.dialogues[oid]
status_msg = book._active[oid]
status_msg.resp = 'fill'
status_msg.reqid = reqid
status_msg.brokerd_msg = msg
await ems_client_order_stream.send(status_msg)
case _:
raise ValueError(f'Brokerd message {brokerd_msg} is invalid')
# retrieve existing live flow
# entry = book._ems_entries[oid]
# assert entry.oid == oid # from when we only stored the first ack
# old_reqid = entry.reqid
# if old_reqid and old_reqid != reqid:
# log.warning(
# f'Brokerd order id change for {oid}:\n'
# f'{old_reqid} -> {reqid}'
# )
# Create and relay response status message
# to requesting EMS client
try:
ems_client_order_stream = router.dialogues[oid]
await ems_client_order_stream.send(
Status(
oid=oid,
resp=resp,
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=msg,
)
)
except KeyError:
log.error(
f'Received `brokerd` msg for unknown client oid: {oid}')
# TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the
# flow is complete?
@ -910,23 +882,27 @@ async def process_client_order_cmds(
# others who are registered for such order affiliated msgs).
client_dialogues[oid] = client_order_stream
reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
# live_entry = dark_book._ems_entries.get(oid)
live_entry = dark_book._msgflows.get(oid)
# any dark/live status which is current
status = dark_book._active.get(oid)
match cmd:
# existing live-broker order cancel
case {
'action': 'cancel',
'oid': oid,
} if live_entry:
# reqid = live_entry.reqid
reqid = live_entry['reqid']
msg = BrokerdCancel(
} if (
(status := dark_book._active.get(oid))
and status.resp in ('open', 'pending')
):
reqid = status.reqid
order = status.req
to_brokerd_msg = BrokerdCancel(
oid=oid,
reqid=reqid,
time_ns=time.time_ns(),
# account=live_entry.account,
account=live_entry['account'],
account=order.account,
)
# NOTE: cancel response will be relayed back in messages
@ -936,39 +912,52 @@ async def process_client_order_cmds(
log.info(
f'Submitting cancel for live order {reqid}'
)
await brokerd_order_stream.send(msg)
await brokerd_order_stream.send(to_brokerd_msg)
else:
# this might be a cancel for an order that hasn't been
# acked yet by a brokerd, so register a cancel for when
# the order ack does show up later such that the brokerd
# order request can be cancelled at that time.
dark_book._ems_entries[oid] = msg
live_entry.maps.insert(0, msg.to_dict())
# dark_book._ems_entries[oid] = msg
# special case for now..
status.req = to_brokerd_msg
# dark trigger cancel
case {
'action': 'cancel',
'oid': oid,
} if not live_entry:
# try:
} if (
status and status.resp == 'dark_open'
# or status and status.req
):
# remove from dark book clearing
dark_book.orders[symbol].pop(oid, None)
entry = dark_book.orders[symbol].pop(oid, None)
if entry:
(
pred,
tickfilter,
cmd,
percent_away,
abs_diff_away
) = entry
# tell client side that we've cancelled the
# dark-trigger order
await client_order_stream.send(
Status(
resp='dark_cancelled',
oid=oid,
time_ns=time.time_ns(),
)
)
status.resp = 'canceled'
status.req = cmd
await client_order_stream.send(status)
# de-register this client dialogue
router.dialogues.pop(oid)
dark_book._active.pop(oid)
# except KeyError:
# log.exception(f'No dark order for {symbol}?')
else:
log.exception(f'No dark order for {symbol}?')
# TODO: eventually we should be receiving
# this struct on the wire unpacked in a scoped protocol
# setup with ``tractor``.
# live order submission
case {
@ -977,11 +966,9 @@ async def process_client_order_cmds(
'price': trigger_price,
'size': size,
'action': ('buy' | 'sell') as action,
'exec_mode': 'live',
'exec_mode': ('live' | 'paper'),
}:
# TODO: eventually we should be receiving
# this struct on the wire unpacked in a scoped protocol
# setup with ``tractor``.
# TODO: relay this order msg directly?
req = Order(**cmd)
broker = req.brokers[0]
@ -990,17 +977,13 @@ async def process_client_order_cmds(
# aren't expectig their own name, but should they?
sym = fqsn.replace(f'.{broker}', '')
if live_entry is not None:
# sanity check on emsd id, but it won't work
# for pre-existing orders that we load since
# the only msg will be a ``BrokerdStatus``
# assert live_entry.oid == oid
# reqid = live_entry.reqid
reqid = live_entry['reqid']
if status is not None:
# if we already had a broker order id then
# this is likely an order update commmand.
log.info(f"Modifying live {broker} order: {reqid}")
reqid = status.reqid
status.req = req
status.resp = 'pending'
msg = BrokerdOrder(
oid=oid, # no ib support for oids...
@ -1017,6 +1000,18 @@ async def process_client_order_cmds(
account=req.account,
)
if status is None:
status = Status(
oid=oid,
reqid=reqid,
resp='pending',
time_ns=time.time_ns(),
brokerd_msg=msg,
req=req,
)
dark_book._active[oid] = status
# send request to backend
# XXX: the trades data broker response loop
# (``translate_and_relay_brokerd_events()`` above) will
@ -1032,8 +1027,7 @@ async def process_client_order_cmds(
# client, before that ack, when the ack does arrive we
# immediately take the reqid from the broker and cancel
# that live order asap.
# dark_book._ems_entries[oid] = msg
dark_book._msgflows[oid].maps.insert(0, msg.to_dict())
# dark_book._msgflows[oid].maps.insert(0, msg.to_dict())
# dark-order / alert submission
case {
@ -1049,9 +1043,11 @@ async def process_client_order_cmds(
# submit order to local EMS book and scan loop,
# effectively a local clearing engine, which
# scans for conditions and triggers matching executions
exec_mode in ('dark', 'paper')
exec_mode in ('dark',)
or action == 'alert'
):
req = Order(**cmd)
# Auto-gen scanner predicate:
# we automatically figure out what the alert check
# condition should be based on the current first
@ -1098,23 +1094,25 @@ async def process_client_order_cmds(
)[oid] = (
pred,
tickfilter,
cmd,
req,
percent_away,
abs_diff_away
)
resp = 'dark_submitted'
resp = 'dark_open'
# alerts have special msgs to distinguish
if action == 'alert':
resp = 'alert_submitted'
# if action == 'alert':
# resp = 'open'
await client_order_stream.send(
Status(
status = Status(
resp=resp,
oid=oid,
time_ns=time.time_ns(),
req=req,
src='dark',
)
)
dark_book._active[oid] = status
await client_order_stream.send(status)
@tractor.context
@ -1206,35 +1204,12 @@ async def _emsd_main(
brokerd_stream = relay.brokerd_dialogue # .clone()
# convert dialogs to status msgs for client delivery
statuses = {}
# for oid, msg in book._ems_entries.items():
for oid, msgflow in book._msgflows.items():
# we relay to the client side a msg that contains
# all data flattened from the message history.
# status = msgflow['status']
flattened = dict(msgflow)
# status = flattened['status']
flattened.pop('brokerd_msg', None)
statuses[oid] = flattened
# Status(
# oid=oid,
# time_ns=flattened['time_ns'],
# # time_ns=msg.time_ns,
# # resp=f'broker_{msg.status}',
# resp=f'broker_{status}',
# # trigger_price=msg.order.price,
# trigger_price=flattened['price'],
# brokerd_msg=flattened,
# )
# await tractor.breakpoint()
# signal to client that we're started and deliver
# all known pps and accounts for this ``brokerd``.
await ems_ctx.started((
relay.positions,
list(relay.accounts),
statuses,
book._active,
))
# establish 2-way stream with requesting order-client and

View File

@ -18,56 +18,99 @@
Clearing sub-system message and protocols.
"""
from typing import Optional, Union
from collections import (
ChainMap,
deque,
)
from typing import (
Optional,
Literal,
Union,
)
from ..data._source import Symbol
from ..data.types import Struct
# TODO: a composite for tracking msg flow on 2-legged
# dialogs.
# class Dialog(ChainMap):
# '''
# Msg collection abstraction to easily track the state changes of
# a msg flow in one high level, query-able and immutable construct.
# The main use case is to query data from a (long-running)
# msg-transaction-sequence
# '''
# def update(
# self,
# msg,
# ) -> None:
# self.maps.insert(0, msg.to_dict())
# def flatten(self) -> dict:
# return dict(self)
# TODO: ``msgspec`` stuff worth paying attention to:
# - schema evolution: https://jcristharif.com/msgspec/usage.html#schema-evolution
# - for eg. ``BrokerdStatus``, instead just have separate messages?
# - use literals for a common msg determined by diff keys?
# - https://jcristharif.com/msgspec/usage.html#literal
# - for eg. ``BrokerdStatus``, instead just have separate messages?
# --------------
# Client -> emsd
# --------------
class Cancel(Struct):
'''Cancel msg for removing a dark (ems triggered) or
broker-submitted (live) trigger/order.
'''
action: str = 'cancel'
oid: str # uuid4
symbol: str
class Order(Struct):
# TODO: use ``msgspec.Literal``
# TODO: ideally we can combine these 2 fields into
# 1 and just use the size polarity to determine a buy/sell.
# i would like to see this become more like
# https://jcristharif.com/msgspec/usage.html#literal
action: str # {'buy', 'sell', 'alert'}
# action: Literal[
# 'live',
# 'dark',
# 'alert',
# ]
action: Literal[
'buy',
'sell',
'alert',
]
# determines whether the create execution
# will be submitted to the ems or directly to
# the backend broker
exec_mode: Literal[
'dark',
'live',
# 'paper', no right?
]
# internal ``emdsd`` unique "order id"
oid: str # uuid4
symbol: Union[str, Symbol]
account: str # should we set a default as '' ?
price: float
# TODO: could we drop the ``.action`` field above and instead just
# use +/- values here? Would make the msg smaller at the sake of a
# teensie fp precision?
size: float
brokers: list[str]
size: float # -ve is "sell", +ve is "buy"
# Assigned once initial ack is received
# ack_time_ns: Optional[int] = None
brokers: Optional[list[str]] = []
# determines whether the create execution
# will be submitted to the ems or directly to
# the backend broker
exec_mode: str # {'dark', 'live'}
class Cancel(Struct):
'''
Cancel msg for removing a dark (ems triggered) or
broker-submitted (live) trigger/order.
'''
action: str = 'cancel'
oid: str # uuid4
symbol: str
req: Optional[Order] = None
# --------------
@ -79,35 +122,30 @@ class Order(Struct):
class Status(Struct):
name: str = 'status'
oid: str # uuid4
time_ns: int
# {
# 'dark_submitted',
# 'dark_cancelled',
# 'dark_triggered',
resp: Literal[
'pending', # acked but not yet open
'open',
'dark_open', # live in dark loop
'triggered', # dark-submitted to brokerd-backend
'closed', # fully cleared all size/units
'fill', # partial execution
'canceled',
'error',
]
# 'broker_submitted',
# 'broker_cancelled',
# 'broker_executed',
# 'broker_filled',
# 'broker_errored',
# 'alert_submitted',
# 'alert_triggered',
# }
resp: str # "response", see above
# trigger info
trigger_price: Optional[float] = None
# price: float
# broker: Optional[str] = None
oid: str # uuid4
# this maps normally to the ``BrokerdOrder.reqid`` below, an id
# normally allocated internally by the backend broker routing system
broker_reqid: Optional[Union[int, str]] = None
reqid: Optional[Union[int, str]] = None
# the (last) source order/request msg if provided
# (eg. the Order/Cancel which causes this msg)
req: Optional[Union[Order, Cancel]] = None
src: Optional[str] = None
# for relaying backend msg data "through" the ems layer
brokerd_msg: dict = {}
@ -185,20 +223,19 @@ class BrokerdStatus(Struct):
name: str = 'status'
reqid: Union[int, str]
time_ns: int
status: Literal[
'open',
'canceled',
'fill',
'pending',
]
# TODO: instead (ack, pending, open, fill, clos(ed), cancelled)
# {
# 'submitted', # open
# 'cancelled', # canceled
# 'filled', # closed
# }
status: str
account: str
filled: float = 0.0
reason: str = ''
remaining: float = 0.0
external: bool = False
# external: bool = False
# order: Optional[BrokerdOrder] = None
# XXX: better design/name here?
@ -206,7 +243,7 @@ class BrokerdStatus(Struct):
# event that wasn't originated by piker's emsd (eg. some external
# trading system which does it's own order control but that you
# might want to "track" using piker UIs/systems).
external: bool = False
# external: bool = False
# XXX: not required schema as of yet
broker_details: dict = {

View File

@ -45,8 +45,13 @@ from ..data._normalize import iterticks
from ..data._source import unpack_fqsn
from ..log import get_logger
from ._messages import (
BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus,
BrokerdFill, BrokerdPosition, BrokerdError
BrokerdCancel,
BrokerdOrder,
BrokerdOrderAck,
BrokerdStatus,
BrokerdFill,
BrokerdPosition,
BrokerdError,
)
@ -94,6 +99,10 @@ class PaperBoi:
'''
is_modify: bool = False
if action == 'alert':
# bypass all fill simulation
return reqid
entry = self._reqids.get(reqid)
if entry:
# order is already existing, this is a modify
@ -104,10 +113,6 @@ class PaperBoi:
# register order internally
self._reqids[reqid] = (oid, symbol, action, price)
if action == 'alert':
# bypass all fill simulation
return reqid
# TODO: net latency model
# we checkpoint here quickly particulalry
# for dark orders since we want the dark_executed
@ -119,7 +124,9 @@ class PaperBoi:
size = -size
msg = BrokerdStatus(
status='submitted',
status='open',
# account=f'paper_{self.broker}',
account='paper',
reqid=reqid,
time_ns=time.time_ns(),
filled=0.0,
@ -136,7 +143,14 @@ class PaperBoi:
) or (
action == 'sell' and (clear_price := self.last_bid[0]) >= price
):
await self.fake_fill(symbol, clear_price, size, action, reqid, oid)
await self.fake_fill(
symbol,
clear_price,
size,
action,
reqid,
oid,
)
else:
# register this submissions as a paper live order
@ -178,7 +192,9 @@ class PaperBoi:
await trio.sleep(0.05)
msg = BrokerdStatus(
status='cancelled',
status='canceled',
# account=f'paper_{self.broker}',
account='paper',
reqid=reqid,
time_ns=time.time_ns(),
broker_details={'name': 'paperboi'},
@ -230,25 +246,23 @@ class PaperBoi:
self._trade_ledger.update(fill_msg.to_dict())
if order_complete:
msg = BrokerdStatus(
reqid=reqid,
time_ns=time.time_ns(),
status='filled',
# account=f'paper_{self.broker}',
account='paper',
status='closed',
filled=size,
remaining=0 if order_complete else remaining,
broker_details={
'paper_info': {
'oid': oid,
},
'action': action,
'size': size,
'price': price,
'name': self.broker,
},
# broker_details={
# 'paper_info': {
# 'oid': oid,
# },
# 'action': action,
# 'size': size,
# 'price': price,
# 'name': self.broker,
# },
)
await self.ems_trades_stream.send(msg)
@ -393,30 +407,33 @@ async def handle_order_requests(
# order_request: dict
async for request_msg in ems_order_stream:
action = request_msg['action']
if action in {'buy', 'sell'}:
account = request_msg['account']
# action = request_msg['action']
match request_msg:
# if action in {'buy', 'sell'}:
case {'action': ('buy' | 'sell')}:
order = BrokerdOrder(**request_msg)
account = order.account
if account != 'paper':
log.error(
'This is a paper account,'
' only a `paper` selection is valid'
)
await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
# oid=request_msg['oid'],
oid=order.oid,
# symbol=request_msg['symbol'],
symbol=order.symbol,
reason=f'Paper only. No account found: `{account}` ?',
))
continue
# validate
order = BrokerdOrder(**request_msg)
# order = BrokerdOrder(**request_msg)
if order.reqid is None:
reqid = str(uuid.uuid4())
else:
reqid = order.reqid
# if order.reqid is None:
# reqid =
# else:
reqid = order.reqid or str(uuid.uuid4())
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
@ -447,14 +464,14 @@ async def handle_order_requests(
reqid=reqid,
)
elif action == 'cancel':
# elif action == 'cancel':
case {'action': 'cancel'}:
msg = BrokerdCancel(**request_msg)
await client.submit_cancel(
reqid=msg.reqid
)
else:
case _:
log.error(f'Unknown order command: {request_msg}')

View File

@ -63,7 +63,7 @@ from ._forms import open_form_input_handling
log = get_logger(__name__)
class OrderDialog(Struct):
class Dialog(Struct):
'''
Trade dialogue meta-data describing the lifetime
of an order submission to ``emsd`` from a chart.
@ -146,7 +146,7 @@ class OrderMode:
current_pp: Optional[PositionTracker] = None
active: bool = False
name: str = 'order'
dialogs: dict[str, OrderDialog] = field(default_factory=dict)
dialogs: dict[str, Dialog] = field(default_factory=dict)
_colors = {
'alert': 'alert_yellow',
@ -163,6 +163,7 @@ class OrderMode:
) -> LevelLine:
level = order.price
print(f'SIZE: {order.size}')
line = order_line(
self.chart,
@ -175,7 +176,8 @@ class OrderMode:
color=self._colors[order.action],
dotted=True if (
order.exec_mode == 'dark' and order.action != 'alert'
order.exec_mode == 'dark'
and order.action != 'alert'
) else False,
**line_kwargs,
@ -265,7 +267,7 @@ class OrderMode:
send_msg: bool = True,
order: Optional[Order] = None,
) -> OrderDialog:
) -> Dialog:
'''
Send execution order to EMS return a level line to
represent the order on a chart.
@ -304,7 +306,7 @@ class OrderMode:
uuid=order.oid,
)
dialog = OrderDialog(
dialog = Dialog(
uuid=order.oid,
order=order,
symbol=order.symbol,
@ -373,7 +375,7 @@ class OrderMode:
self,
uuid: str
) -> OrderDialog:
) -> Dialog:
'''
Order submitted status event handler.
@ -428,7 +430,7 @@ class OrderMode:
self,
uuid: str,
msg: Dict[str, Any],
msg: Status,
) -> None:
@ -452,7 +454,7 @@ class OrderMode:
# TODO: add in standard fill/exec info that maybe we
# pack in a broker independent way?
f'{msg["resp"]}: {msg["trigger_price"]}',
f'{msg.resp}: {msg.req.price}',
],
)
log.runtime(result)
@ -524,53 +526,36 @@ class OrderMode:
def load_unknown_dialog_from_msg(
self,
# status: Status,
msg: dict,
msg: Status,
) -> OrderDialog:
) -> Dialog:
oid = str(msg['oid'])
# oid = str(status.oid)
# bstatus = BrokerdStatus(**msg.brokerd_msg)
# NOTE: the `.order` attr **must** be set with the
# equivalent order msg in order to be loaded.
# border = BrokerdOrder(**bstatus.broker_details['order'])
# msg = msg['brokerd_msg']
order = Order(**msg.req)
oid = str(msg.oid)
symbol = order.symbol
# size = border.size
size = msg['size']
if size >= 0:
action = 'buy'
# TODO: MEGA UGGG ZONEEEE!
src = msg.src
if (
src
and src != 'dark'
and src not in symbol
):
fqsn = symbol + '.' + src
brokername = src
else:
action = 'sell'
fqsn = symbol
*head, brokername = fqsn.rsplit('.')
# acct = border.account
# price = border.price
# price = msg['brokerd_msg']['price']
symbol = msg['symbol']
deats = msg['broker_details']
brokername = deats['name']
fqsn = (
# deats['fqsn'] + '.' + deats['name']
symbol + '.' + brokername
)
symbol = Symbol.from_fqsn(
# fill out complex fields
order.oid = str(order.oid)
order.brokers = [brokername]
order.symbol = Symbol.from_fqsn(
fqsn=fqsn,
info={},
)
# map to order composite-type
order = Order(
action=action,
price=msg['price'],
account=msg['account'],
size=size,
symbol=symbol,
brokers=[brokername],
oid=oid,
exec_mode='live', # dark or live
)
dialog = self.submit_order(
send_msg=False,
order=order,
@ -770,7 +755,7 @@ async def open_order_mode(
order_pane.order_mode = mode
# select a pp to track
tracker = trackers[pp_account]
tracker: PositionTracker = trackers[pp_account]
mode.current_pp = tracker
tracker.show()
tracker.hide_info()
@ -870,12 +855,13 @@ async def process_trade_msg(
book: OrderBook,
msg: dict,
) -> None:
) -> tuple[Dialog, Status]:
get_index = mode.chart.get_index
fmsg = pformat(msg)
log.info(f'Received order msg:\n{fmsg}')
name = msg['name']
if name in (
'position',
):
@ -901,86 +887,92 @@ async def process_trade_msg(
# short circuit to next msg to avoid
# unnecessary msg content lookups
return
# continue
resp = msg['resp']
oid = str(msg['oid'])
dialog = mode.dialogs.get(oid)
msg = Status(**msg)
resp = msg.resp
oid = msg.oid
dialog: Dialog = mode.dialogs.get(oid)
if dialog is None:
log.warning(
f'received msg for untracked dialog:\n{fmsg}'
)
# dialog = mode.load_unknown_dialog_from_msg(Status(**msg))
dialog = mode.load_unknown_dialog_from_msg(msg)
match msg:
case Status(resp='dark_open' | 'open'):
# record message to dialog tracking
dialog.msgs[oid] = msg
# response to 'action' request (buy/sell)
if resp in (
'dark_submitted',
'broker_submitted'
):
if dialog is not None:
# show line label once order is live
mode.on_submit(oid)
# resp to 'cancel' request or error condition
# for action request
elif resp in (
'broker_inactive',
'broker_errored',
else:
log.warning(
f'received msg for untracked dialog:\n{fmsg}'
)
assert msg.resp in ('open', 'dark_open'), f'Unknown msg: {msg}'
sym = mode.chart.linked.symbol
fqsn = sym.front_fqsn()
order = Order(**msg.req)
if (
((order.symbol + f'.{msg.src}') == fqsn)
# a existing dark order for the same symbol
or (
order.symbol == fqsn
and (msg.src == 'dark') or (msg.src in fqsn)
)
):
dialog = mode.load_unknown_dialog_from_msg(msg)
mode.on_submit(oid)
case Status(resp='error'):
# delete level line from view
mode.on_cancel(oid)
broker_msg = msg['brokerd_msg']
broker_msg = msg.brokerd_msg
log.error(
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
)
elif resp in (
'broker_cancelled',
'dark_cancelled'
):
case Status(resp='canceled'):
# delete level line from view
mode.on_cancel(oid)
broker_msg = msg['brokerd_msg']
req = msg.req
log.cancel(
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
f'Canceled order {oid}:\n{pformat(req)}'
)
elif resp in (
'dark_triggered'
case Status(
resp='triggered',
# req=Order(exec_mode='dark') # TODO:
req={'exec_mode': 'dark'},
):
log.info(f'Dark order triggered for {fmsg}')
elif resp in (
'alert_triggered'
case Status(
resp='triggered',
# req=Order(exec_mode='live', action='alert') as req, # TODO
req={'exec_mode': 'live', 'action': 'alert'} as req,
):
# should only be one "fill" for an alert
# add a triangle and remove the level line
mode.on_fill(
oid,
price=msg['trigger_price'],
price=req.price,
arrow_index=get_index(time.time()),
)
mode.lines.remove_line(uuid=oid)
msg.req = Order(**req)
await mode.on_exec(oid, msg)
# response to completed 'action' request for buy/sell
elif resp in (
'broker_executed',
# response to completed 'dialog' for order request
case Status(
resp='closed',
# req=Order() as req, # TODO
req=req,
):
# right now this is just triggering a system alert
msg.req = Order(**req)
await mode.on_exec(oid, msg)
if msg['brokerd_msg']['remaining'] == 0:
mode.lines.remove_line(uuid=oid)
# each clearing tick is responded individually
elif resp in (
'broker_filled',
):
case Status(resp='fill'):
known_order = book._sent_orders.get(oid)
if not known_order:
log.warning(f'order {oid} is unknown')
@ -988,7 +980,7 @@ async def process_trade_msg(
# continue
action = known_order.action
details = msg['brokerd_msg']
details = msg.brokerd_msg
# TODO: some kinda progress system
mode.on_fill(
@ -1003,3 +995,9 @@ async def process_trade_msg(
# TODO: how should we look this up?
# tracker = mode.trackers[msg['account']]
# tracker.live_pp.fills.append(msg)
# record message to dialog tracking
if dialog:
dialog.msgs[oid] = msg
return dialog, msg