From 87ed9abefa8d516f102c6722315be3819a309c36 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 8 Aug 2022 13:35:01 -0400 Subject: [PATCH] WIP playing with a `ChainMap` of messages --- piker/brokers/ib/broker.py | 23 +++- piker/clearing/_ems.py | 263 ++++++++++++++++++++++++------------ piker/clearing/_messages.py | 27 ++-- piker/ui/order_mode.py | 44 ++++-- 4 files changed, 236 insertions(+), 121 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index b2d6bd0d..96f8572d 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -127,7 +127,8 @@ async def handle_order_requests( oid=request_msg['oid'], symbol=request_msg['symbol'], reason=f'No account found: `{account}` ?', - )) + ) + ) continue client = _accounts2clients.get(account) @@ -495,17 +496,16 @@ async def trades_dialogue( 'BUY': 1, }[order.action] * quant fqsn, _ = con2fqsn(trade.contract) + reqid = order.orderId # TODO: maybe embed a ``BrokerdOrder`` instead # since then we can directly load it on the client # side in the order mode loop? msg = BrokerdStatus( - reqid=order.orderId, - time_ns=time.time_ns(), - account=accounts_def.inverse[order.account], + reqid=reqid, + time_ns=(ts := time.time_ns()), status='submitted', - size=size, - price=order.lmtPrice, + account=accounts_def.inverse[order.account], filled=0, reason='Existing live order', @@ -516,6 +516,17 @@ async def trades_dialogue( broker_details={ 'name': 'ib', 'fqsn': fqsn, + # this is a embedded/boxed order + # msg that can be loaded by the ems + # and for relay to clients. + 'order': BrokerdOrder( + symbol=fqsn, + account=accounts_def.inverse[order.account], + oid=reqid, + time_ns=ts, + size=size, + price=order.lmtPrice, + ), }, ) order_msgs.append(msg) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 01cc25a2..857460d6 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -18,8 +18,8 @@ In da suit parlances: "Execution management systems" """ +from collections import defaultdict, ChainMap from contextlib import asynccontextmanager -from dataclasses import dataclass, field from math import isnan from pprint import pformat import time @@ -41,9 +41,15 @@ from ..data.types import Struct from .._daemon import maybe_spawn_brokerd from . import _paper_engine as paper from ._messages import ( - Status, Order, - BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, - BrokerdFill, BrokerdError, BrokerdPosition, + Order, + Status, + BrokerdCancel, + BrokerdOrder, + BrokerdOrderAck, + BrokerdStatus, + BrokerdFill, + BrokerdError, + BrokerdPosition, ) @@ -90,8 +96,7 @@ def mk_check( ) -@dataclass -class _DarkBook: +class _DarkBook(Struct): ''' EMS-trigger execution book. @@ -116,17 +121,23 @@ class _DarkBook: dict, # cmd / msg type ] ] - ] = field(default_factory=dict) + ] = {} # tracks most recent values per symbol each from data feed lasts: dict[ str, float, - ] = field(default_factory=dict) + ] = {} - # mapping of piker ems order ids to current brokerd order flow message - _ems_entries: dict[str, str] = field(default_factory=dict) - _ems2brokerd_ids: dict[str, str] = field(default_factory=bidict) + # _ems_entries: dict[str, str] = {} + + # mapping of ems dialog ids to msg flow history + _msgflows: defaultdict[ + int, + ChainMap[dict[str, dict]], + ] = defaultdict(ChainMap) + + _ems2brokerd_ids: dict[str, str] = bidict() # XXX: this is in place to prevent accidental positions that are too @@ -240,7 +251,8 @@ async def clear_dark_triggers( # a ``BrokerdOrderAck`` msg including the # allocated unique ``BrokerdOrderAck.reqid`` key # generated by the broker's own systems. - book._ems_entries[oid] = live_req + # book._ems_entries[oid] = live_req + book._msgflows[oid].append(live_req) case _: raise ValueError(f'Invalid dark book entry: {cmd}') @@ -281,8 +293,7 @@ async def clear_dark_triggers( # print(f'execs scan took: {time.time() - start}') -@dataclass -class TradesRelay: +class TradesRelay(Struct): # for now we keep only a single connection open with # each ``brokerd`` for simplicity. @@ -318,7 +329,10 @@ class Router(Struct): # order id to client stream map clients: set[tractor.MsgStream] = set() - dialogues: dict[str, list[tractor.MsgStream]] = {} + dialogues: dict[ + str, + list[tractor.MsgStream] + ] = {} # brokername to trades-dialogues streams with ``brokerd`` actors relays: dict[str, TradesRelay] = {} @@ -341,8 +355,9 @@ class Router(Struct): loglevel: str, ) -> tuple[dict, tractor.MsgStream]: - '''Open and yield ``brokerd`` trades dialogue context-stream if none - already exists. + ''' + Open and yield ``brokerd`` trades dialogue context-stream if + none already exists. ''' relay: TradesRelay = self.relays.get(feed.mod.name) @@ -614,7 +629,8 @@ async def translate_and_relay_brokerd_events( 'reqid': reqid, # brokerd generated order-request id 'oid': oid, # ems order-dialog id } if ( - entry := book._ems_entries.get(oid) + # entry := book._ems_entries.get(oid) + flow := book._msgflows.get(oid) ): # initial response to brokerd order request # if name == 'ack': @@ -637,10 +653,14 @@ async def translate_and_relay_brokerd_events( # cancelled by the ems controlling client before we # received this ack, in which case we relay that cancel # signal **asap** to the backend broker - action = getattr(entry, 'action', None) + action = flow.get('action') + # action = getattr(entry, 'action', None) if action and action == 'cancel': # assign newly providerd broker backend request id - entry.reqid = reqid + flow['reqid'] = reqid + # entry.reqid = reqid + + entry = flow.maps[0] # tell broker to cancel immediately await brokerd_trades_stream.send(entry) @@ -649,7 +669,11 @@ async def translate_and_relay_brokerd_events( # our book -> registered as live flow else: # update the flow with the ack msg - book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg) + # book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg) + flow.maps.insert( + 0, + BrokerdOrderAck(**brokerd_msg).to_dict() + ) # no msg to client necessary continue @@ -666,6 +690,7 @@ async def translate_and_relay_brokerd_events( msg = BrokerdError(**brokerd_msg) resp = 'broker_errored' log.error(pformat(msg)) # XXX make one when it's blank? + book._msgflows[oid].maps.insert(0, msg.to_dict()) # TODO: figure out how this will interact with EMS clients # for ex. on an error do we react with a dark orders @@ -686,6 +711,9 @@ async def translate_and_relay_brokerd_events( } if ( oid := book._ems2brokerd_ids.inverse.get(reqid) ): + + # ack = book._ems_entries[oid] + # ack = book._msgflows[oid].maps[0] msg = BrokerdStatus(**brokerd_msg) # TODO: should we flatten out these cases and/or should @@ -704,6 +732,9 @@ async def translate_and_relay_brokerd_events( # since the order dialogue should be done. log.info(f'Execution for {oid} is complete!') + # remove from active flows + book._msgflows.pop(oid) + # just log it else: log.info(f'{broker} filled {msg}') @@ -712,7 +743,21 @@ async def translate_and_relay_brokerd_events( # one of {submitted, cancelled} resp = 'broker_' + msg.status - # unknown valid BrokerdStatus + + # book._ems_entries[oid] = msg + book._msgflows[oid].maps.insert(0, msg.to_dict()) + + # TODO: i wonder if we should just support receiving an + # actual ``BrokerdOrder`` msg here? Is it a bad idea to + # presume that inbound orders on the backend dialog can be + # used to drive order tracking/tracing in the EMS *over* + # a set of backends from some other non-ems owner? + # this will likely feel better once we get open_msg_scope() + # or wtv finished. + + # BrokerdStatus containing an embedded order msg which + # should be loaded as a "pre-existing open order" from the + # brokerd backend. case { 'name': 'status', 'status': status, @@ -724,7 +769,18 @@ async def translate_and_relay_brokerd_events( # to be able to more formally handle multi-player # trading... - if status == 'submitted': + if status != 'submitted': + log.error( + f'Unknown status msg:\n' + f'{pformat(brokerd_msg)}\n' + 'Unable to relay message to client side!?' + ) + + else: + # existing open backend order which we broadcast to + # all currently connected clients. + order_dict = brokerd_msg['broker_details'].pop('order') + order = BrokerdOrder(**order_dict) msg = BrokerdStatus(**brokerd_msg) log.info( f'Relaying existing open order:\n {brokerd_msg}' @@ -734,22 +790,49 @@ async def translate_and_relay_brokerd_events( # may end up with collisions? broker = details['name'] oid = str(reqid) - book._ems_entries[oid] = msg - # attempt to avoid collisions msg.reqid = oid - resp = 'broker_submitted' + + # XXX: MEGA HACK ALERT FOR the dialog entries delivery + # on client connect... + # TODO: fix this garbage.. + msg.broker_details['resp'] = resp = 'broker_submitted' # register this existing broker-side dialog book._ems2brokerd_ids[oid] = reqid + # book._ems_entries[oid] = msg - else: - log.error( - f'Unknown status msg:\n' - f'{pformat(brokerd_msg)}\n' - 'Unable to relay message to client side!?' - ) - continue + # fill in approximate msg flow history + flow = book._msgflows[oid] + flow.maps.insert(0, order.to_dict()) + flow.maps.insert(0, msg.to_dict()) + flow.maps.insert(0, details) + flattened = dict(flow) + # await tractor.breakpoint() + + # fan-out-relay position msgs immediately by + # broadcasting updates on all client streams + for client_stream in router.clients.copy(): + try: + await client_stream.send(flattened) + # Status( + # oid=oid, + # resp=resp, + # time_ns=time.time_ns(), + # broker_reqid=reqid, + # brokerd_msg=flattened, + # ) + # ) + except( + trio.ClosedResourceError, + trio.BrokenResourceError, + ): + router.clients.remove(client_stream) + log.warning( + f'client for {client_stream} was already closed?') + + # don't fall through + continue # BrokerdFill case { @@ -768,58 +851,31 @@ async def translate_and_relay_brokerd_events( raise ValueError(f'Brokerd message {brokerd_msg} is invalid') # retrieve existing live flow - entry = book._ems_entries[oid] + # entry = book._ems_entries[oid] + # assert entry.oid == oid # from when we only stored the first ack + # old_reqid = entry.reqid + # if old_reqid and old_reqid != reqid: + # log.warning( + # f'Brokerd order id change for {oid}:\n' + # f'{old_reqid} -> {reqid}' + # ) - if getattr(entry, 'oid', None): - assert entry.oid == oid - old_reqid = entry.reqid - if old_reqid and old_reqid != reqid: - log.warning( - f'Brokerd order id change for {oid}:\n' - f'{old_reqid} -> {reqid}' + # Create and relay response status message + # to requesting EMS client + try: + ems_client_order_stream = router.dialogues[oid] + await ems_client_order_stream.send( + Status( + oid=oid, + resp=resp, + time_ns=time.time_ns(), + broker_reqid=reqid, + brokerd_msg=msg, ) - - # Create and relay response status message - # to requesting EMS client - try: - ems_client_order_stream = router.dialogues[oid] - await ems_client_order_stream.send( - Status( - oid=oid, - resp=resp, - time_ns=time.time_ns(), - broker_reqid=reqid, - brokerd_msg=msg, - ) - ) - except KeyError: - log.error( - f'Received `brokerd` msg for unknown client oid: {oid}') - - else: - # existing open order relay - assert oid == entry.reqid - - # fan-out-relay position msgs immediately by - # broadcasting updates on all client streams - for client_stream in router.clients.copy(): - try: - await client_stream.send( - Status( - oid=oid, - resp=resp, - time_ns=time.time_ns(), - broker_reqid=reqid, - brokerd_msg=msg, - ) - ) - except( - trio.ClosedResourceError, - trio.BrokenResourceError, - ): - router.clients.remove(client_stream) - log.warning( - f'client for {client_stream} was already closed?') + ) + except KeyError: + log.error( + f'Received `brokerd` msg for unknown client oid: {oid}') # TODO: do we want this to keep things cleaned up? # it might require a special status from brokerd to affirm the @@ -854,7 +910,8 @@ async def process_client_order_cmds( # others who are registered for such order affiliated msgs). client_dialogues[oid] = client_order_stream reqid = dark_book._ems2brokerd_ids.inverse.get(oid) - live_entry = dark_book._ems_entries.get(oid) + # live_entry = dark_book._ems_entries.get(oid) + live_entry = dark_book._msgflows.get(oid) match cmd: # existing live-broker order cancel @@ -862,12 +919,14 @@ async def process_client_order_cmds( 'action': 'cancel', 'oid': oid, } if live_entry: - reqid = live_entry.reqid + # reqid = live_entry.reqid + reqid = live_entry['reqid'] msg = BrokerdCancel( oid=oid, reqid=reqid, time_ns=time.time_ns(), - account=live_entry.account, + # account=live_entry.account, + account=live_entry['account'], ) # NOTE: cancel response will be relayed back in messages @@ -885,6 +944,7 @@ async def process_client_order_cmds( # the order ack does show up later such that the brokerd # order request can be cancelled at that time. dark_book._ems_entries[oid] = msg + live_entry.maps.insert(0, msg.to_dict()) # dark trigger cancel case { @@ -936,7 +996,8 @@ async def process_client_order_cmds( # the only msg will be a ``BrokerdStatus`` # assert live_entry.oid == oid - reqid = live_entry.reqid + # reqid = live_entry.reqid + reqid = live_entry['reqid'] # if we already had a broker order id then # this is likely an order update commmand. log.info(f"Modifying live {broker} order: {reqid}") @@ -971,7 +1032,8 @@ async def process_client_order_cmds( # client, before that ack, when the ack does arrive we # immediately take the reqid from the broker and cancel # that live order asap. - dark_book._ems_entries[oid] = msg + # dark_book._ems_entries[oid] = msg + dark_book._msgflows[oid].maps.insert(0, msg.to_dict()) # dark-order / alert submission case { @@ -1144,12 +1206,35 @@ async def _emsd_main( brokerd_stream = relay.brokerd_dialogue # .clone() + # convert dialogs to status msgs for client delivery + statuses = {} + # for oid, msg in book._ems_entries.items(): + for oid, msgflow in book._msgflows.items(): + # we relay to the client side a msg that contains + # all data flattened from the message history. + # status = msgflow['status'] + flattened = dict(msgflow) + # status = flattened['status'] + flattened.pop('brokerd_msg', None) + statuses[oid] = flattened + # Status( + # oid=oid, + # time_ns=flattened['time_ns'], + # # time_ns=msg.time_ns, + # # resp=f'broker_{msg.status}', + # resp=f'broker_{status}', + # # trigger_price=msg.order.price, + # trigger_price=flattened['price'], + # brokerd_msg=flattened, + # ) + # await tractor.breakpoint() + # signal to client that we're started and deliver # all known pps and accounts for this ``brokerd``. await ems_ctx.started(( relay.positions, list(relay.accounts), - book._ems_entries, + statuses, )) # establish 2-way stream with requesting order-client and diff --git a/piker/clearing/_messages.py b/piker/clearing/_messages.py index eb94b147..ffd46ff2 100644 --- a/piker/clearing/_messages.py +++ b/piker/clearing/_messages.py @@ -67,7 +67,7 @@ class Order(Struct): # determines whether the create execution # will be submitted to the ems or directly to # the backend broker - exec_mode: str # {'dark', 'live', 'paper'} + exec_mode: str # {'dark', 'live'} # -------------- @@ -136,11 +136,14 @@ class BrokerdCancel(Struct): class BrokerdOrder(Struct): - action: str # {buy, sell} oid: str account: str time_ns: int + # TODO: if we instead rely on a +ve/-ve size to determine + # the action we more or less don't need this field right? + action: str = '' # {buy, sell} + # "broker request id": broker specific/internal order id if this is # None, creates a new order otherwise if the id is valid the backend # api must modify the existing matching order. If the broker allows @@ -149,7 +152,7 @@ class BrokerdOrder(Struct): # field reqid: Optional[Union[int, str]] = None - symbol: str # symbol. ? + symbol: str # fqsn price: float size: float @@ -183,25 +186,21 @@ class BrokerdStatus(Struct): reqid: Union[int, str] time_ns: int - # XXX: should be best effort set for every update - account: str = '' - # TODO: instead (ack, pending, open, fill, clos(ed), cancelled) # { - # 'submitted', - # 'cancelled', - # 'filled', + # 'submitted', # open + # 'cancelled', # canceled + # 'filled', # closed # } status: str - - # +ve is buy, -ve is sell - size: float = 0.0 - price: float = 0.0 - + account: str filled: float = 0.0 reason: str = '' remaining: float = 0.0 + external: bool = False + # order: Optional[BrokerdOrder] = None + # XXX: better design/name here? # flag that can be set to indicate a message for an order # event that wasn't originated by piker's emsd (eg. some external diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index b8dd37f9..b9d23ab3 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -49,9 +49,14 @@ from ._position import ( SettingsPane, ) from ._forms import FieldsForm -# from ._label import FormatLabel from ._window import MultiStatus -from ..clearing._messages import Order, BrokerdPosition +from ..clearing._messages import ( + Order, + Status, + # BrokerdOrder, + # BrokerdStatus, + BrokerdPosition, +) from ._forms import open_form_input_handling @@ -519,22 +524,36 @@ class OrderMode: def load_unknown_dialog_from_msg( self, + # status: Status, msg: dict, ) -> OrderDialog: oid = str(msg['oid']) - size = msg['brokerd_msg']['size'] + # oid = str(status.oid) + + # bstatus = BrokerdStatus(**msg.brokerd_msg) + # NOTE: the `.order` attr **must** be set with the + # equivalent order msg in order to be loaded. + # border = BrokerdOrder(**bstatus.broker_details['order']) + # msg = msg['brokerd_msg'] + + # size = border.size + size = msg['size'] if size >= 0: action = 'buy' else: action = 'sell' - acct = msg['brokerd_msg']['account'] - price = msg['brokerd_msg']['price'] - deats = msg['brokerd_msg']['broker_details'] + # acct = border.account + # price = border.price + # price = msg['brokerd_msg']['price'] + symbol = msg['symbol'] + deats = msg['broker_details'] + brokername = deats['name'] fqsn = ( - deats['fqsn'] + '.' + deats['name'] + # deats['fqsn'] + '.' + deats['name'] + symbol + '.' + brokername ) symbol = Symbol.from_fqsn( fqsn=fqsn, @@ -543,11 +562,11 @@ class OrderMode: # map to order composite-type order = Order( action=action, - price=price, - account=acct, + price=msg['price'], + account=msg['account'], size=size, symbol=symbol, - brokers=symbol.brokers, + brokers=[brokername], oid=oid, exec_mode='live', # dark or live ) @@ -808,8 +827,8 @@ async def open_order_mode( # HACK ALERT: ensure a resp field is filled out since # techincally the call below expects a ``Status``. TODO: # parse into proper ``Status`` equivalents ems-side? - msg.setdefault('resp', msg['broker_details']['resp']) - msg.setdefault('oid', msg['broker_details']['oid']) + # msg.setdefault('resp', msg['broker_details']['resp']) + # msg.setdefault('oid', msg['broker_details']['oid']) msg['brokerd_msg'] = msg await process_trade_msg( @@ -892,6 +911,7 @@ async def process_trade_msg( log.warning( f'received msg for untracked dialog:\n{fmsg}' ) + # dialog = mode.load_unknown_dialog_from_msg(Status(**msg)) dialog = mode.load_unknown_dialog_from_msg(msg) # record message to dialog tracking