WIP playing with a `ChainMap` of messages

dict_differ
Tyler Goodlet 2022-08-08 13:35:01 -04:00
parent a3812cd169
commit bbbdcad33b
4 changed files with 236 additions and 121 deletions

View File

@ -128,7 +128,8 @@ async def handle_order_requests(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?',
))
)
)
continue
client = _accounts2clients.get(account)
@ -482,17 +483,16 @@ async def trades_dialogue(
'BUY': 1,
}[order.action] * quant
fqsn, _ = con2fqsn(trade.contract)
reqid = order.orderId
# TODO: maybe embed a ``BrokerdOrder`` instead
# since then we can directly load it on the client
# side in the order mode loop?
msg = BrokerdStatus(
reqid=order.orderId,
time_ns=time.time_ns(),
account=accounts_def.inverse[order.account],
reqid=reqid,
time_ns=(ts := time.time_ns()),
status='submitted',
size=size,
price=order.lmtPrice,
account=accounts_def.inverse[order.account],
filled=0,
reason='Existing live order',
@ -503,6 +503,17 @@ async def trades_dialogue(
broker_details={
'name': 'ib',
'fqsn': fqsn,
# this is a embedded/boxed order
# msg that can be loaded by the ems
# and for relay to clients.
'order': BrokerdOrder(
symbol=fqsn,
account=accounts_def.inverse[order.account],
oid=reqid,
time_ns=ts,
size=size,
price=order.lmtPrice,
),
},
)
order_msgs.append(msg)

View File

@ -18,8 +18,8 @@
In da suit parlances: "Execution management systems"
"""
from collections import defaultdict, ChainMap
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from math import isnan
from pprint import pformat
import time
@ -41,9 +41,15 @@ from ..data.types import Struct
from .._daemon import maybe_spawn_brokerd
from . import _paper_engine as paper
from ._messages import (
Status, Order,
BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus,
BrokerdFill, BrokerdError, BrokerdPosition,
Order,
Status,
BrokerdCancel,
BrokerdOrder,
BrokerdOrderAck,
BrokerdStatus,
BrokerdFill,
BrokerdError,
BrokerdPosition,
)
@ -90,8 +96,7 @@ def mk_check(
)
@dataclass
class _DarkBook:
class _DarkBook(Struct):
'''
EMS-trigger execution book.
@ -116,17 +121,23 @@ class _DarkBook:
dict, # cmd / msg type
]
]
] = field(default_factory=dict)
] = {}
# tracks most recent values per symbol each from data feed
lasts: dict[
str,
float,
] = field(default_factory=dict)
] = {}
# mapping of piker ems order ids to current brokerd order flow message
_ems_entries: dict[str, str] = field(default_factory=dict)
_ems2brokerd_ids: dict[str, str] = field(default_factory=bidict)
# _ems_entries: dict[str, str] = {}
# mapping of ems dialog ids to msg flow history
_msgflows: defaultdict[
int,
ChainMap[dict[str, dict]],
] = defaultdict(ChainMap)
_ems2brokerd_ids: dict[str, str] = bidict()
# XXX: this is in place to prevent accidental positions that are too
@ -240,7 +251,8 @@ async def clear_dark_triggers(
# a ``BrokerdOrderAck`` msg including the
# allocated unique ``BrokerdOrderAck.reqid`` key
# generated by the broker's own systems.
book._ems_entries[oid] = live_req
# book._ems_entries[oid] = live_req
book._msgflows[oid].append(live_req)
case _:
raise ValueError(f'Invalid dark book entry: {cmd}')
@ -281,8 +293,7 @@ async def clear_dark_triggers(
# print(f'execs scan took: {time.time() - start}')
@dataclass
class TradesRelay:
class TradesRelay(Struct):
# for now we keep only a single connection open with
# each ``brokerd`` for simplicity.
@ -318,7 +329,10 @@ class Router(Struct):
# order id to client stream map
clients: set[tractor.MsgStream] = set()
dialogues: dict[str, list[tractor.MsgStream]] = {}
dialogues: dict[
str,
list[tractor.MsgStream]
] = {}
# brokername to trades-dialogues streams with ``brokerd`` actors
relays: dict[str, TradesRelay] = {}
@ -341,8 +355,9 @@ class Router(Struct):
loglevel: str,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
Open and yield ``brokerd`` trades dialogue context-stream if
none already exists.
'''
relay: TradesRelay = self.relays.get(feed.mod.name)
@ -614,7 +629,8 @@ async def translate_and_relay_brokerd_events(
'reqid': reqid, # brokerd generated order-request id
'oid': oid, # ems order-dialog id
} if (
entry := book._ems_entries.get(oid)
# entry := book._ems_entries.get(oid)
flow := book._msgflows.get(oid)
):
# initial response to brokerd order request
# if name == 'ack':
@ -637,10 +653,14 @@ async def translate_and_relay_brokerd_events(
# cancelled by the ems controlling client before we
# received this ack, in which case we relay that cancel
# signal **asap** to the backend broker
action = getattr(entry, 'action', None)
action = flow.get('action')
# action = getattr(entry, 'action', None)
if action and action == 'cancel':
# assign newly providerd broker backend request id
entry.reqid = reqid
flow['reqid'] = reqid
# entry.reqid = reqid
entry = flow.maps[0]
# tell broker to cancel immediately
await brokerd_trades_stream.send(entry)
@ -649,7 +669,11 @@ async def translate_and_relay_brokerd_events(
# our book -> registered as live flow
else:
# update the flow with the ack msg
book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg)
# book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg)
flow.maps.insert(
0,
BrokerdOrderAck(**brokerd_msg).to_dict()
)
# no msg to client necessary
continue
@ -666,6 +690,7 @@ async def translate_and_relay_brokerd_events(
msg = BrokerdError(**brokerd_msg)
resp = 'broker_errored'
log.error(pformat(msg)) # XXX make one when it's blank?
book._msgflows[oid].maps.insert(0, msg.to_dict())
# TODO: figure out how this will interact with EMS clients
# for ex. on an error do we react with a dark orders
@ -686,6 +711,9 @@ async def translate_and_relay_brokerd_events(
} if (
oid := book._ems2brokerd_ids.inverse.get(reqid)
):
# ack = book._ems_entries[oid]
# ack = book._msgflows[oid].maps[0]
msg = BrokerdStatus(**brokerd_msg)
# TODO: should we flatten out these cases and/or should
@ -704,6 +732,9 @@ async def translate_and_relay_brokerd_events(
# since the order dialogue should be done.
log.info(f'Execution for {oid} is complete!')
# remove from active flows
book._msgflows.pop(oid)
# just log it
else:
log.info(f'{broker} filled {msg}')
@ -712,7 +743,21 @@ async def translate_and_relay_brokerd_events(
# one of {submitted, cancelled}
resp = 'broker_' + msg.status
# unknown valid BrokerdStatus
# book._ems_entries[oid] = msg
book._msgflows[oid].maps.insert(0, msg.to_dict())
# TODO: i wonder if we should just support receiving an
# actual ``BrokerdOrder`` msg here? Is it a bad idea to
# presume that inbound orders on the backend dialog can be
# used to drive order tracking/tracing in the EMS *over*
# a set of backends from some other non-ems owner?
# this will likely feel better once we get open_msg_scope()
# or wtv finished.
# BrokerdStatus containing an embedded order msg which
# should be loaded as a "pre-existing open order" from the
# brokerd backend.
case {
'name': 'status',
'status': status,
@ -724,7 +769,18 @@ async def translate_and_relay_brokerd_events(
# to be able to more formally handle multi-player
# trading...
if status == 'submitted':
if status != 'submitted':
log.error(
f'Unknown status msg:\n'
f'{pformat(brokerd_msg)}\n'
'Unable to relay message to client side!?'
)
else:
# existing open backend order which we broadcast to
# all currently connected clients.
order_dict = brokerd_msg['broker_details'].pop('order')
order = BrokerdOrder(**order_dict)
msg = BrokerdStatus(**brokerd_msg)
log.info(
f'Relaying existing open order:\n {brokerd_msg}'
@ -734,22 +790,49 @@ async def translate_and_relay_brokerd_events(
# may end up with collisions?
broker = details['name']
oid = str(reqid)
book._ems_entries[oid] = msg
# attempt to avoid collisions
msg.reqid = oid
resp = 'broker_submitted'
# XXX: MEGA HACK ALERT FOR the dialog entries delivery
# on client connect...
# TODO: fix this garbage..
msg.broker_details['resp'] = resp = 'broker_submitted'
# register this existing broker-side dialog
book._ems2brokerd_ids[oid] = reqid
# book._ems_entries[oid] = msg
else:
log.error(
f'Unknown status msg:\n'
f'{pformat(brokerd_msg)}\n'
'Unable to relay message to client side!?'
)
continue
# fill in approximate msg flow history
flow = book._msgflows[oid]
flow.maps.insert(0, order.to_dict())
flow.maps.insert(0, msg.to_dict())
flow.maps.insert(0, details)
flattened = dict(flow)
# await tractor.breakpoint()
# fan-out-relay position msgs immediately by
# broadcasting updates on all client streams
for client_stream in router.clients.copy():
try:
await client_stream.send(flattened)
# Status(
# oid=oid,
# resp=resp,
# time_ns=time.time_ns(),
# broker_reqid=reqid,
# brokerd_msg=flattened,
# )
# )
except(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
router.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
# don't fall through
continue
# BrokerdFill
case {
@ -768,58 +851,31 @@ async def translate_and_relay_brokerd_events(
raise ValueError(f'Brokerd message {brokerd_msg} is invalid')
# retrieve existing live flow
entry = book._ems_entries[oid]
# entry = book._ems_entries[oid]
# assert entry.oid == oid # from when we only stored the first ack
# old_reqid = entry.reqid
# if old_reqid and old_reqid != reqid:
# log.warning(
# f'Brokerd order id change for {oid}:\n'
# f'{old_reqid} -> {reqid}'
# )
if getattr(entry, 'oid', None):
assert entry.oid == oid
old_reqid = entry.reqid
if old_reqid and old_reqid != reqid:
log.warning(
f'Brokerd order id change for {oid}:\n'
f'{old_reqid} -> {reqid}'
# Create and relay response status message
# to requesting EMS client
try:
ems_client_order_stream = router.dialogues[oid]
await ems_client_order_stream.send(
Status(
oid=oid,
resp=resp,
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=msg,
)
# Create and relay response status message
# to requesting EMS client
try:
ems_client_order_stream = router.dialogues[oid]
await ems_client_order_stream.send(
Status(
oid=oid,
resp=resp,
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=msg,
)
)
except KeyError:
log.error(
f'Received `brokerd` msg for unknown client oid: {oid}')
else:
# existing open order relay
assert oid == entry.reqid
# fan-out-relay position msgs immediately by
# broadcasting updates on all client streams
for client_stream in router.clients.copy():
try:
await client_stream.send(
Status(
oid=oid,
resp=resp,
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=msg,
)
)
except(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
router.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
)
except KeyError:
log.error(
f'Received `brokerd` msg for unknown client oid: {oid}')
# TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the
@ -854,7 +910,8 @@ async def process_client_order_cmds(
# others who are registered for such order affiliated msgs).
client_dialogues[oid] = client_order_stream
reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
live_entry = dark_book._ems_entries.get(oid)
# live_entry = dark_book._ems_entries.get(oid)
live_entry = dark_book._msgflows.get(oid)
match cmd:
# existing live-broker order cancel
@ -862,12 +919,14 @@ async def process_client_order_cmds(
'action': 'cancel',
'oid': oid,
} if live_entry:
reqid = live_entry.reqid
# reqid = live_entry.reqid
reqid = live_entry['reqid']
msg = BrokerdCancel(
oid=oid,
reqid=reqid,
time_ns=time.time_ns(),
account=live_entry.account,
# account=live_entry.account,
account=live_entry['account'],
)
# NOTE: cancel response will be relayed back in messages
@ -885,6 +944,7 @@ async def process_client_order_cmds(
# the order ack does show up later such that the brokerd
# order request can be cancelled at that time.
dark_book._ems_entries[oid] = msg
live_entry.maps.insert(0, msg.to_dict())
# dark trigger cancel
case {
@ -936,7 +996,8 @@ async def process_client_order_cmds(
# the only msg will be a ``BrokerdStatus``
# assert live_entry.oid == oid
reqid = live_entry.reqid
# reqid = live_entry.reqid
reqid = live_entry['reqid']
# if we already had a broker order id then
# this is likely an order update commmand.
log.info(f"Modifying live {broker} order: {reqid}")
@ -971,7 +1032,8 @@ async def process_client_order_cmds(
# client, before that ack, when the ack does arrive we
# immediately take the reqid from the broker and cancel
# that live order asap.
dark_book._ems_entries[oid] = msg
# dark_book._ems_entries[oid] = msg
dark_book._msgflows[oid].maps.insert(0, msg.to_dict())
# dark-order / alert submission
case {
@ -1144,12 +1206,35 @@ async def _emsd_main(
brokerd_stream = relay.brokerd_dialogue # .clone()
# convert dialogs to status msgs for client delivery
statuses = {}
# for oid, msg in book._ems_entries.items():
for oid, msgflow in book._msgflows.items():
# we relay to the client side a msg that contains
# all data flattened from the message history.
# status = msgflow['status']
flattened = dict(msgflow)
# status = flattened['status']
flattened.pop('brokerd_msg', None)
statuses[oid] = flattened
# Status(
# oid=oid,
# time_ns=flattened['time_ns'],
# # time_ns=msg.time_ns,
# # resp=f'broker_{msg.status}',
# resp=f'broker_{status}',
# # trigger_price=msg.order.price,
# trigger_price=flattened['price'],
# brokerd_msg=flattened,
# )
# await tractor.breakpoint()
# signal to client that we're started and deliver
# all known pps and accounts for this ``brokerd``.
await ems_ctx.started((
relay.positions,
list(relay.accounts),
book._ems_entries,
statuses,
))
# establish 2-way stream with requesting order-client and

View File

@ -67,7 +67,7 @@ class Order(Struct):
# determines whether the create execution
# will be submitted to the ems or directly to
# the backend broker
exec_mode: str # {'dark', 'live', 'paper'}
exec_mode: str # {'dark', 'live'}
# --------------
@ -136,11 +136,14 @@ class BrokerdCancel(Struct):
class BrokerdOrder(Struct):
action: str # {buy, sell}
oid: str
account: str
time_ns: int
# TODO: if we instead rely on a +ve/-ve size to determine
# the action we more or less don't need this field right?
action: str = '' # {buy, sell}
# "broker request id": broker specific/internal order id if this is
# None, creates a new order otherwise if the id is valid the backend
# api must modify the existing matching order. If the broker allows
@ -149,7 +152,7 @@ class BrokerdOrder(Struct):
# field
reqid: Optional[Union[int, str]] = None
symbol: str # symbol.<providername> ?
symbol: str # fqsn
price: float
size: float
@ -183,25 +186,21 @@ class BrokerdStatus(Struct):
reqid: Union[int, str]
time_ns: int
# XXX: should be best effort set for every update
account: str = ''
# TODO: instead (ack, pending, open, fill, clos(ed), cancelled)
# {
# 'submitted',
# 'cancelled',
# 'filled',
# 'submitted', # open
# 'cancelled', # canceled
# 'filled', # closed
# }
status: str
# +ve is buy, -ve is sell
size: float = 0.0
price: float = 0.0
account: str
filled: float = 0.0
reason: str = ''
remaining: float = 0.0
external: bool = False
# order: Optional[BrokerdOrder] = None
# XXX: better design/name here?
# flag that can be set to indicate a message for an order
# event that wasn't originated by piker's emsd (eg. some external

View File

@ -49,9 +49,14 @@ from ._position import (
SettingsPane,
)
from ._forms import FieldsForm
# from ._label import FormatLabel
from ._window import MultiStatus
from ..clearing._messages import Order, BrokerdPosition
from ..clearing._messages import (
Order,
Status,
# BrokerdOrder,
# BrokerdStatus,
BrokerdPosition,
)
from ._forms import open_form_input_handling
@ -519,22 +524,36 @@ class OrderMode:
def load_unknown_dialog_from_msg(
self,
# status: Status,
msg: dict,
) -> OrderDialog:
oid = str(msg['oid'])
size = msg['brokerd_msg']['size']
# oid = str(status.oid)
# bstatus = BrokerdStatus(**msg.brokerd_msg)
# NOTE: the `.order` attr **must** be set with the
# equivalent order msg in order to be loaded.
# border = BrokerdOrder(**bstatus.broker_details['order'])
# msg = msg['brokerd_msg']
# size = border.size
size = msg['size']
if size >= 0:
action = 'buy'
else:
action = 'sell'
acct = msg['brokerd_msg']['account']
price = msg['brokerd_msg']['price']
deats = msg['brokerd_msg']['broker_details']
# acct = border.account
# price = border.price
# price = msg['brokerd_msg']['price']
symbol = msg['symbol']
deats = msg['broker_details']
brokername = deats['name']
fqsn = (
deats['fqsn'] + '.' + deats['name']
# deats['fqsn'] + '.' + deats['name']
symbol + '.' + brokername
)
symbol = Symbol.from_fqsn(
fqsn=fqsn,
@ -543,11 +562,11 @@ class OrderMode:
# map to order composite-type
order = Order(
action=action,
price=price,
account=acct,
price=msg['price'],
account=msg['account'],
size=size,
symbol=symbol,
brokers=symbol.brokers,
brokers=[brokername],
oid=oid,
exec_mode='live', # dark or live
)
@ -808,8 +827,8 @@ async def open_order_mode(
# HACK ALERT: ensure a resp field is filled out since
# techincally the call below expects a ``Status``. TODO:
# parse into proper ``Status`` equivalents ems-side?
msg.setdefault('resp', msg['broker_details']['resp'])
msg.setdefault('oid', msg['broker_details']['oid'])
# msg.setdefault('resp', msg['broker_details']['resp'])
# msg.setdefault('oid', msg['broker_details']['oid'])
msg['brokerd_msg'] = msg
await process_trade_msg(
@ -892,6 +911,7 @@ async def process_trade_msg(
log.warning(
f'received msg for untracked dialog:\n{fmsg}'
)
# dialog = mode.load_unknown_dialog_from_msg(Status(**msg))
dialog = mode.load_unknown_dialog_from_msg(msg)
# record message to dialog tracking