WIP playing with a `ChainMap` of messages

open_order_loading
Tyler Goodlet 2022-08-08 13:35:01 -04:00
parent 2548aae73d
commit 87ed9abefa
4 changed files with 236 additions and 121 deletions

View File

@ -127,7 +127,8 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?', reason=f'No account found: `{account}` ?',
)) )
)
continue continue
client = _accounts2clients.get(account) client = _accounts2clients.get(account)
@ -495,17 +496,16 @@ async def trades_dialogue(
'BUY': 1, 'BUY': 1,
}[order.action] * quant }[order.action] * quant
fqsn, _ = con2fqsn(trade.contract) fqsn, _ = con2fqsn(trade.contract)
reqid = order.orderId
# TODO: maybe embed a ``BrokerdOrder`` instead # TODO: maybe embed a ``BrokerdOrder`` instead
# since then we can directly load it on the client # since then we can directly load it on the client
# side in the order mode loop? # side in the order mode loop?
msg = BrokerdStatus( msg = BrokerdStatus(
reqid=order.orderId, reqid=reqid,
time_ns=time.time_ns(), time_ns=(ts := time.time_ns()),
account=accounts_def.inverse[order.account],
status='submitted', status='submitted',
size=size, account=accounts_def.inverse[order.account],
price=order.lmtPrice,
filled=0, filled=0,
reason='Existing live order', reason='Existing live order',
@ -516,6 +516,17 @@ async def trades_dialogue(
broker_details={ broker_details={
'name': 'ib', 'name': 'ib',
'fqsn': fqsn, 'fqsn': fqsn,
# this is a embedded/boxed order
# msg that can be loaded by the ems
# and for relay to clients.
'order': BrokerdOrder(
symbol=fqsn,
account=accounts_def.inverse[order.account],
oid=reqid,
time_ns=ts,
size=size,
price=order.lmtPrice,
),
}, },
) )
order_msgs.append(msg) order_msgs.append(msg)

View File

@ -18,8 +18,8 @@
In da suit parlances: "Execution management systems" In da suit parlances: "Execution management systems"
""" """
from collections import defaultdict, ChainMap
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from math import isnan from math import isnan
from pprint import pformat from pprint import pformat
import time import time
@ -41,9 +41,15 @@ from ..data.types import Struct
from .._daemon import maybe_spawn_brokerd from .._daemon import maybe_spawn_brokerd
from . import _paper_engine as paper from . import _paper_engine as paper
from ._messages import ( from ._messages import (
Status, Order, Order,
BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, Status,
BrokerdFill, BrokerdError, BrokerdPosition, BrokerdCancel,
BrokerdOrder,
BrokerdOrderAck,
BrokerdStatus,
BrokerdFill,
BrokerdError,
BrokerdPosition,
) )
@ -90,8 +96,7 @@ def mk_check(
) )
@dataclass class _DarkBook(Struct):
class _DarkBook:
''' '''
EMS-trigger execution book. EMS-trigger execution book.
@ -116,17 +121,23 @@ class _DarkBook:
dict, # cmd / msg type dict, # cmd / msg type
] ]
] ]
] = field(default_factory=dict) ] = {}
# tracks most recent values per symbol each from data feed # tracks most recent values per symbol each from data feed
lasts: dict[ lasts: dict[
str, str,
float, float,
] = field(default_factory=dict) ] = {}
# mapping of piker ems order ids to current brokerd order flow message # _ems_entries: dict[str, str] = {}
_ems_entries: dict[str, str] = field(default_factory=dict)
_ems2brokerd_ids: dict[str, str] = field(default_factory=bidict) # mapping of ems dialog ids to msg flow history
_msgflows: defaultdict[
int,
ChainMap[dict[str, dict]],
] = defaultdict(ChainMap)
_ems2brokerd_ids: dict[str, str] = bidict()
# XXX: this is in place to prevent accidental positions that are too # XXX: this is in place to prevent accidental positions that are too
@ -240,7 +251,8 @@ async def clear_dark_triggers(
# a ``BrokerdOrderAck`` msg including the # a ``BrokerdOrderAck`` msg including the
# allocated unique ``BrokerdOrderAck.reqid`` key # allocated unique ``BrokerdOrderAck.reqid`` key
# generated by the broker's own systems. # generated by the broker's own systems.
book._ems_entries[oid] = live_req # book._ems_entries[oid] = live_req
book._msgflows[oid].append(live_req)
case _: case _:
raise ValueError(f'Invalid dark book entry: {cmd}') raise ValueError(f'Invalid dark book entry: {cmd}')
@ -281,8 +293,7 @@ async def clear_dark_triggers(
# print(f'execs scan took: {time.time() - start}') # print(f'execs scan took: {time.time() - start}')
@dataclass class TradesRelay(Struct):
class TradesRelay:
# for now we keep only a single connection open with # for now we keep only a single connection open with
# each ``brokerd`` for simplicity. # each ``brokerd`` for simplicity.
@ -318,7 +329,10 @@ class Router(Struct):
# order id to client stream map # order id to client stream map
clients: set[tractor.MsgStream] = set() clients: set[tractor.MsgStream] = set()
dialogues: dict[str, list[tractor.MsgStream]] = {} dialogues: dict[
str,
list[tractor.MsgStream]
] = {}
# brokername to trades-dialogues streams with ``brokerd`` actors # brokername to trades-dialogues streams with ``brokerd`` actors
relays: dict[str, TradesRelay] = {} relays: dict[str, TradesRelay] = {}
@ -341,8 +355,9 @@ class Router(Struct):
loglevel: str, loglevel: str,
) -> tuple[dict, tractor.MsgStream]: ) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none '''
already exists. Open and yield ``brokerd`` trades dialogue context-stream if
none already exists.
''' '''
relay: TradesRelay = self.relays.get(feed.mod.name) relay: TradesRelay = self.relays.get(feed.mod.name)
@ -614,7 +629,8 @@ async def translate_and_relay_brokerd_events(
'reqid': reqid, # brokerd generated order-request id 'reqid': reqid, # brokerd generated order-request id
'oid': oid, # ems order-dialog id 'oid': oid, # ems order-dialog id
} if ( } if (
entry := book._ems_entries.get(oid) # entry := book._ems_entries.get(oid)
flow := book._msgflows.get(oid)
): ):
# initial response to brokerd order request # initial response to brokerd order request
# if name == 'ack': # if name == 'ack':
@ -637,10 +653,14 @@ async def translate_and_relay_brokerd_events(
# cancelled by the ems controlling client before we # cancelled by the ems controlling client before we
# received this ack, in which case we relay that cancel # received this ack, in which case we relay that cancel
# signal **asap** to the backend broker # signal **asap** to the backend broker
action = getattr(entry, 'action', None) action = flow.get('action')
# action = getattr(entry, 'action', None)
if action and action == 'cancel': if action and action == 'cancel':
# assign newly providerd broker backend request id # assign newly providerd broker backend request id
entry.reqid = reqid flow['reqid'] = reqid
# entry.reqid = reqid
entry = flow.maps[0]
# tell broker to cancel immediately # tell broker to cancel immediately
await brokerd_trades_stream.send(entry) await brokerd_trades_stream.send(entry)
@ -649,7 +669,11 @@ async def translate_and_relay_brokerd_events(
# our book -> registered as live flow # our book -> registered as live flow
else: else:
# update the flow with the ack msg # update the flow with the ack msg
book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg) # book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg)
flow.maps.insert(
0,
BrokerdOrderAck(**brokerd_msg).to_dict()
)
# no msg to client necessary # no msg to client necessary
continue continue
@ -666,6 +690,7 @@ async def translate_and_relay_brokerd_events(
msg = BrokerdError(**brokerd_msg) msg = BrokerdError(**brokerd_msg)
resp = 'broker_errored' resp = 'broker_errored'
log.error(pformat(msg)) # XXX make one when it's blank? log.error(pformat(msg)) # XXX make one when it's blank?
book._msgflows[oid].maps.insert(0, msg.to_dict())
# TODO: figure out how this will interact with EMS clients # TODO: figure out how this will interact with EMS clients
# for ex. on an error do we react with a dark orders # for ex. on an error do we react with a dark orders
@ -686,6 +711,9 @@ async def translate_and_relay_brokerd_events(
} if ( } if (
oid := book._ems2brokerd_ids.inverse.get(reqid) oid := book._ems2brokerd_ids.inverse.get(reqid)
): ):
# ack = book._ems_entries[oid]
# ack = book._msgflows[oid].maps[0]
msg = BrokerdStatus(**brokerd_msg) msg = BrokerdStatus(**brokerd_msg)
# TODO: should we flatten out these cases and/or should # TODO: should we flatten out these cases and/or should
@ -704,6 +732,9 @@ async def translate_and_relay_brokerd_events(
# since the order dialogue should be done. # since the order dialogue should be done.
log.info(f'Execution for {oid} is complete!') log.info(f'Execution for {oid} is complete!')
# remove from active flows
book._msgflows.pop(oid)
# just log it # just log it
else: else:
log.info(f'{broker} filled {msg}') log.info(f'{broker} filled {msg}')
@ -712,7 +743,21 @@ async def translate_and_relay_brokerd_events(
# one of {submitted, cancelled} # one of {submitted, cancelled}
resp = 'broker_' + msg.status resp = 'broker_' + msg.status
# unknown valid BrokerdStatus
# book._ems_entries[oid] = msg
book._msgflows[oid].maps.insert(0, msg.to_dict())
# TODO: i wonder if we should just support receiving an
# actual ``BrokerdOrder`` msg here? Is it a bad idea to
# presume that inbound orders on the backend dialog can be
# used to drive order tracking/tracing in the EMS *over*
# a set of backends from some other non-ems owner?
# this will likely feel better once we get open_msg_scope()
# or wtv finished.
# BrokerdStatus containing an embedded order msg which
# should be loaded as a "pre-existing open order" from the
# brokerd backend.
case { case {
'name': 'status', 'name': 'status',
'status': status, 'status': status,
@ -724,7 +769,18 @@ async def translate_and_relay_brokerd_events(
# to be able to more formally handle multi-player # to be able to more formally handle multi-player
# trading... # trading...
if status == 'submitted': if status != 'submitted':
log.error(
f'Unknown status msg:\n'
f'{pformat(brokerd_msg)}\n'
'Unable to relay message to client side!?'
)
else:
# existing open backend order which we broadcast to
# all currently connected clients.
order_dict = brokerd_msg['broker_details'].pop('order')
order = BrokerdOrder(**order_dict)
msg = BrokerdStatus(**brokerd_msg) msg = BrokerdStatus(**brokerd_msg)
log.info( log.info(
f'Relaying existing open order:\n {brokerd_msg}' f'Relaying existing open order:\n {brokerd_msg}'
@ -734,21 +790,48 @@ async def translate_and_relay_brokerd_events(
# may end up with collisions? # may end up with collisions?
broker = details['name'] broker = details['name']
oid = str(reqid) oid = str(reqid)
book._ems_entries[oid] = msg
# attempt to avoid collisions # attempt to avoid collisions
msg.reqid = oid msg.reqid = oid
resp = 'broker_submitted'
# XXX: MEGA HACK ALERT FOR the dialog entries delivery
# on client connect...
# TODO: fix this garbage..
msg.broker_details['resp'] = resp = 'broker_submitted'
# register this existing broker-side dialog # register this existing broker-side dialog
book._ems2brokerd_ids[oid] = reqid book._ems2brokerd_ids[oid] = reqid
# book._ems_entries[oid] = msg
else: # fill in approximate msg flow history
log.error( flow = book._msgflows[oid]
f'Unknown status msg:\n' flow.maps.insert(0, order.to_dict())
f'{pformat(brokerd_msg)}\n' flow.maps.insert(0, msg.to_dict())
'Unable to relay message to client side!?' flow.maps.insert(0, details)
) flattened = dict(flow)
# await tractor.breakpoint()
# fan-out-relay position msgs immediately by
# broadcasting updates on all client streams
for client_stream in router.clients.copy():
try:
await client_stream.send(flattened)
# Status(
# oid=oid,
# resp=resp,
# time_ns=time.time_ns(),
# broker_reqid=reqid,
# brokerd_msg=flattened,
# )
# )
except(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
router.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
# don't fall through
continue continue
# BrokerdFill # BrokerdFill
@ -768,16 +851,14 @@ async def translate_and_relay_brokerd_events(
raise ValueError(f'Brokerd message {brokerd_msg} is invalid') raise ValueError(f'Brokerd message {brokerd_msg} is invalid')
# retrieve existing live flow # retrieve existing live flow
entry = book._ems_entries[oid] # entry = book._ems_entries[oid]
# assert entry.oid == oid # from when we only stored the first ack
if getattr(entry, 'oid', None): # old_reqid = entry.reqid
assert entry.oid == oid # if old_reqid and old_reqid != reqid:
old_reqid = entry.reqid # log.warning(
if old_reqid and old_reqid != reqid: # f'Brokerd order id change for {oid}:\n'
log.warning( # f'{old_reqid} -> {reqid}'
f'Brokerd order id change for {oid}:\n' # )
f'{old_reqid} -> {reqid}'
)
# Create and relay response status message # Create and relay response status message
# to requesting EMS client # to requesting EMS client
@ -796,31 +877,6 @@ async def translate_and_relay_brokerd_events(
log.error( log.error(
f'Received `brokerd` msg for unknown client oid: {oid}') f'Received `brokerd` msg for unknown client oid: {oid}')
else:
# existing open order relay
assert oid == entry.reqid
# fan-out-relay position msgs immediately by
# broadcasting updates on all client streams
for client_stream in router.clients.copy():
try:
await client_stream.send(
Status(
oid=oid,
resp=resp,
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=msg,
)
)
except(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
router.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
# TODO: do we want this to keep things cleaned up? # TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the # it might require a special status from brokerd to affirm the
# flow is complete? # flow is complete?
@ -854,7 +910,8 @@ async def process_client_order_cmds(
# others who are registered for such order affiliated msgs). # others who are registered for such order affiliated msgs).
client_dialogues[oid] = client_order_stream client_dialogues[oid] = client_order_stream
reqid = dark_book._ems2brokerd_ids.inverse.get(oid) reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
live_entry = dark_book._ems_entries.get(oid) # live_entry = dark_book._ems_entries.get(oid)
live_entry = dark_book._msgflows.get(oid)
match cmd: match cmd:
# existing live-broker order cancel # existing live-broker order cancel
@ -862,12 +919,14 @@ async def process_client_order_cmds(
'action': 'cancel', 'action': 'cancel',
'oid': oid, 'oid': oid,
} if live_entry: } if live_entry:
reqid = live_entry.reqid # reqid = live_entry.reqid
reqid = live_entry['reqid']
msg = BrokerdCancel( msg = BrokerdCancel(
oid=oid, oid=oid,
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
account=live_entry.account, # account=live_entry.account,
account=live_entry['account'],
) )
# NOTE: cancel response will be relayed back in messages # NOTE: cancel response will be relayed back in messages
@ -885,6 +944,7 @@ async def process_client_order_cmds(
# the order ack does show up later such that the brokerd # the order ack does show up later such that the brokerd
# order request can be cancelled at that time. # order request can be cancelled at that time.
dark_book._ems_entries[oid] = msg dark_book._ems_entries[oid] = msg
live_entry.maps.insert(0, msg.to_dict())
# dark trigger cancel # dark trigger cancel
case { case {
@ -936,7 +996,8 @@ async def process_client_order_cmds(
# the only msg will be a ``BrokerdStatus`` # the only msg will be a ``BrokerdStatus``
# assert live_entry.oid == oid # assert live_entry.oid == oid
reqid = live_entry.reqid # reqid = live_entry.reqid
reqid = live_entry['reqid']
# if we already had a broker order id then # if we already had a broker order id then
# this is likely an order update commmand. # this is likely an order update commmand.
log.info(f"Modifying live {broker} order: {reqid}") log.info(f"Modifying live {broker} order: {reqid}")
@ -971,7 +1032,8 @@ async def process_client_order_cmds(
# client, before that ack, when the ack does arrive we # client, before that ack, when the ack does arrive we
# immediately take the reqid from the broker and cancel # immediately take the reqid from the broker and cancel
# that live order asap. # that live order asap.
dark_book._ems_entries[oid] = msg # dark_book._ems_entries[oid] = msg
dark_book._msgflows[oid].maps.insert(0, msg.to_dict())
# dark-order / alert submission # dark-order / alert submission
case { case {
@ -1144,12 +1206,35 @@ async def _emsd_main(
brokerd_stream = relay.brokerd_dialogue # .clone() brokerd_stream = relay.brokerd_dialogue # .clone()
# convert dialogs to status msgs for client delivery
statuses = {}
# for oid, msg in book._ems_entries.items():
for oid, msgflow in book._msgflows.items():
# we relay to the client side a msg that contains
# all data flattened from the message history.
# status = msgflow['status']
flattened = dict(msgflow)
# status = flattened['status']
flattened.pop('brokerd_msg', None)
statuses[oid] = flattened
# Status(
# oid=oid,
# time_ns=flattened['time_ns'],
# # time_ns=msg.time_ns,
# # resp=f'broker_{msg.status}',
# resp=f'broker_{status}',
# # trigger_price=msg.order.price,
# trigger_price=flattened['price'],
# brokerd_msg=flattened,
# )
# await tractor.breakpoint()
# signal to client that we're started and deliver # signal to client that we're started and deliver
# all known pps and accounts for this ``brokerd``. # all known pps and accounts for this ``brokerd``.
await ems_ctx.started(( await ems_ctx.started((
relay.positions, relay.positions,
list(relay.accounts), list(relay.accounts),
book._ems_entries, statuses,
)) ))
# establish 2-way stream with requesting order-client and # establish 2-way stream with requesting order-client and

View File

@ -67,7 +67,7 @@ class Order(Struct):
# determines whether the create execution # determines whether the create execution
# will be submitted to the ems or directly to # will be submitted to the ems or directly to
# the backend broker # the backend broker
exec_mode: str # {'dark', 'live', 'paper'} exec_mode: str # {'dark', 'live'}
# -------------- # --------------
@ -136,11 +136,14 @@ class BrokerdCancel(Struct):
class BrokerdOrder(Struct): class BrokerdOrder(Struct):
action: str # {buy, sell}
oid: str oid: str
account: str account: str
time_ns: int time_ns: int
# TODO: if we instead rely on a +ve/-ve size to determine
# the action we more or less don't need this field right?
action: str = '' # {buy, sell}
# "broker request id": broker specific/internal order id if this is # "broker request id": broker specific/internal order id if this is
# None, creates a new order otherwise if the id is valid the backend # None, creates a new order otherwise if the id is valid the backend
# api must modify the existing matching order. If the broker allows # api must modify the existing matching order. If the broker allows
@ -149,7 +152,7 @@ class BrokerdOrder(Struct):
# field # field
reqid: Optional[Union[int, str]] = None reqid: Optional[Union[int, str]] = None
symbol: str # symbol.<providername> ? symbol: str # fqsn
price: float price: float
size: float size: float
@ -183,25 +186,21 @@ class BrokerdStatus(Struct):
reqid: Union[int, str] reqid: Union[int, str]
time_ns: int time_ns: int
# XXX: should be best effort set for every update
account: str = ''
# TODO: instead (ack, pending, open, fill, clos(ed), cancelled) # TODO: instead (ack, pending, open, fill, clos(ed), cancelled)
# { # {
# 'submitted', # 'submitted', # open
# 'cancelled', # 'cancelled', # canceled
# 'filled', # 'filled', # closed
# } # }
status: str status: str
account: str
# +ve is buy, -ve is sell
size: float = 0.0
price: float = 0.0
filled: float = 0.0 filled: float = 0.0
reason: str = '' reason: str = ''
remaining: float = 0.0 remaining: float = 0.0
external: bool = False
# order: Optional[BrokerdOrder] = None
# XXX: better design/name here? # XXX: better design/name here?
# flag that can be set to indicate a message for an order # flag that can be set to indicate a message for an order
# event that wasn't originated by piker's emsd (eg. some external # event that wasn't originated by piker's emsd (eg. some external

View File

@ -49,9 +49,14 @@ from ._position import (
SettingsPane, SettingsPane,
) )
from ._forms import FieldsForm from ._forms import FieldsForm
# from ._label import FormatLabel
from ._window import MultiStatus from ._window import MultiStatus
from ..clearing._messages import Order, BrokerdPosition from ..clearing._messages import (
Order,
Status,
# BrokerdOrder,
# BrokerdStatus,
BrokerdPosition,
)
from ._forms import open_form_input_handling from ._forms import open_form_input_handling
@ -519,22 +524,36 @@ class OrderMode:
def load_unknown_dialog_from_msg( def load_unknown_dialog_from_msg(
self, self,
# status: Status,
msg: dict, msg: dict,
) -> OrderDialog: ) -> OrderDialog:
oid = str(msg['oid']) oid = str(msg['oid'])
size = msg['brokerd_msg']['size'] # oid = str(status.oid)
# bstatus = BrokerdStatus(**msg.brokerd_msg)
# NOTE: the `.order` attr **must** be set with the
# equivalent order msg in order to be loaded.
# border = BrokerdOrder(**bstatus.broker_details['order'])
# msg = msg['brokerd_msg']
# size = border.size
size = msg['size']
if size >= 0: if size >= 0:
action = 'buy' action = 'buy'
else: else:
action = 'sell' action = 'sell'
acct = msg['brokerd_msg']['account'] # acct = border.account
price = msg['brokerd_msg']['price'] # price = border.price
deats = msg['brokerd_msg']['broker_details'] # price = msg['brokerd_msg']['price']
symbol = msg['symbol']
deats = msg['broker_details']
brokername = deats['name']
fqsn = ( fqsn = (
deats['fqsn'] + '.' + deats['name'] # deats['fqsn'] + '.' + deats['name']
symbol + '.' + brokername
) )
symbol = Symbol.from_fqsn( symbol = Symbol.from_fqsn(
fqsn=fqsn, fqsn=fqsn,
@ -543,11 +562,11 @@ class OrderMode:
# map to order composite-type # map to order composite-type
order = Order( order = Order(
action=action, action=action,
price=price, price=msg['price'],
account=acct, account=msg['account'],
size=size, size=size,
symbol=symbol, symbol=symbol,
brokers=symbol.brokers, brokers=[brokername],
oid=oid, oid=oid,
exec_mode='live', # dark or live exec_mode='live', # dark or live
) )
@ -808,8 +827,8 @@ async def open_order_mode(
# HACK ALERT: ensure a resp field is filled out since # HACK ALERT: ensure a resp field is filled out since
# techincally the call below expects a ``Status``. TODO: # techincally the call below expects a ``Status``. TODO:
# parse into proper ``Status`` equivalents ems-side? # parse into proper ``Status`` equivalents ems-side?
msg.setdefault('resp', msg['broker_details']['resp']) # msg.setdefault('resp', msg['broker_details']['resp'])
msg.setdefault('oid', msg['broker_details']['oid']) # msg.setdefault('oid', msg['broker_details']['oid'])
msg['brokerd_msg'] = msg msg['brokerd_msg'] = msg
await process_trade_msg( await process_trade_msg(
@ -892,6 +911,7 @@ async def process_trade_msg(
log.warning( log.warning(
f'received msg for untracked dialog:\n{fmsg}' f'received msg for untracked dialog:\n{fmsg}'
) )
# dialog = mode.load_unknown_dialog_from_msg(Status(**msg))
dialog = mode.load_unknown_dialog_from_msg(msg) dialog = mode.load_unknown_dialog_from_msg(msg)
# record message to dialog tracking # record message to dialog tracking