Add full EMS order-dialog (re-)load support!

This includes darks, lives and alerts with all connecting clients
being broadcast all existing order-flow dialog states. Obviously
for now darks and alerts only live as long as the `emsd` actor lifetime
(though we will store these in local state eventually) and "live" orders
have lifetimes managed by their respective backend broker.

The details of this change-set is extensive, so here we go..

Messaging schema:
- change the messaging `Status` status-key set to:
  `resp: Literal['pending', 'open', 'dark_open', 'triggered',
                'closed',  'fill', 'canceled', 'error']`

  which better reflects the semantics of order lifetimes and was
  partially inspired by the status keys `kraken` provides for their
  order-entry API. The prior key set was based on `ib`'s horrible
  semantics which sound like they're right out of the 80s..
  Also, we reflect this same set in the `BrokerdStatus` msg and likely
  we'll just get rid of the separate brokerd-dialog side type
  eventually.
- use `Literal` type annots for statuses where applicable and as they
  are supported by `msgspec`.
- add additional optional `Status` fields:
  -`req: Order` to allow each status msg to optionally ref its
    commanding order-request msg allowing at least a request-response
    style implicit tracing in all response msgs.
  -`src: str` tag string to show the source of the msg.
  -`reqid: str | int` such that the ems can relay the `brokerd`
    request id both to the client side and have one spot to look
    up prior status msgs and
- draft a (unused/commented) `Dialog` type which can be eventually used
  at all EMS endpoints to track msg-flow states

EMS engine adjustments/rework:
- use the new status key set throughout and expect `BrokerdStatus` msgs
  to use the same new schema as `Status`.
- add a `_DarkBook._active: dict[str, Status]` table which is now used for
  all per-leg-dialog associations and order flow state tracking
  allowing for the both the brokerd-relay and client-request handler loops
  to read/write the same msg-table and provides for delivering
  the overall EMS-active-orders state to newly/re-connecting clients
  with minimal processing; this table replaces what the `._ems_entries`
  table from prior.
- add `Router.client_broadcast()` to send a msg to all currently
  connected peers.
- a variety of msg handler block logic tweaks including more `case:`
  blocks to be both flatter and improve explicitness:
  - for the relay loop move all `Status` msg update and sending to
    within each block instead of a fallthrough case plus hard-to-follow
    state logic.
  - add a specific case for unhandled backend status keys and just log
    them.
  - pop alerts from `._active` immediately once triggered.
  - where possible mutate status msgs fields over instantiating new
    ones.
- insert and expect `Order` instances in the dark clearing loop and
  adjust `case:` blocks accordingly.
- tag `dark_open` and `triggered` statuses as sourced from the ems.
- drop all the `ChainMap` stuff for now; we're going to make our own
  `Dialog` type for this purpose..

Order mode rework:
- always parse the `Status` msg and use match syntax cases with object
  patterns, hackily assign the `.req` in many blocks to work around not
  yet having proper on-the-wire decoding yet.
- make `.load_unknown_dialog_from_msg()` expect a `Status` with boxed
  `.req: Order` as input.
- change `OrderDialog` -> `Dialog` in prep for a general purpose type
  of the same name.

`ib` backend order loading support:
- do "closed" status detection inside the msg-relay loop instead
  of expecting the ems to do this..
- add an attempt to cancel inactive orders by scheduling cancel
  submissions continually (no idea if this works).
- add a status map to go from the 80s keys to our new set.
- deliver `Status` msgs with an embedded `Order` for existing live order
  loading and make sure to try an get the source exchange info (instead
  of SMART).

Paper engine ported to match:
- use new status keys in `BrokerdStatus` msgs
- use `match:` syntax in request handler loop
dict_differ
Tyler Goodlet 2022-08-10 00:16:08 -04:00
parent bbbdcad33b
commit 7fe3e3f482
5 changed files with 619 additions and 556 deletions

View File

@ -61,6 +61,8 @@ from piker.pp import (
) )
from piker.log import get_console_log from piker.log import get_console_log
from piker.clearing._messages import ( from piker.clearing._messages import (
Order,
Status,
BrokerdOrder, BrokerdOrder,
BrokerdOrderAck, BrokerdOrderAck,
BrokerdStatus, BrokerdStatus,
@ -185,7 +187,7 @@ async def handle_order_requests(
) )
) )
elif action == 'cancel': if action == 'cancel':
msg = BrokerdCancel(**request_msg) msg = BrokerdCancel(**request_msg)
client.submit_cancel(reqid=int(msg.reqid)) client.submit_cancel(reqid=int(msg.reqid))
@ -478,43 +480,43 @@ async def trades_dialogue(
order = trade.order order = trade.order
quant = trade.order.totalQuantity quant = trade.order.totalQuantity
action = order.action.lower()
size = { size = {
'SELL': -1, 'sell': -1,
'BUY': 1, 'buy': 1,
}[order.action] * quant }[action] * quant
fqsn, _ = con2fqsn(trade.contract) con = trade.contract
# TODO: in the case of the SMART venue (aka ib's
# router-clearing sys) we probably should handle
# showing such orders overtop of the fqsn for the
# primary exchange, how to map this easily is going
# to be a bit tricky though?
deats = await proxy.con_deats(contracts=[con])
fqsn = list(deats)[0]
reqid = order.orderId reqid = order.orderId
# TODO: maybe embed a ``BrokerdOrder`` instead # TODO: maybe embed a ``BrokerdOrder`` instead
# since then we can directly load it on the client # since then we can directly load it on the client
# side in the order mode loop? # side in the order mode loop?
msg = BrokerdStatus( msg = Status(
time_ns=time.time_ns(),
resp='open',
oid=str(reqid),
reqid=reqid, reqid=reqid,
time_ns=(ts := time.time_ns()),
status='submitted',
account=accounts_def.inverse[order.account],
filled=0,
reason='Existing live order',
# this seems to not be necessarily up to date in # embedded order info
# the execDetails event.. so we have to send it req=Order(
# here I guess? action=action,
remaining=quant, exec_mode='live',
broker_details={ oid=str(reqid),
'name': 'ib',
'fqsn': fqsn,
# this is a embedded/boxed order
# msg that can be loaded by the ems
# and for relay to clients.
'order': BrokerdOrder(
symbol=fqsn, symbol=fqsn,
account=accounts_def.inverse[order.account], account=accounts_def.inverse[order.account],
oid=reqid,
time_ns=ts,
size=size,
price=order.lmtPrice, price=order.lmtPrice,
size=size,
), ),
}, src='ib',
) )
order_msgs.append(msg) order_msgs.append(msg)
@ -543,7 +545,7 @@ async def trades_dialogue(
or pp.size != msg.size or pp.size != msg.size
): ):
trans = norm_trade_records(ledger) trans = norm_trade_records(ledger)
updated = table.update_from_trans(trans) table.update_from_trans(trans)
# update trades ledgers for all accounts from connected # update trades ledgers for all accounts from connected
# api clients which report trades for **this session**. # api clients which report trades for **this session**.
trades = await proxy.trades() trades = await proxy.trades()
@ -569,7 +571,7 @@ async def trades_dialogue(
trans = trans_by_acct.get(acctid) trans = trans_by_acct.get(acctid)
if trans: if trans:
table.update_from_trans(trans) table.update_from_trans(trans)
updated = table.update_from_trans(trans) table.update_from_trans(trans)
# XXX: not sure exactly why it wouldn't be in # XXX: not sure exactly why it wouldn't be in
# the updated output (maybe this is a bug?) but # the updated output (maybe this is a bug?) but
@ -649,6 +651,7 @@ async def trades_dialogue(
for client, stream in clients: for client, stream in clients:
n.start_soon( n.start_soon(
deliver_trade_events, deliver_trade_events,
n,
stream, stream,
ems_stream, ems_stream,
accounts_def, accounts_def,
@ -724,8 +727,21 @@ async def emit_pp_update(
await ems_stream.send(msg) await ems_stream.send(msg)
_statuses: dict[str, str] = {
'cancelled': 'canceled',
'submitted': 'open',
'pendingsubmit': 'pending',
'filled': 'fill',
# TODO: see a current ``ib_insync`` issue around this:
# https://github.com/erdewit/ib_insync/issues/363
'inactive': 'pending',
}
async def deliver_trade_events( async def deliver_trade_events(
nurse: trio.Nursery,
trade_event_stream: trio.MemoryReceiveChannel, trade_event_stream: trio.MemoryReceiveChannel,
ems_stream: tractor.MsgStream, ems_stream: tractor.MsgStream,
accounts_def: dict[str, str], # eg. `'ib.main'` -> `'DU999999'` accounts_def: dict[str, str], # eg. `'ib.main'` -> `'DU999999'`
@ -781,14 +797,35 @@ async def deliver_trade_events(
# unwrap needed data from ib_insync internal types # unwrap needed data from ib_insync internal types
trade: Trade = item trade: Trade = item
status: OrderStatus = trade.orderStatus status: OrderStatus = trade.orderStatus
status_key = status.status.lower() ib_status_key = status.status.lower()
acctid = accounts_def.inverse[trade.order.account]
# double check there is no error when # double check there is no error when
# cancelling.. gawwwd # cancelling.. gawwwd
if status_key == 'cancelled': if ib_status_key == 'cancelled':
last_log = trade.log[-1] last_log = trade.log[-1]
if last_log.message: if last_log.message:
status_key = trade.log[-2].status ib_status_key = trade.log[-2].status
elif ib_status_key == 'inactive':
async def sched_cancel():
log.warning(
'OH GAWD an inactive order..scheduling a cancel\n'
f'{pformat(item)}'
)
proxy = proxies[acctid]
await proxy.submit_cancel(reqid=trade.order.orderId)
await trio.sleep(1)
nurse.start_soon(sched_cancel)
nurse.start_soon(sched_cancel)
status_key = _statuses.get(ib_status_key) or ib_status_key
remaining = status.remaining
if remaining == 0:
status_key = 'closed'
# skip duplicate filled updates - we get the deats # skip duplicate filled updates - we get the deats
# from the execution details event # from the execution details event
@ -806,7 +843,7 @@ async def deliver_trade_events(
# this seems to not be necessarily up to date in the # this seems to not be necessarily up to date in the
# execDetails event.. so we have to send it here I guess? # execDetails event.. so we have to send it here I guess?
remaining=status.remaining, remaining=remaining,
broker_details={'name': 'ib'}, broker_details={'name': 'ib'},
) )
@ -949,9 +986,8 @@ async def deliver_trade_events(
cid, msg = pack_position(item) cid, msg = pack_position(item)
log.info(f'New IB position msg: {msg}') log.info(f'New IB position msg: {msg}')
# acctid = msg.account = accounts_def.inverse[msg.account]
# cuck ib and it's shitty fifo sys for pps! # cuck ib and it's shitty fifo sys for pps!
# await ems_stream.send(msg) continue
case 'event': case 'event':

View File

@ -27,6 +27,7 @@ from typing import (
AsyncIterator, AsyncIterator,
Any, Any,
Callable, Callable,
Optional,
) )
from bidict import bidict from bidict import bidict
@ -43,9 +44,10 @@ from . import _paper_engine as paper
from ._messages import ( from ._messages import (
Order, Order,
Status, Status,
# Cancel,
BrokerdCancel, BrokerdCancel,
BrokerdOrder, BrokerdOrder,
BrokerdOrderAck, # BrokerdOrderAck,
BrokerdStatus, BrokerdStatus,
BrokerdFill, BrokerdFill,
BrokerdError, BrokerdError,
@ -130,6 +132,7 @@ class _DarkBook(Struct):
] = {} ] = {}
# _ems_entries: dict[str, str] = {} # _ems_entries: dict[str, str] = {}
_active: dict = {}
# mapping of ems dialog ids to msg flow history # mapping of ems dialog ids to msg flow history
_msgflows: defaultdict[ _msgflows: defaultdict[
@ -192,6 +195,7 @@ async def clear_dark_triggers(
for oid, ( for oid, (
pred, pred,
tf, tf,
# TODO: send this msg instead?
cmd, cmd,
percent_away, percent_away,
abs_diff_away abs_diff_away
@ -211,30 +215,29 @@ async def clear_dark_triggers(
# majority of iterations will be non-matches # majority of iterations will be non-matches
continue continue
brokerd_msg: Optional[BrokerdOrder] = None
match cmd: match cmd:
# alert: nothing to do but relay a status # alert: nothing to do but relay a status
# back to the requesting ems client # back to the requesting ems client
case { case Order(action='alert'):
'action': 'alert', resp = 'triggered'
}:
resp = 'alert_triggered'
# executable order submission # executable order submission
case { case Order(
'action': action, action=action,
'symbol': symbol, symbol=symbol,
'account': account, account=account,
'size': size, size=size,
}: ):
bfqsn: str = symbol.replace(f'.{broker}', '') bfqsn: str = symbol.replace(f'.{broker}', '')
submit_price = price + abs_diff_away submit_price = price + abs_diff_away
resp = 'dark_triggered' # hidden on client-side resp = 'triggered' # hidden on client-side
log.info( log.info(
f'Dark order triggered for price {price}\n' f'Dark order triggered for price {price}\n'
f'Submitting order @ price {submit_price}') f'Submitting order @ price {submit_price}')
live_req = BrokerdOrder( brokerd_msg = BrokerdOrder(
action=action, action=action,
oid=oid, oid=oid,
account=account, account=account,
@ -243,7 +246,8 @@ async def clear_dark_triggers(
price=submit_price, price=submit_price,
size=size, size=size,
) )
await brokerd_orders_stream.send(live_req)
await brokerd_orders_stream.send(brokerd_msg)
# mark this entry as having sent an order # mark this entry as having sent an order
# request. the entry will be replaced once the # request. the entry will be replaced once the
@ -252,18 +256,18 @@ async def clear_dark_triggers(
# allocated unique ``BrokerdOrderAck.reqid`` key # allocated unique ``BrokerdOrderAck.reqid`` key
# generated by the broker's own systems. # generated by the broker's own systems.
# book._ems_entries[oid] = live_req # book._ems_entries[oid] = live_req
book._msgflows[oid].append(live_req) # book._msgflows[oid].maps.insert(0, live_req)
case _: case _:
raise ValueError(f'Invalid dark book entry: {cmd}') raise ValueError(f'Invalid dark book entry: {cmd}')
# fallthrough logic # fallthrough logic
resp = Status( status = Status(
oid=oid, # ems dialog id oid=oid, # ems dialog id
time_ns=time.time_ns(), time_ns=time.time_ns(),
resp=resp, resp=resp,
trigger_price=price, req=cmd,
brokerd_msg=cmd, brokerd_msg=brokerd_msg,
) )
# remove exec-condition from set # remove exec-condition from set
@ -274,9 +278,18 @@ async def clear_dark_triggers(
f'pred for {oid} was already removed!?' f'pred for {oid} was already removed!?'
) )
# update actives
if cmd.action == 'alert':
# don't register the alert status (so it won't
# be reloaded by clients) since it's now
# complete / closed.
book._active.pop(oid)
else:
book._active[oid] = status
# send response to client-side # send response to client-side
try: try:
await ems_client_order_stream.send(resp) await ems_client_order_stream.send(status)
except ( except (
trio.ClosedResourceError, trio.ClosedResourceError,
): ):
@ -396,6 +409,22 @@ class Router(Struct):
relay.consumers -= 1 relay.consumers -= 1
async def client_broadcast(
self,
msg: dict,
) -> None:
for client_stream in self.clients.copy():
try:
await client_stream.send(msg)
except(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
self.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
_router: Router = None _router: Router = None
@ -570,8 +599,7 @@ async def translate_and_relay_brokerd_events(
broker ems broker ems
'error' -> log it locally (for now) 'error' -> log it locally (for now)
'status' -> relabel as 'broker_<status>', if complete send 'executed' ('status' | 'fill'} -> relayed through see ``Status`` msg type.
'fill' -> 'broker_filled'
Currently handled status values from IB: Currently handled status values from IB:
{'presubmitted', 'submitted', 'cancelled', 'inactive'} {'presubmitted', 'submitted', 'cancelled', 'inactive'}
@ -610,31 +638,16 @@ async def translate_and_relay_brokerd_events(
# fan-out-relay position msgs immediately by # fan-out-relay position msgs immediately by
# broadcasting updates on all client streams # broadcasting updates on all client streams
for client_stream in router.clients.copy(): await router.client_broadcast(pos_msg)
try:
await client_stream.send(pos_msg)
except(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
router.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
continue continue
# BrokerdOrderAck # BrokerdOrderAck
# initial response to brokerd order request
case { case {
'name': 'ack', 'name': 'ack',
'reqid': reqid, # brokerd generated order-request id 'reqid': reqid, # brokerd generated order-request id
'oid': oid, # ems order-dialog id 'oid': oid, # ems order-dialog id
} if ( }:
# entry := book._ems_entries.get(oid)
flow := book._msgflows.get(oid)
):
# initial response to brokerd order request
# if name == 'ack':
# register the brokerd request id (that was generated # register the brokerd request id (that was generated
# / created internally by the broker backend) with our # / created internally by the broker backend) with our
# local ems order id for reverse lookup later. # local ems order id for reverse lookup later.
@ -649,31 +662,24 @@ async def translate_and_relay_brokerd_events(
# new order which has not yet be registered into the # new order which has not yet be registered into the
# local ems book, insert it now and handle 2 cases: # local ems book, insert it now and handle 2 cases:
# - the order has previously been requested to be # 1. the order has previously been requested to be
# cancelled by the ems controlling client before we # cancelled by the ems controlling client before we
# received this ack, in which case we relay that cancel # received this ack, in which case we relay that cancel
# signal **asap** to the backend broker # signal **asap** to the backend broker
action = flow.get('action') # status = book._active.get(oid)
# action = getattr(entry, 'action', None) status = book._active[oid]
if action and action == 'cancel': req = status.req
if req and req.action == 'cancel':
# assign newly providerd broker backend request id # assign newly providerd broker backend request id
flow['reqid'] = reqid # and tell broker to cancel immediately
# entry.reqid = reqid status.reqid = reqid
await brokerd_trades_stream.send(req)
entry = flow.maps[0] # 2. the order is now active and will be mirrored in
# tell broker to cancel immediately
await brokerd_trades_stream.send(entry)
# - the order is now active and will be mirrored in
# our book -> registered as live flow # our book -> registered as live flow
else: else:
# update the flow with the ack msg # TODO: should we relay this ack state?
# book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg) status.resp = 'pending'
flow.maps.insert(
0,
BrokerdOrderAck(**brokerd_msg).to_dict()
)
# no msg to client necessary # no msg to client necessary
continue continue
@ -684,13 +690,10 @@ async def translate_and_relay_brokerd_events(
'oid': oid, # ems order-dialog id 'oid': oid, # ems order-dialog id
'reqid': reqid, # brokerd generated order-request id 'reqid': reqid, # brokerd generated order-request id
'symbol': sym, 'symbol': sym,
'broker_details': details, } if status_msg := book._active.get(oid):
# 'reason': reason,
}:
msg = BrokerdError(**brokerd_msg) msg = BrokerdError(**brokerd_msg)
resp = 'broker_errored'
log.error(pformat(msg)) # XXX make one when it's blank? log.error(pformat(msg)) # XXX make one when it's blank?
book._msgflows[oid].maps.insert(0, msg.to_dict())
# TODO: figure out how this will interact with EMS clients # TODO: figure out how this will interact with EMS clients
# for ex. on an error do we react with a dark orders # for ex. on an error do we react with a dark orders
@ -699,141 +702,132 @@ async def translate_and_relay_brokerd_events(
# some unexpected failure - something we need to think more # some unexpected failure - something we need to think more
# about. In most default situations, with composed orders # about. In most default situations, with composed orders
# (ex. brackets), most brokers seem to use a oca policy. # (ex. brackets), most brokers seem to use a oca policy.
ems_client_order_stream = router.dialogues[oid]
status_msg.resp = 'error'
status_msg.brokerd_msg = msg
book._active[oid] = status_msg
await ems_client_order_stream.send(status_msg)
# BrokerdStatus # BrokerdStatus
case { case {
'name': 'status', 'name': 'status',
'status': status, 'status': status,
'reqid': reqid, # brokerd generated order-request id 'reqid': reqid, # brokerd generated order-request id
# TODO: feels like the wrong msg for this field?
'remaining': remaining,
} if ( } if (
oid := book._ems2brokerd_ids.inverse.get(reqid) (oid := book._ems2brokerd_ids.inverse.get(reqid))
and status in (
'canceled',
'open',
'closed',
)
): ):
# ack = book._ems_entries[oid]
# ack = book._msgflows[oid].maps[0]
msg = BrokerdStatus(**brokerd_msg) msg = BrokerdStatus(**brokerd_msg)
# TODO: should we flatten out these cases and/or should # TODO: maybe pack this into a composite type that
# they maybe even eventually be separate messages? # contains both the IPC stream as well the
if status == 'cancelled': # msg-chain/dialog.
ems_client_order_stream = router.dialogues[oid]
status_msg = book._active[oid]
status_msg.resp = status
# retrieve existing live flow
old_reqid = status_msg.reqid
if old_reqid and old_reqid != reqid:
log.warning(
f'Brokerd order id change for {oid}:\n'
f'{old_reqid} -> {reqid}'
)
status_msg.reqid = reqid # THIS LINE IS CRITICAL!
status_msg.brokerd_msg = msg
status_msg.src = msg.broker_details['name']
await ems_client_order_stream.send(status_msg)
if status == 'closed':
log.info(f'Execution for {oid} is complete!')
status_msg = book._active.pop(oid)
elif status == 'canceled':
log.info(f'Cancellation for {oid} is complete!') log.info(f'Cancellation for {oid} is complete!')
if status == 'filled': else: # open
# conditional execution is fully complete, no more # relayed from backend but probably not handled so
# fills for the noted order
if not remaining:
resp = 'broker_executed'
# be sure to pop this stream from our dialogue set
# since the order dialogue should be done.
log.info(f'Execution for {oid} is complete!')
# remove from active flows
book._msgflows.pop(oid)
# just log it # just log it
else: log.info(f'{broker} opened order {msg}')
log.info(f'{broker} filled {msg}')
else: # ``Status`` containing an embedded order msg which
# one of {submitted, cancelled}
resp = 'broker_' + msg.status
# book._ems_entries[oid] = msg
book._msgflows[oid].maps.insert(0, msg.to_dict())
# TODO: i wonder if we should just support receiving an
# actual ``BrokerdOrder`` msg here? Is it a bad idea to
# presume that inbound orders on the backend dialog can be
# used to drive order tracking/tracing in the EMS *over*
# a set of backends from some other non-ems owner?
# this will likely feel better once we get open_msg_scope()
# or wtv finished.
# BrokerdStatus containing an embedded order msg which
# should be loaded as a "pre-existing open order" from the # should be loaded as a "pre-existing open order" from the
# brokerd backend. # brokerd backend.
case { case {
'name': 'status', 'name': 'status',
'status': status, 'resp': status,
'reqid': reqid, # brokerd generated order-request id 'reqid': reqid, # brokerd generated order-request id
'broker_details': details,
}: }:
# TODO: we probably want some kind of "tagging" system if (
# for external order submissions like this eventually status != 'open'
# to be able to more formally handle multi-player ):
# trading... # TODO: check for an oid we might know since it was
# registered from a previous order/status load?
if status != 'submitted':
log.error( log.error(
f'Unknown status msg:\n' f'Unknown/transient status msg:\n'
f'{pformat(brokerd_msg)}\n' f'{pformat(brokerd_msg)}\n'
'Unable to relay message to client side!?' 'Unable to relay message to client side!?'
) )
# TODO: we probably want some kind of "tagging" system
# for external order submissions like this eventually
# to be able to more formally handle multi-player
# trading...
else: else:
# existing open backend order which we broadcast to # existing open backend order which we broadcast to
# all currently connected clients. # all currently connected clients.
order_dict = brokerd_msg['broker_details'].pop('order')
order = BrokerdOrder(**order_dict)
msg = BrokerdStatus(**brokerd_msg)
log.info( log.info(
f'Relaying existing open order:\n {brokerd_msg}' f'Relaying existing open order:\n {brokerd_msg}'
) )
# use backend request id as our ems id though this # use backend request id as our ems id though this
# may end up with collisions? # may end up with collisions?
broker = details['name'] status_msg = Status(**brokerd_msg)
oid = str(reqid) order = Order(**status_msg.req)
# attempt to avoid collisions assert order.price and order.size
msg.reqid = oid status_msg.req = order
# XXX: MEGA HACK ALERT FOR the dialog entries delivery assert status_msg.src # source tag?
# on client connect... oid = str(status_msg.reqid)
# TODO: fix this garbage..
msg.broker_details['resp'] = resp = 'broker_submitted' # attempt to avoid collisions
status_msg.reqid = oid
assert status_msg.resp == 'open'
# register this existing broker-side dialog # register this existing broker-side dialog
book._ems2brokerd_ids[oid] = reqid book._ems2brokerd_ids[oid] = reqid
# book._ems_entries[oid] = msg book._active[oid] = status_msg
# fill in approximate msg flow history
flow = book._msgflows[oid]
flow.maps.insert(0, order.to_dict())
flow.maps.insert(0, msg.to_dict())
flow.maps.insert(0, details)
flattened = dict(flow)
# await tractor.breakpoint()
# fan-out-relay position msgs immediately by # fan-out-relay position msgs immediately by
# broadcasting updates on all client streams # broadcasting updates on all client streams
for client_stream in router.clients.copy(): await router.client_broadcast(status_msg)
try:
await client_stream.send(flattened)
# Status(
# oid=oid,
# resp=resp,
# time_ns=time.time_ns(),
# broker_reqid=reqid,
# brokerd_msg=flattened,
# )
# )
except(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
router.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
# don't fall through # don't fall through
continue continue
# TOO FAST ``BrokerdStatus`` that arrives
# before the ``BrokerdAck``.
case {
# XXX: sometimes there is a race with the backend (like
# `ib` where the pending stauts will be related before
# the ack, in which case we just ignore the faster
# pending msg and wait for our expected ack to arrive
# later (i.e. the first block below should enter).
'name': 'status',
'status': status,
'reqid': reqid,
}:
log.warning(
'Unhandled broker status:\n'
f'{pformat(brokerd_msg)}\n'
)
# BrokerdFill # BrokerdFill
case { case {
'name': 'fill', 'name': 'fill',
@ -843,40 +837,18 @@ async def translate_and_relay_brokerd_events(
oid := book._ems2brokerd_ids.inverse.get(reqid) oid := book._ems2brokerd_ids.inverse.get(reqid)
): ):
# proxy through the "fill" result(s) # proxy through the "fill" result(s)
log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}')
msg = BrokerdFill(**brokerd_msg) msg = BrokerdFill(**brokerd_msg)
resp = 'broker_filled' ems_client_order_stream = router.dialogues[oid]
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') status_msg = book._active[oid]
status_msg.resp = 'fill'
status_msg.reqid = reqid
status_msg.brokerd_msg = msg
await ems_client_order_stream.send(status_msg)
case _: case _:
raise ValueError(f'Brokerd message {brokerd_msg} is invalid') raise ValueError(f'Brokerd message {brokerd_msg} is invalid')
# retrieve existing live flow
# entry = book._ems_entries[oid]
# assert entry.oid == oid # from when we only stored the first ack
# old_reqid = entry.reqid
# if old_reqid and old_reqid != reqid:
# log.warning(
# f'Brokerd order id change for {oid}:\n'
# f'{old_reqid} -> {reqid}'
# )
# Create and relay response status message
# to requesting EMS client
try:
ems_client_order_stream = router.dialogues[oid]
await ems_client_order_stream.send(
Status(
oid=oid,
resp=resp,
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=msg,
)
)
except KeyError:
log.error(
f'Received `brokerd` msg for unknown client oid: {oid}')
# TODO: do we want this to keep things cleaned up? # TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the # it might require a special status from brokerd to affirm the
# flow is complete? # flow is complete?
@ -910,23 +882,27 @@ async def process_client_order_cmds(
# others who are registered for such order affiliated msgs). # others who are registered for such order affiliated msgs).
client_dialogues[oid] = client_order_stream client_dialogues[oid] = client_order_stream
reqid = dark_book._ems2brokerd_ids.inverse.get(oid) reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
# live_entry = dark_book._ems_entries.get(oid)
live_entry = dark_book._msgflows.get(oid) # any dark/live status which is current
status = dark_book._active.get(oid)
match cmd: match cmd:
# existing live-broker order cancel # existing live-broker order cancel
case { case {
'action': 'cancel', 'action': 'cancel',
'oid': oid, 'oid': oid,
} if live_entry: } if (
# reqid = live_entry.reqid (status := dark_book._active.get(oid))
reqid = live_entry['reqid'] and status.resp in ('open', 'pending')
msg = BrokerdCancel( ):
reqid = status.reqid
order = status.req
to_brokerd_msg = BrokerdCancel(
oid=oid, oid=oid,
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
# account=live_entry.account, # account=live_entry.account,
account=live_entry['account'], account=order.account,
) )
# NOTE: cancel response will be relayed back in messages # NOTE: cancel response will be relayed back in messages
@ -936,39 +912,52 @@ async def process_client_order_cmds(
log.info( log.info(
f'Submitting cancel for live order {reqid}' f'Submitting cancel for live order {reqid}'
) )
await brokerd_order_stream.send(msg) await brokerd_order_stream.send(to_brokerd_msg)
else: else:
# this might be a cancel for an order that hasn't been # this might be a cancel for an order that hasn't been
# acked yet by a brokerd, so register a cancel for when # acked yet by a brokerd, so register a cancel for when
# the order ack does show up later such that the brokerd # the order ack does show up later such that the brokerd
# order request can be cancelled at that time. # order request can be cancelled at that time.
dark_book._ems_entries[oid] = msg # dark_book._ems_entries[oid] = msg
live_entry.maps.insert(0, msg.to_dict()) # special case for now..
status.req = to_brokerd_msg
# dark trigger cancel # dark trigger cancel
case { case {
'action': 'cancel', 'action': 'cancel',
'oid': oid, 'oid': oid,
} if not live_entry: } if (
# try: status and status.resp == 'dark_open'
# or status and status.req
):
# remove from dark book clearing # remove from dark book clearing
dark_book.orders[symbol].pop(oid, None) entry = dark_book.orders[symbol].pop(oid, None)
if entry:
(
pred,
tickfilter,
cmd,
percent_away,
abs_diff_away
) = entry
# tell client side that we've cancelled the # tell client side that we've cancelled the
# dark-trigger order # dark-trigger order
await client_order_stream.send( status.resp = 'canceled'
Status( status.req = cmd
resp='dark_cancelled',
oid=oid, await client_order_stream.send(status)
time_ns=time.time_ns(),
)
)
# de-register this client dialogue # de-register this client dialogue
router.dialogues.pop(oid) router.dialogues.pop(oid)
dark_book._active.pop(oid)
# except KeyError: else:
# log.exception(f'No dark order for {symbol}?') log.exception(f'No dark order for {symbol}?')
# TODO: eventually we should be receiving
# this struct on the wire unpacked in a scoped protocol
# setup with ``tractor``.
# live order submission # live order submission
case { case {
@ -977,11 +966,9 @@ async def process_client_order_cmds(
'price': trigger_price, 'price': trigger_price,
'size': size, 'size': size,
'action': ('buy' | 'sell') as action, 'action': ('buy' | 'sell') as action,
'exec_mode': 'live', 'exec_mode': ('live' | 'paper'),
}: }:
# TODO: eventually we should be receiving # TODO: relay this order msg directly?
# this struct on the wire unpacked in a scoped protocol
# setup with ``tractor``.
req = Order(**cmd) req = Order(**cmd)
broker = req.brokers[0] broker = req.brokers[0]
@ -990,17 +977,13 @@ async def process_client_order_cmds(
# aren't expectig their own name, but should they? # aren't expectig their own name, but should they?
sym = fqsn.replace(f'.{broker}', '') sym = fqsn.replace(f'.{broker}', '')
if live_entry is not None: if status is not None:
# sanity check on emsd id, but it won't work
# for pre-existing orders that we load since
# the only msg will be a ``BrokerdStatus``
# assert live_entry.oid == oid
# reqid = live_entry.reqid
reqid = live_entry['reqid']
# if we already had a broker order id then # if we already had a broker order id then
# this is likely an order update commmand. # this is likely an order update commmand.
log.info(f"Modifying live {broker} order: {reqid}") log.info(f"Modifying live {broker} order: {reqid}")
reqid = status.reqid
status.req = req
status.resp = 'pending'
msg = BrokerdOrder( msg = BrokerdOrder(
oid=oid, # no ib support for oids... oid=oid, # no ib support for oids...
@ -1017,6 +1000,18 @@ async def process_client_order_cmds(
account=req.account, account=req.account,
) )
if status is None:
status = Status(
oid=oid,
reqid=reqid,
resp='pending',
time_ns=time.time_ns(),
brokerd_msg=msg,
req=req,
)
dark_book._active[oid] = status
# send request to backend # send request to backend
# XXX: the trades data broker response loop # XXX: the trades data broker response loop
# (``translate_and_relay_brokerd_events()`` above) will # (``translate_and_relay_brokerd_events()`` above) will
@ -1032,8 +1027,7 @@ async def process_client_order_cmds(
# client, before that ack, when the ack does arrive we # client, before that ack, when the ack does arrive we
# immediately take the reqid from the broker and cancel # immediately take the reqid from the broker and cancel
# that live order asap. # that live order asap.
# dark_book._ems_entries[oid] = msg # dark_book._msgflows[oid].maps.insert(0, msg.to_dict())
dark_book._msgflows[oid].maps.insert(0, msg.to_dict())
# dark-order / alert submission # dark-order / alert submission
case { case {
@ -1049,9 +1043,11 @@ async def process_client_order_cmds(
# submit order to local EMS book and scan loop, # submit order to local EMS book and scan loop,
# effectively a local clearing engine, which # effectively a local clearing engine, which
# scans for conditions and triggers matching executions # scans for conditions and triggers matching executions
exec_mode in ('dark', 'paper') exec_mode in ('dark',)
or action == 'alert' or action == 'alert'
): ):
req = Order(**cmd)
# Auto-gen scanner predicate: # Auto-gen scanner predicate:
# we automatically figure out what the alert check # we automatically figure out what the alert check
# condition should be based on the current first # condition should be based on the current first
@ -1098,23 +1094,25 @@ async def process_client_order_cmds(
)[oid] = ( )[oid] = (
pred, pred,
tickfilter, tickfilter,
cmd, req,
percent_away, percent_away,
abs_diff_away abs_diff_away
) )
resp = 'dark_submitted' resp = 'dark_open'
# alerts have special msgs to distinguish # alerts have special msgs to distinguish
if action == 'alert': # if action == 'alert':
resp = 'alert_submitted' # resp = 'open'
await client_order_stream.send( status = Status(
Status(
resp=resp, resp=resp,
oid=oid, oid=oid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
req=req,
src='dark',
) )
) dark_book._active[oid] = status
await client_order_stream.send(status)
@tractor.context @tractor.context
@ -1206,35 +1204,12 @@ async def _emsd_main(
brokerd_stream = relay.brokerd_dialogue # .clone() brokerd_stream = relay.brokerd_dialogue # .clone()
# convert dialogs to status msgs for client delivery
statuses = {}
# for oid, msg in book._ems_entries.items():
for oid, msgflow in book._msgflows.items():
# we relay to the client side a msg that contains
# all data flattened from the message history.
# status = msgflow['status']
flattened = dict(msgflow)
# status = flattened['status']
flattened.pop('brokerd_msg', None)
statuses[oid] = flattened
# Status(
# oid=oid,
# time_ns=flattened['time_ns'],
# # time_ns=msg.time_ns,
# # resp=f'broker_{msg.status}',
# resp=f'broker_{status}',
# # trigger_price=msg.order.price,
# trigger_price=flattened['price'],
# brokerd_msg=flattened,
# )
# await tractor.breakpoint()
# signal to client that we're started and deliver # signal to client that we're started and deliver
# all known pps and accounts for this ``brokerd``. # all known pps and accounts for this ``brokerd``.
await ems_ctx.started(( await ems_ctx.started((
relay.positions, relay.positions,
list(relay.accounts), list(relay.accounts),
statuses, book._active,
)) ))
# establish 2-way stream with requesting order-client and # establish 2-way stream with requesting order-client and

View File

@ -18,56 +18,99 @@
Clearing sub-system message and protocols. Clearing sub-system message and protocols.
""" """
from typing import Optional, Union from collections import (
ChainMap,
deque,
)
from typing import (
Optional,
Literal,
Union,
)
from ..data._source import Symbol from ..data._source import Symbol
from ..data.types import Struct from ..data.types import Struct
# TODO: a composite for tracking msg flow on 2-legged
# dialogs.
# class Dialog(ChainMap):
# '''
# Msg collection abstraction to easily track the state changes of
# a msg flow in one high level, query-able and immutable construct.
# The main use case is to query data from a (long-running)
# msg-transaction-sequence
# '''
# def update(
# self,
# msg,
# ) -> None:
# self.maps.insert(0, msg.to_dict())
# def flatten(self) -> dict:
# return dict(self)
# TODO: ``msgspec`` stuff worth paying attention to: # TODO: ``msgspec`` stuff worth paying attention to:
# - schema evolution: https://jcristharif.com/msgspec/usage.html#schema-evolution # - schema evolution: https://jcristharif.com/msgspec/usage.html#schema-evolution
# - for eg. ``BrokerdStatus``, instead just have separate messages?
# - use literals for a common msg determined by diff keys? # - use literals for a common msg determined by diff keys?
# - https://jcristharif.com/msgspec/usage.html#literal # - https://jcristharif.com/msgspec/usage.html#literal
# - for eg. ``BrokerdStatus``, instead just have separate messages?
# -------------- # --------------
# Client -> emsd # Client -> emsd
# -------------- # --------------
class Cancel(Struct):
'''Cancel msg for removing a dark (ems triggered) or
broker-submitted (live) trigger/order.
'''
action: str = 'cancel'
oid: str # uuid4
symbol: str
class Order(Struct): class Order(Struct):
# TODO: use ``msgspec.Literal`` # TODO: ideally we can combine these 2 fields into
# 1 and just use the size polarity to determine a buy/sell.
# i would like to see this become more like
# https://jcristharif.com/msgspec/usage.html#literal # https://jcristharif.com/msgspec/usage.html#literal
action: str # {'buy', 'sell', 'alert'} # action: Literal[
# 'live',
# 'dark',
# 'alert',
# ]
action: Literal[
'buy',
'sell',
'alert',
]
# determines whether the create execution
# will be submitted to the ems or directly to
# the backend broker
exec_mode: Literal[
'dark',
'live',
# 'paper', no right?
]
# internal ``emdsd`` unique "order id" # internal ``emdsd`` unique "order id"
oid: str # uuid4 oid: str # uuid4
symbol: Union[str, Symbol] symbol: Union[str, Symbol]
account: str # should we set a default as '' ? account: str # should we set a default as '' ?
price: float price: float
# TODO: could we drop the ``.action`` field above and instead just size: float # -ve is "sell", +ve is "buy"
# use +/- values here? Would make the msg smaller at the sake of a
# teensie fp precision?
size: float
brokers: list[str]
# Assigned once initial ack is received brokers: Optional[list[str]] = []
# ack_time_ns: Optional[int] = None
# determines whether the create execution
# will be submitted to the ems or directly to class Cancel(Struct):
# the backend broker '''
exec_mode: str # {'dark', 'live'} Cancel msg for removing a dark (ems triggered) or
broker-submitted (live) trigger/order.
'''
action: str = 'cancel'
oid: str # uuid4
symbol: str
req: Optional[Order] = None
# -------------- # --------------
@ -79,35 +122,30 @@ class Order(Struct):
class Status(Struct): class Status(Struct):
name: str = 'status' name: str = 'status'
oid: str # uuid4
time_ns: int time_ns: int
# { resp: Literal[
# 'dark_submitted', 'pending', # acked but not yet open
# 'dark_cancelled', 'open',
# 'dark_triggered', 'dark_open', # live in dark loop
'triggered', # dark-submitted to brokerd-backend
'closed', # fully cleared all size/units
'fill', # partial execution
'canceled',
'error',
]
# 'broker_submitted', oid: str # uuid4
# 'broker_cancelled',
# 'broker_executed',
# 'broker_filled',
# 'broker_errored',
# 'alert_submitted',
# 'alert_triggered',
# }
resp: str # "response", see above
# trigger info
trigger_price: Optional[float] = None
# price: float
# broker: Optional[str] = None
# this maps normally to the ``BrokerdOrder.reqid`` below, an id # this maps normally to the ``BrokerdOrder.reqid`` below, an id
# normally allocated internally by the backend broker routing system # normally allocated internally by the backend broker routing system
broker_reqid: Optional[Union[int, str]] = None reqid: Optional[Union[int, str]] = None
# the (last) source order/request msg if provided
# (eg. the Order/Cancel which causes this msg)
req: Optional[Union[Order, Cancel]] = None
src: Optional[str] = None
# for relaying backend msg data "through" the ems layer # for relaying backend msg data "through" the ems layer
brokerd_msg: dict = {} brokerd_msg: dict = {}
@ -185,20 +223,19 @@ class BrokerdStatus(Struct):
name: str = 'status' name: str = 'status'
reqid: Union[int, str] reqid: Union[int, str]
time_ns: int time_ns: int
status: Literal[
'open',
'canceled',
'fill',
'pending',
]
# TODO: instead (ack, pending, open, fill, clos(ed), cancelled)
# {
# 'submitted', # open
# 'cancelled', # canceled
# 'filled', # closed
# }
status: str
account: str account: str
filled: float = 0.0 filled: float = 0.0
reason: str = '' reason: str = ''
remaining: float = 0.0 remaining: float = 0.0
external: bool = False # external: bool = False
# order: Optional[BrokerdOrder] = None # order: Optional[BrokerdOrder] = None
# XXX: better design/name here? # XXX: better design/name here?
@ -206,7 +243,7 @@ class BrokerdStatus(Struct):
# event that wasn't originated by piker's emsd (eg. some external # event that wasn't originated by piker's emsd (eg. some external
# trading system which does it's own order control but that you # trading system which does it's own order control but that you
# might want to "track" using piker UIs/systems). # might want to "track" using piker UIs/systems).
external: bool = False # external: bool = False
# XXX: not required schema as of yet # XXX: not required schema as of yet
broker_details: dict = { broker_details: dict = {

View File

@ -45,8 +45,13 @@ from ..data._normalize import iterticks
from ..data._source import unpack_fqsn from ..data._source import unpack_fqsn
from ..log import get_logger from ..log import get_logger
from ._messages import ( from ._messages import (
BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, BrokerdCancel,
BrokerdFill, BrokerdPosition, BrokerdError BrokerdOrder,
BrokerdOrderAck,
BrokerdStatus,
BrokerdFill,
BrokerdPosition,
BrokerdError,
) )
@ -94,6 +99,10 @@ class PaperBoi:
''' '''
is_modify: bool = False is_modify: bool = False
if action == 'alert':
# bypass all fill simulation
return reqid
entry = self._reqids.get(reqid) entry = self._reqids.get(reqid)
if entry: if entry:
# order is already existing, this is a modify # order is already existing, this is a modify
@ -104,10 +113,6 @@ class PaperBoi:
# register order internally # register order internally
self._reqids[reqid] = (oid, symbol, action, price) self._reqids[reqid] = (oid, symbol, action, price)
if action == 'alert':
# bypass all fill simulation
return reqid
# TODO: net latency model # TODO: net latency model
# we checkpoint here quickly particulalry # we checkpoint here quickly particulalry
# for dark orders since we want the dark_executed # for dark orders since we want the dark_executed
@ -119,7 +124,9 @@ class PaperBoi:
size = -size size = -size
msg = BrokerdStatus( msg = BrokerdStatus(
status='submitted', status='open',
# account=f'paper_{self.broker}',
account='paper',
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
filled=0.0, filled=0.0,
@ -136,7 +143,14 @@ class PaperBoi:
) or ( ) or (
action == 'sell' and (clear_price := self.last_bid[0]) >= price action == 'sell' and (clear_price := self.last_bid[0]) >= price
): ):
await self.fake_fill(symbol, clear_price, size, action, reqid, oid) await self.fake_fill(
symbol,
clear_price,
size,
action,
reqid,
oid,
)
else: else:
# register this submissions as a paper live order # register this submissions as a paper live order
@ -178,7 +192,9 @@ class PaperBoi:
await trio.sleep(0.05) await trio.sleep(0.05)
msg = BrokerdStatus( msg = BrokerdStatus(
status='cancelled', status='canceled',
# account=f'paper_{self.broker}',
account='paper',
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
broker_details={'name': 'paperboi'}, broker_details={'name': 'paperboi'},
@ -230,25 +246,23 @@ class PaperBoi:
self._trade_ledger.update(fill_msg.to_dict()) self._trade_ledger.update(fill_msg.to_dict())
if order_complete: if order_complete:
msg = BrokerdStatus( msg = BrokerdStatus(
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
# account=f'paper_{self.broker}',
status='filled', account='paper',
status='closed',
filled=size, filled=size,
remaining=0 if order_complete else remaining, remaining=0 if order_complete else remaining,
# broker_details={
broker_details={ # 'paper_info': {
'paper_info': { # 'oid': oid,
'oid': oid, # },
}, # 'action': action,
'action': action, # 'size': size,
'size': size, # 'price': price,
'price': price, # 'name': self.broker,
'name': self.broker, # },
},
) )
await self.ems_trades_stream.send(msg) await self.ems_trades_stream.send(msg)
@ -393,30 +407,33 @@ async def handle_order_requests(
# order_request: dict # order_request: dict
async for request_msg in ems_order_stream: async for request_msg in ems_order_stream:
action = request_msg['action'] # action = request_msg['action']
match request_msg:
if action in {'buy', 'sell'}: # if action in {'buy', 'sell'}:
case {'action': ('buy' | 'sell')}:
account = request_msg['account'] order = BrokerdOrder(**request_msg)
account = order.account
if account != 'paper': if account != 'paper':
log.error( log.error(
'This is a paper account,' 'This is a paper account,'
' only a `paper` selection is valid' ' only a `paper` selection is valid'
) )
await ems_order_stream.send(BrokerdError( await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'], # oid=request_msg['oid'],
symbol=request_msg['symbol'], oid=order.oid,
# symbol=request_msg['symbol'],
symbol=order.symbol,
reason=f'Paper only. No account found: `{account}` ?', reason=f'Paper only. No account found: `{account}` ?',
)) ))
continue continue
# validate # validate
order = BrokerdOrder(**request_msg) # order = BrokerdOrder(**request_msg)
if order.reqid is None: # if order.reqid is None:
reqid = str(uuid.uuid4()) # reqid =
else: # else:
reqid = order.reqid reqid = order.reqid or str(uuid.uuid4())
# deliver ack that order has been submitted to broker routing # deliver ack that order has been submitted to broker routing
await ems_order_stream.send( await ems_order_stream.send(
@ -447,14 +464,14 @@ async def handle_order_requests(
reqid=reqid, reqid=reqid,
) )
elif action == 'cancel': # elif action == 'cancel':
case {'action': 'cancel'}:
msg = BrokerdCancel(**request_msg) msg = BrokerdCancel(**request_msg)
await client.submit_cancel( await client.submit_cancel(
reqid=msg.reqid reqid=msg.reqid
) )
else: case _:
log.error(f'Unknown order command: {request_msg}') log.error(f'Unknown order command: {request_msg}')

View File

@ -63,7 +63,7 @@ from ._forms import open_form_input_handling
log = get_logger(__name__) log = get_logger(__name__)
class OrderDialog(Struct): class Dialog(Struct):
''' '''
Trade dialogue meta-data describing the lifetime Trade dialogue meta-data describing the lifetime
of an order submission to ``emsd`` from a chart. of an order submission to ``emsd`` from a chart.
@ -146,7 +146,7 @@ class OrderMode:
current_pp: Optional[PositionTracker] = None current_pp: Optional[PositionTracker] = None
active: bool = False active: bool = False
name: str = 'order' name: str = 'order'
dialogs: dict[str, OrderDialog] = field(default_factory=dict) dialogs: dict[str, Dialog] = field(default_factory=dict)
_colors = { _colors = {
'alert': 'alert_yellow', 'alert': 'alert_yellow',
@ -163,6 +163,7 @@ class OrderMode:
) -> LevelLine: ) -> LevelLine:
level = order.price level = order.price
print(f'SIZE: {order.size}')
line = order_line( line = order_line(
self.chart, self.chart,
@ -175,7 +176,8 @@ class OrderMode:
color=self._colors[order.action], color=self._colors[order.action],
dotted=True if ( dotted=True if (
order.exec_mode == 'dark' and order.action != 'alert' order.exec_mode == 'dark'
and order.action != 'alert'
) else False, ) else False,
**line_kwargs, **line_kwargs,
@ -265,7 +267,7 @@ class OrderMode:
send_msg: bool = True, send_msg: bool = True,
order: Optional[Order] = None, order: Optional[Order] = None,
) -> OrderDialog: ) -> Dialog:
''' '''
Send execution order to EMS return a level line to Send execution order to EMS return a level line to
represent the order on a chart. represent the order on a chart.
@ -304,7 +306,7 @@ class OrderMode:
uuid=order.oid, uuid=order.oid,
) )
dialog = OrderDialog( dialog = Dialog(
uuid=order.oid, uuid=order.oid,
order=order, order=order,
symbol=order.symbol, symbol=order.symbol,
@ -373,7 +375,7 @@ class OrderMode:
self, self,
uuid: str uuid: str
) -> OrderDialog: ) -> Dialog:
''' '''
Order submitted status event handler. Order submitted status event handler.
@ -428,7 +430,7 @@ class OrderMode:
self, self,
uuid: str, uuid: str,
msg: Dict[str, Any], msg: Status,
) -> None: ) -> None:
@ -452,7 +454,7 @@ class OrderMode:
# TODO: add in standard fill/exec info that maybe we # TODO: add in standard fill/exec info that maybe we
# pack in a broker independent way? # pack in a broker independent way?
f'{msg["resp"]}: {msg["trigger_price"]}', f'{msg.resp}: {msg.req.price}',
], ],
) )
log.runtime(result) log.runtime(result)
@ -524,53 +526,36 @@ class OrderMode:
def load_unknown_dialog_from_msg( def load_unknown_dialog_from_msg(
self, self,
# status: Status, msg: Status,
msg: dict,
) -> OrderDialog: ) -> Dialog:
oid = str(msg['oid'])
# oid = str(status.oid)
# bstatus = BrokerdStatus(**msg.brokerd_msg)
# NOTE: the `.order` attr **must** be set with the # NOTE: the `.order` attr **must** be set with the
# equivalent order msg in order to be loaded. # equivalent order msg in order to be loaded.
# border = BrokerdOrder(**bstatus.broker_details['order']) order = Order(**msg.req)
# msg = msg['brokerd_msg'] oid = str(msg.oid)
symbol = order.symbol
# size = border.size # TODO: MEGA UGGG ZONEEEE!
size = msg['size'] src = msg.src
if size >= 0: if (
action = 'buy' src
and src != 'dark'
and src not in symbol
):
fqsn = symbol + '.' + src
brokername = src
else: else:
action = 'sell' fqsn = symbol
*head, brokername = fqsn.rsplit('.')
# acct = border.account # fill out complex fields
# price = border.price order.oid = str(order.oid)
# price = msg['brokerd_msg']['price'] order.brokers = [brokername]
symbol = msg['symbol'] order.symbol = Symbol.from_fqsn(
deats = msg['broker_details']
brokername = deats['name']
fqsn = (
# deats['fqsn'] + '.' + deats['name']
symbol + '.' + brokername
)
symbol = Symbol.from_fqsn(
fqsn=fqsn, fqsn=fqsn,
info={}, info={},
) )
# map to order composite-type
order = Order(
action=action,
price=msg['price'],
account=msg['account'],
size=size,
symbol=symbol,
brokers=[brokername],
oid=oid,
exec_mode='live', # dark or live
)
dialog = self.submit_order( dialog = self.submit_order(
send_msg=False, send_msg=False,
order=order, order=order,
@ -770,7 +755,7 @@ async def open_order_mode(
order_pane.order_mode = mode order_pane.order_mode = mode
# select a pp to track # select a pp to track
tracker = trackers[pp_account] tracker: PositionTracker = trackers[pp_account]
mode.current_pp = tracker mode.current_pp = tracker
tracker.show() tracker.show()
tracker.hide_info() tracker.hide_info()
@ -870,12 +855,13 @@ async def process_trade_msg(
book: OrderBook, book: OrderBook,
msg: dict, msg: dict,
) -> None: ) -> tuple[Dialog, Status]:
get_index = mode.chart.get_index get_index = mode.chart.get_index
fmsg = pformat(msg) fmsg = pformat(msg)
log.info(f'Received order msg:\n{fmsg}') log.info(f'Received order msg:\n{fmsg}')
name = msg['name'] name = msg['name']
if name in ( if name in (
'position', 'position',
): ):
@ -901,86 +887,92 @@ async def process_trade_msg(
# short circuit to next msg to avoid # short circuit to next msg to avoid
# unnecessary msg content lookups # unnecessary msg content lookups
return return
# continue
resp = msg['resp'] msg = Status(**msg)
oid = str(msg['oid']) resp = msg.resp
dialog = mode.dialogs.get(oid) oid = msg.oid
dialog: Dialog = mode.dialogs.get(oid)
if dialog is None: match msg:
log.warning( case Status(resp='dark_open' | 'open'):
f'received msg for untracked dialog:\n{fmsg}'
)
# dialog = mode.load_unknown_dialog_from_msg(Status(**msg))
dialog = mode.load_unknown_dialog_from_msg(msg)
# record message to dialog tracking if dialog is not None:
dialog.msgs[oid] = msg
# response to 'action' request (buy/sell)
if resp in (
'dark_submitted',
'broker_submitted'
):
# show line label once order is live # show line label once order is live
mode.on_submit(oid) mode.on_submit(oid)
# resp to 'cancel' request or error condition else:
# for action request log.warning(
elif resp in ( f'received msg for untracked dialog:\n{fmsg}'
'broker_inactive', )
'broker_errored', assert msg.resp in ('open', 'dark_open'), f'Unknown msg: {msg}'
sym = mode.chart.linked.symbol
fqsn = sym.front_fqsn()
order = Order(**msg.req)
if (
((order.symbol + f'.{msg.src}') == fqsn)
# a existing dark order for the same symbol
or (
order.symbol == fqsn
and (msg.src == 'dark') or (msg.src in fqsn)
)
): ):
dialog = mode.load_unknown_dialog_from_msg(msg)
mode.on_submit(oid)
case Status(resp='error'):
# delete level line from view # delete level line from view
mode.on_cancel(oid) mode.on_cancel(oid)
broker_msg = msg['brokerd_msg'] broker_msg = msg.brokerd_msg
log.error( log.error(
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}' f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
) )
elif resp in ( case Status(resp='canceled'):
'broker_cancelled',
'dark_cancelled'
):
# delete level line from view # delete level line from view
mode.on_cancel(oid) mode.on_cancel(oid)
broker_msg = msg['brokerd_msg'] req = msg.req
log.cancel( log.cancel(
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}' f'Canceled order {oid}:\n{pformat(req)}'
) )
elif resp in ( case Status(
'dark_triggered' resp='triggered',
# req=Order(exec_mode='dark') # TODO:
req={'exec_mode': 'dark'},
): ):
log.info(f'Dark order triggered for {fmsg}') log.info(f'Dark order triggered for {fmsg}')
elif resp in ( case Status(
'alert_triggered' resp='triggered',
# req=Order(exec_mode='live', action='alert') as req, # TODO
req={'exec_mode': 'live', 'action': 'alert'} as req,
): ):
# should only be one "fill" for an alert # should only be one "fill" for an alert
# add a triangle and remove the level line # add a triangle and remove the level line
mode.on_fill( mode.on_fill(
oid, oid,
price=msg['trigger_price'], price=req.price,
arrow_index=get_index(time.time()), arrow_index=get_index(time.time()),
) )
mode.lines.remove_line(uuid=oid) mode.lines.remove_line(uuid=oid)
msg.req = Order(**req)
await mode.on_exec(oid, msg) await mode.on_exec(oid, msg)
# response to completed 'action' request for buy/sell # response to completed 'dialog' for order request
elif resp in ( case Status(
'broker_executed', resp='closed',
# req=Order() as req, # TODO
req=req,
): ):
# right now this is just triggering a system alert # right now this is just triggering a system alert
msg.req = Order(**req)
await mode.on_exec(oid, msg) await mode.on_exec(oid, msg)
if msg['brokerd_msg']['remaining'] == 0:
mode.lines.remove_line(uuid=oid) mode.lines.remove_line(uuid=oid)
# each clearing tick is responded individually # each clearing tick is responded individually
elif resp in ( case Status(resp='fill'):
'broker_filled',
):
known_order = book._sent_orders.get(oid) known_order = book._sent_orders.get(oid)
if not known_order: if not known_order:
log.warning(f'order {oid} is unknown') log.warning(f'order {oid} is unknown')
@ -988,7 +980,7 @@ async def process_trade_msg(
# continue # continue
action = known_order.action action = known_order.action
details = msg['brokerd_msg'] details = msg.brokerd_msg
# TODO: some kinda progress system # TODO: some kinda progress system
mode.on_fill( mode.on_fill(
@ -1003,3 +995,9 @@ async def process_trade_msg(
# TODO: how should we look this up? # TODO: how should we look this up?
# tracker = mode.trackers[msg['account']] # tracker = mode.trackers[msg['account']]
# tracker.live_pp.fills.append(msg) # tracker.live_pp.fills.append(msg)
# record message to dialog tracking
if dialog:
dialog.msgs[oid] = msg
return dialog, msg