diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index c2f03a4f..f9dd91ea 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -60,6 +60,8 @@ from piker.pp import ( ) from piker.log import get_console_log from piker.clearing._messages import ( + Order, + Status, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, @@ -122,11 +124,13 @@ async def handle_order_requests( f'An IB account number for name {account} is not found?\n' 'Make sure you have all TWS and GW instances running.' ) - await ems_order_stream.send(BrokerdError( - oid=request_msg['oid'], - symbol=request_msg['symbol'], - reason=f'No account found: `{account}` ?', - )) + await ems_order_stream.send( + BrokerdError( + oid=request_msg['oid'], + symbol=request_msg['symbol'], + reason=f'No account found: `{account}` ?', + ) + ) continue client = _accounts2clients.get(account) @@ -146,6 +150,14 @@ async def handle_order_requests( # validate order = BrokerdOrder(**request_msg) + # XXX: by default 0 tells ``ib_insync`` methods that + # there is no existing order so ask the client to create + # a new one (which it seems to do by allocating an int + # counter - collision prone..) + reqid = order.reqid + if reqid is not None: + reqid = int(reqid) + # call our client api to submit the order reqid = client.submit_limit( oid=order.oid, @@ -154,12 +166,7 @@ async def handle_order_requests( action=order.action, size=order.size, account=acct_number, - - # XXX: by default 0 tells ``ib_insync`` methods that - # there is no existing order so ask the client to create - # a new one (which it seems to do by allocating an int - # counter - collision prone..) - reqid=order.reqid, + reqid=reqid, ) if reqid is None: await ems_order_stream.send(BrokerdError( @@ -181,7 +188,7 @@ async def handle_order_requests( elif action == 'cancel': msg = BrokerdCancel(**request_msg) - client.submit_cancel(reqid=msg.reqid) + client.submit_cancel(reqid=int(msg.reqid)) else: log.error(f'Unknown order command: {request_msg}') @@ -451,7 +458,6 @@ async def trades_dialogue( # we might also want to delegate a specific actor for # ledger writing / reading for speed? async with ( - # trio.open_nursery() as nurse, open_client_proxies() as (proxies, aioclients), ): # Open a trade ledgers stack for appending trade records over @@ -459,6 +465,7 @@ async def trades_dialogue( # TODO: we probably want to generalize this into a "ledgers" api.. ledgers: dict[str, dict] = {} tables: dict[str, PpTable] = {} + order_msgs: list[Status] = [] with ( ExitStack() as lstack, ): @@ -480,6 +487,49 @@ async def trades_dialogue( for account, proxy in proxies.items(): client = aioclients[account] + trades: list[Trade] = client.ib.openTrades() + for trade in trades: + order = trade.order + quant = trade.order.totalQuantity + action = order.action.lower() + size = { + 'sell': -1, + 'buy': 1, + }[action] * quant + con = trade.contract + + # TODO: in the case of the SMART venue (aka ib's + # router-clearing sys) we probably should handle + # showing such orders overtop of the fqsn for the + # primary exchange, how to map this easily is going + # to be a bit tricky though? + deats = await proxy.con_deats(contracts=[con]) + fqsn = list(deats)[0] + + reqid = order.orderId + + # TODO: maybe embed a ``BrokerdOrder`` instead + # since then we can directly load it on the client + # side in the order mode loop? + msg = Status( + time_ns=time.time_ns(), + resp='open', + oid=str(reqid), + reqid=reqid, + + # embedded order info + req=Order( + action=action, + exec_mode='live', + oid=str(reqid), + symbol=fqsn, + account=accounts_def.inverse[order.account], + price=order.lmtPrice, + size=size, + ), + src='ib', + ) + order_msgs.append(msg) # process pp value reported from ib's system. we only use these # to cross-check sizing since average pricing on their end uses @@ -615,6 +665,9 @@ async def trades_dialogue( ctx.open_stream() as ems_stream, trio.open_nursery() as n, ): + # relay existing open orders to ems + for msg in order_msgs: + await ems_stream.send(msg) for client in set(aioclients.values()): trade_event_stream = await n.start( @@ -633,6 +686,7 @@ async def trades_dialogue( # allocate event relay tasks for each client connection n.start_soon( deliver_trade_events, + n, trade_event_stream, ems_stream, accounts_def, @@ -726,6 +780,7 @@ _statuses: dict[str, str] = { async def deliver_trade_events( + nurse: trio.Nursery, trade_event_stream: trio.MemoryReceiveChannel, ems_stream: tractor.MsgStream, accounts_def: dict[str, str], # eg. `'ib.main'` -> `'DU999999'` @@ -750,8 +805,9 @@ async def deliver_trade_events( log.info(f'ib sending {event_name}:\n{pformat(item)}') match event_name: - # TODO: templating the ib statuses in comparison with other - # brokers is likely the way to go: + # NOTE: we remap statuses to the ems set via the + # ``_statuses: dict`` above. + # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 # short list: # - PendingSubmit @@ -781,28 +837,90 @@ async def deliver_trade_events( # unwrap needed data from ib_insync internal types trade: Trade = item status: OrderStatus = trade.orderStatus + ib_status_key = status.status.lower() + + # TODO: try out cancelling inactive orders after delay: + # https://github.com/erdewit/ib_insync/issues/363 + # acctid = accounts_def.inverse[trade.order.account] + + # # double check there is no error when + # # cancelling.. gawwwd + # if ib_status_key == 'cancelled': + # last_log = trade.log[-1] + # if ( + # last_log.message + # and 'Error' not in last_log.message + # ): + # ib_status_key = trade.log[-2].status + + # elif ib_status_key == 'inactive': + + # async def sched_cancel(): + # log.warning( + # 'OH GAWD an inactive order.scheduling a cancel\n' + # f'{pformat(item)}' + # ) + # proxy = proxies[acctid] + # await proxy.submit_cancel(reqid=trade.order.orderId) + # await trio.sleep(1) + # nurse.start_soon(sched_cancel) + + # nurse.start_soon(sched_cancel) + + status_key = ( + _statuses.get(ib_status_key.lower()) + or ib_status_key.lower() + ) + + remaining = status.remaining + if ( + status_key == 'filled' + ): + fill: Fill = trade.fills[-1] + execu: Execution = fill.execution + # execdict = asdict(execu) + # execdict.pop('acctNumber') + + fill_msg = BrokerdFill( + # should match the value returned from + # `.submit_limit()` + reqid=execu.orderId, + time_ns=time.time_ns(), # cuz why not + action=action_map[execu.side], + size=execu.shares, + price=execu.price, + # broker_details=execdict, + # XXX: required by order mode currently + broker_time=execu.time, + ) + await ems_stream.send(fill_msg) + + if remaining == 0: + # emit a closed status on filled statuses where + # all units were cleared. + status_key = 'closed' # skip duplicate filled updates - we get the deats # from the execution details event msg = BrokerdStatus( - reqid=trade.order.orderId, time_ns=time.time_ns(), # cuz why not account=accounts_def.inverse[trade.order.account], # everyone doin camel case.. - status=status.status.lower(), # force lower case + status=status_key, # force lower case filled=status.filled, reason=status.whyHeld, # this seems to not be necessarily up to date in the # execDetails event.. so we have to send it here I guess? - remaining=status.remaining, + remaining=remaining, broker_details={'name': 'ib'}, ) await ems_stream.send(msg) + continue case 'fill': @@ -818,8 +936,6 @@ async def deliver_trade_events( # https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations trade: Trade fill: Fill - - # TODO: maybe we can use matching to better handle these cases. trade, fill = item execu: Execution = fill.execution execid = execu.execId @@ -848,22 +964,6 @@ async def deliver_trade_events( } ) - msg = BrokerdFill( - # should match the value returned from `.submit_limit()` - reqid=execu.orderId, - time_ns=time.time_ns(), # cuz why not - - action=action_map[execu.side], - size=execu.shares, - price=execu.price, - - broker_details=trade_entry, - # XXX: required by order mode currently - broker_time=trade_entry['broker_time'], - - ) - await ems_stream.send(msg) - # 2 cases: # - fill comes first or # - comms report comes first @@ -933,17 +1033,25 @@ async def deliver_trade_events( if err['reqid'] == -1: log.error(f'TWS external order error:\n{pformat(err)}') - # TODO: what schema for this msg if we're going to make it - # portable across all backends? - # msg = BrokerdError(**err) + # TODO: we don't want to relay data feed / lookup errors + # so we need some further filtering logic here.. + # for most cases the 'status' block above should take + # care of this. + # await ems_stream.send(BrokerdStatus( + # status='error', + # reqid=err['reqid'], + # reason=err['reason'], + # time_ns=time.time_ns(), + # account=accounts_def.inverse[trade.order.account], + # broker_details={'name': 'ib'}, + # )) case 'position': cid, msg = pack_position(item) log.info(f'New IB position msg: {msg}') - # acctid = msg.account = accounts_def.inverse[msg.account] # cuck ib and it's shitty fifo sys for pps! - # await ems_stream.send(msg) + continue case 'event': diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 3641934a..8503e049 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -31,6 +31,7 @@ import time from typing import ( Any, AsyncIterator, + Iterable, Union, ) @@ -39,7 +40,6 @@ from bidict import bidict import pendulum import trio import tractor -import wsproto from piker.pp import ( Position, @@ -49,6 +49,8 @@ from piker.pp import ( open_pps, ) from piker.clearing._messages import ( + Order, + Status, BrokerdCancel, BrokerdError, BrokerdFill, @@ -85,6 +87,33 @@ class TooFastEdit(Exception): 'Edit requests faster then api submissions' +# TODO: make this wrap the `Client` and `ws` instances +# and give it methods to submit cancel vs. add vs. edit +# requests? +class BrokerClient: + ''' + Actor global, client-unique order manager API. + + For now provides unique ``brokerd`` defined "request ids" + and "user reference" values to track ``kraken`` ws api order + dialogs. + + ''' + counter: Iterable = count(1) + _table: set[int] = set() + + @classmethod + def new_reqid(cls) -> int: + for reqid in cls.counter: + if reqid not in cls._table: + cls._table.add(reqid) + return reqid + + @classmethod + def add_reqid(cls, reqid: int) -> None: + cls._table.add(reqid) + + async def handle_order_requests( ws: NoBsWs, @@ -104,7 +133,6 @@ async def handle_order_requests( # XXX: UGH, let's unify this.. with ``msgspec``. msg: dict[str, Any] order: BrokerdOrder - counter = count(1) async for msg in ems_order_stream: log.info(f'Rx order msg:\n{pformat(msg)}') @@ -126,7 +154,7 @@ async def handle_order_requests( oid=msg['oid'], symbol=msg['symbol'], reason=( - f'TooFastEdit reqid:{reqid}, could not cancelling..' + f'Edit too fast:{reqid}, cancelling..' ), ) @@ -177,7 +205,8 @@ async def handle_order_requests( else: ep = 'addOrder' - reqid = next(counter) + + reqid = BrokerClient.new_reqid() ids[order.oid] = reqid log.debug( f"Adding order {reqid}\n" @@ -249,7 +278,7 @@ async def handle_order_requests( @acm async def subscribe( - ws: wsproto.WSConnection, + ws: NoBsWs, token: str, subs: list[tuple[str, dict]] = [ ('ownTrades', { @@ -632,8 +661,6 @@ async def handle_order_updates( # to do all fill/status/pp updates in that sub and just use # this one for ledger syncs? - # XXX: ASK SUPPORT ABOUT THIS! - # For eg. we could take the "last 50 trades" and do a diff # with the ledger and then only do a re-sync if something # seems amiss? @@ -696,7 +723,6 @@ async def handle_order_updates( status_msg = BrokerdStatus( reqid=reqid, time_ns=time.time_ns(), - account=acc_name, status='filled', filled=size, @@ -741,33 +767,92 @@ async def handle_order_updates( f'{pformat(order_msg)}' ) txid, update_msg = list(order_msg.items())[0] + + # XXX: eg. of full msg schema: + # {'avg_price': _, + # 'cost': _, + # 'descr': { + # 'close': None, + # 'leverage': None, + # 'order': descr, + # 'ordertype': 'limit', + # 'pair': 'XMR/EUR', + # 'price': '74.94000000', + # 'price2': '0.00000000', + # 'type': 'buy' + # }, + # 'expiretm': None, + # 'fee': '0.00000000', + # 'limitprice': '0.00000000', + # 'misc': '', + # 'oflags': 'fciq', + # 'opentm': '1656966131.337344', + # 'refid': None, + # 'starttm': None, + # 'stopprice': '0.00000000', + # 'timeinforce': 'GTC', + # 'vol': submit_vlm, # '13.34400854', + # 'vol_exec': exec_vlm} # 0.0000 match update_msg: - # XXX: eg. of full msg schema: - # {'avg_price': _, - # 'cost': _, - # 'descr': { - # 'close': None, - # 'leverage': None, - # 'order': descr, - # 'ordertype': 'limit', - # 'pair': 'XMR/EUR', - # 'price': '74.94000000', - # 'price2': '0.00000000', - # 'type': 'buy' - # }, - # 'expiretm': None, - # 'fee': '0.00000000', - # 'limitprice': '0.00000000', - # 'misc': '', - # 'oflags': 'fciq', - # 'opentm': '1656966131.337344', - # 'refid': None, - # 'starttm': None, - # 'stopprice': '0.00000000', - # 'timeinforce': 'GTC', - # 'vol': submit_vlm, # '13.34400854', - # 'vol_exec': exec_vlm} # 0.0000 + # EMS-unknown live order that needs to be + # delivered and loaded on the client-side. + case { + 'userref': reqid, + 'descr': { + 'pair': pair, + 'price': price, + 'type': action, + }, + 'vol': vol, + + # during a fill this field is **not** + # provided! but, it is always avail on + # actual status updates.. see case above. + 'status': status, + **rest, + } if ( + ids.inverse.get(reqid) is None + ): + # parse out existing live order + fqsn = pair.replace('/', '').lower() + price = float(price) + size = float(vol) + + # register the userref value from + # kraken (usually an `int` staring + # at 1?) as our reqid. + reqids2txids[reqid] = txid + oid = str(reqid) + ids[oid] = reqid # NOTE!: str -> int + + # ensure wtv reqid they give us we don't re-use on + # new order submissions to this actor's client. + BrokerClient.add_reqid(reqid) + + # fill out ``Status`` + boxed ``Order`` + status_msg = Status( + time_ns=time.time_ns(), + resp='open', + oid=oid, + reqid=reqid, + + # embedded order info + req=Order( + action=action, + exec_mode='live', + oid=oid, + symbol=fqsn, + account=acc_name, + price=price, + size=size, + ), + src='kraken', + ) + apiflows[reqid].maps.append(status_msg) + await ems_stream.send(status_msg) + continue + case { 'userref': reqid, @@ -821,66 +906,47 @@ async def handle_order_updates( ) oid = ids.inverse.get(reqid) + # XXX: too fast edit handled by the + # request handler task: this + # scenario occurs when ems side + # requests are coming in too quickly + # such that there is no known txid + # yet established for the ems + # dialog's last reqid when the + # request handler task is already + # receceiving a new update for that + # reqid. In this case we simply mark + # the reqid as being "too fast" and + # then when we get the next txid + # update from kraken's backend, and + # thus the new txid, we simply + # cancel the order for now. + # TODO: Ideally we eventually + # instead make the client side of + # the ems block until a submission + # is confirmed by the backend + # instead of this hacky throttle + # style approach and avoid requests + # coming in too quickly on the other + # side of the ems, aka the client + # <-> ems dialog. if ( status == 'open' - and ( - # XXX: too fast edit handled by the - # request handler task: this - # scenario occurs when ems side - # requests are coming in too quickly - # such that there is no known txid - # yet established for the ems - # dialog's last reqid when the - # request handler task is already - # receceiving a new update for that - # reqid. In this case we simply mark - # the reqid as being "too fast" and - # then when we get the next txid - # update from kraken's backend, and - # thus the new txid, we simply - # cancel the order for now. - - # TODO: Ideally we eventually - # instead make the client side of - # the ems block until a submission - # is confirmed by the backend - # instead of this hacky throttle - # style approach and avoid requests - # coming in too quickly on the other - # side of the ems, aka the client - # <-> ems dialog. - (toofast := isinstance( - reqids2txids.get(reqid), - TooFastEdit - )) - - # pre-existing open order NOT from - # this EMS session. - or (noid := oid is None) + and isinstance( + reqids2txids.get(reqid), + TooFastEdit ) ): - if toofast: - # TODO: don't even allow this case - # by not moving the client side line - # until an edit confirmation - # arrives... - log.cancel( - f'Received too fast edit {txid}:\n' - f'{update_msg}\n' - 'Cancelling order for now!..' - ) - - elif noid: # a non-ems-active order - # TODO: handle these and relay them - # through the EMS to the client / UI - # side! - log.cancel( - f'Rx unknown active order {txid}:\n' - f'{update_msg}\n' - 'Cancelling order for now!..' - ) - + # TODO: don't even allow this case + # by not moving the client side line + # until an edit confirmation + # arrives... + log.cancel( + f'Received too fast edit {txid}:\n' + f'{update_msg}\n' + 'Cancelling order for now!..' + ) # call ws api to cancel: # https://docs.kraken.com/websockets/#message-cancelOrder await ws.send_msg({ @@ -891,18 +957,6 @@ async def handle_order_updates( }) continue - # remap statuses to ems set. - ems_status = { - 'open': 'submitted', - 'closed': 'filled', - 'canceled': 'cancelled', - # do we even need to forward - # this state to the ems? - 'pending': 'pending', - }[status] - # TODO: i like the open / closed semantics - # more we should consider them for internals - # send BrokerdStatus messages for all # order state updates resp = BrokerdStatus( @@ -912,7 +966,7 @@ async def handle_order_updates( account=f'kraken.{acctid}', # everyone doin camel case.. - status=ems_status, # force lower case + status=status, # force lower case filled=vlm, reason='', # why held? diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index 7c589d85..e67d204c 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -34,7 +34,6 @@ import pendulum from trio_typing import TaskStatus import tractor import trio -import wsproto from piker._cacheables import open_cached_client from piker.brokers._util import ( @@ -243,22 +242,6 @@ def normalize( return topic, quote -def make_sub(pairs: list[str], data: dict[str, Any]) -> dict[str, str]: - ''' - Create a request subscription packet dict. - - https://docs.kraken.com/websockets/#message-subscribe - - ''' - # eg. specific logic for this in kraken's sync client: - # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - return { - 'pair': pairs, - 'event': 'subscribe', - 'subscription': data, - } - - @acm async def open_history_client( symbol: str, @@ -381,15 +364,20 @@ async def stream_quotes( } @acm - async def subscribe(ws: wsproto.WSConnection): + async def subscribe(ws: NoBsWs): + # XXX: setup subs # https://docs.kraken.com/websockets/#message-subscribe - # specific logic for this in kraken's shitty sync client: + # specific logic for this in kraken's sync client: # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - ohlc_sub = make_sub( - list(ws_pairs.values()), - {'name': 'ohlc', 'interval': 1} - ) + ohlc_sub = { + 'event': 'subscribe', + 'pair': list(ws_pairs.values()), + 'subscription': { + 'name': 'ohlc', + 'interval': 1, + }, + } # TODO: we want to eventually allow unsubs which should # be completely fine to request from a separate task @@ -398,10 +386,14 @@ async def stream_quotes( await ws.send_msg(ohlc_sub) # trade data (aka L1) - l1_sub = make_sub( - list(ws_pairs.values()), - {'name': 'spread'} # 'depth': 10} - ) + l1_sub = { + 'event': 'subscribe', + 'pair': list(ws_pairs.values()), + 'subscription': { + 'name': 'spread', + # 'depth': 10} + }, + } # pull a first quote and deliver await ws.send_msg(l1_sub) diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 95d80986..11eb9b69 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -83,7 +83,13 @@ class OrderBook: """Cancel an order (or alert) in the EMS. """ - cmd = self._sent_orders[uuid] + cmd = self._sent_orders.get(uuid) + if not cmd: + log.error( + f'Unknown order {uuid}!?\n' + f'Maybe there is a stale entry or line?\n' + f'You should report this as a bug!' + ) msg = Cancel( oid=uuid, symbol=cmd.symbol, @@ -149,10 +155,17 @@ async def relay_order_cmds_from_sync_code( book = get_orders() async with book._from_order_book.subscribe() as orders_stream: async for cmd in orders_stream: - if cmd.symbol == symbol_key: - log.info(f'Send order cmd:\n{pformat(cmd)}') + sym = cmd.symbol + msg = pformat(cmd) + if sym == symbol_key: + log.info(f'Send order cmd:\n{msg}') # send msg over IPC / wire await to_ems_stream.send(cmd) + else: + log.warning( + f'Ignoring unmatched order cmd for {sym} != {symbol_key}:' + f'\n{msg}' + ) @acm @@ -220,11 +233,19 @@ async def open_ems( fqsn=fqsn, exec_mode=mode, - ) as (ctx, (positions, accounts)), + ) as ( + ctx, + ( + positions, + accounts, + dialogs, + ) + ), # open 2-way trade command stream ctx.open_stream() as trades_stream, ): + # start sync code order msg delivery task async with trio.open_nursery() as n: n.start_soon( relay_order_cmds_from_sync_code, @@ -232,4 +253,10 @@ async def open_ems( trades_stream ) - yield book, trades_stream, positions, accounts + yield ( + book, + trades_stream, + positions, + accounts, + dialogs, + ) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 81288899..ae54615b 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -18,8 +18,8 @@ In da suit parlances: "Execution management systems" """ +from collections import defaultdict, ChainMap from contextlib import asynccontextmanager -from dataclasses import dataclass, field from math import isnan from pprint import pformat import time @@ -27,6 +27,7 @@ from typing import ( AsyncIterator, Any, Callable, + Optional, ) from bidict import bidict @@ -41,9 +42,16 @@ from ..data.types import Struct from .._daemon import maybe_spawn_brokerd from . import _paper_engine as paper from ._messages import ( - Status, Order, - BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, - BrokerdFill, BrokerdError, BrokerdPosition, + Order, + Status, + # Cancel, + BrokerdCancel, + BrokerdOrder, + # BrokerdOrderAck, + BrokerdStatus, + BrokerdFill, + BrokerdError, + BrokerdPosition, ) @@ -90,8 +98,7 @@ def mk_check( ) -@dataclass -class _DarkBook: +class _DarkBook(Struct): ''' EMS-trigger execution book. @@ -116,17 +123,24 @@ class _DarkBook: dict, # cmd / msg type ] ] - ] = field(default_factory=dict) + ] = {} # tracks most recent values per symbol each from data feed lasts: dict[ str, float, - ] = field(default_factory=dict) + ] = {} - # mapping of piker ems order ids to current brokerd order flow message - _ems_entries: dict[str, str] = field(default_factory=dict) - _ems2brokerd_ids: dict[str, str] = field(default_factory=bidict) + # _ems_entries: dict[str, str] = {} + _active: dict = {} + + # mapping of ems dialog ids to msg flow history + _msgflows: defaultdict[ + int, + ChainMap[dict[str, dict]], + ] = defaultdict(ChainMap) + + _ems2brokerd_ids: dict[str, str] = bidict() # XXX: this is in place to prevent accidental positions that are too @@ -181,6 +195,7 @@ async def clear_dark_triggers( for oid, ( pred, tf, + # TODO: send this msg instead? cmd, percent_away, abs_diff_away @@ -188,9 +203,9 @@ async def clear_dark_triggers( tuple(execs.items()) ): if ( - not pred or - ttype not in tf or - not pred(price) + not pred + or ttype not in tf + or not pred(price) ): # log.runtime( # f'skipping quote for {sym} ' @@ -200,30 +215,29 @@ async def clear_dark_triggers( # majority of iterations will be non-matches continue + brokerd_msg: Optional[BrokerdOrder] = None match cmd: # alert: nothing to do but relay a status # back to the requesting ems client - case { - 'action': 'alert', - }: - resp = 'alert_triggered' + case Order(action='alert'): + resp = 'triggered' # executable order submission - case { - 'action': action, - 'symbol': symbol, - 'account': account, - 'size': size, - }: + case Order( + action=action, + symbol=symbol, + account=account, + size=size, + ): bfqsn: str = symbol.replace(f'.{broker}', '') submit_price = price + abs_diff_away - resp = 'dark_triggered' # hidden on client-side + resp = 'triggered' # hidden on client-side log.info( f'Dark order triggered for price {price}\n' f'Submitting order @ price {submit_price}') - live_req = BrokerdOrder( + brokerd_msg = BrokerdOrder( action=action, oid=oid, account=account, @@ -232,26 +246,22 @@ async def clear_dark_triggers( price=submit_price, size=size, ) - await brokerd_orders_stream.send(live_req) - # mark this entry as having sent an order - # request. the entry will be replaced once the - # target broker replies back with - # a ``BrokerdOrderAck`` msg including the - # allocated unique ``BrokerdOrderAck.reqid`` key - # generated by the broker's own systems. - book._ems_entries[oid] = live_req + await brokerd_orders_stream.send(brokerd_msg) + + # book._ems_entries[oid] = live_req + # book._msgflows[oid].maps.insert(0, live_req) case _: raise ValueError(f'Invalid dark book entry: {cmd}') # fallthrough logic - resp = Status( + status = Status( oid=oid, # ems dialog id time_ns=time.time_ns(), resp=resp, - trigger_price=price, - brokerd_msg=cmd, + req=cmd, + brokerd_msg=brokerd_msg, ) # remove exec-condition from set @@ -262,9 +272,24 @@ async def clear_dark_triggers( f'pred for {oid} was already removed!?' ) + # update actives + # mark this entry as having sent an order + # request. the entry will be replaced once the + # target broker replies back with + # a ``BrokerdOrderAck`` msg including the + # allocated unique ``BrokerdOrderAck.reqid`` key + # generated by the broker's own systems. + if cmd.action == 'alert': + # don't register the alert status (so it won't + # be reloaded by clients) since it's now + # complete / closed. + book._active.pop(oid) + else: + book._active[oid] = status + # send response to client-side try: - await ems_client_order_stream.send(resp) + await ems_client_order_stream.send(status) except ( trio.ClosedResourceError, ): @@ -281,8 +306,7 @@ async def clear_dark_triggers( # print(f'execs scan took: {time.time() - start}') -@dataclass -class TradesRelay: +class TradesRelay(Struct): # for now we keep only a single connection open with # each ``brokerd`` for simplicity. @@ -318,7 +342,10 @@ class Router(Struct): # order id to client stream map clients: set[tractor.MsgStream] = set() - dialogues: dict[str, list[tractor.MsgStream]] = {} + dialogues: dict[ + str, + list[tractor.MsgStream] + ] = {} # brokername to trades-dialogues streams with ``brokerd`` actors relays: dict[str, TradesRelay] = {} @@ -341,11 +368,12 @@ class Router(Struct): loglevel: str, ) -> tuple[dict, tractor.MsgStream]: - '''Open and yield ``brokerd`` trades dialogue context-stream if none - already exists. + ''' + Open and yield ``brokerd`` trades dialogue context-stream if + none already exists. ''' - relay = self.relays.get(feed.mod.name) + relay: TradesRelay = self.relays.get(feed.mod.name) if ( relay is None @@ -381,6 +409,22 @@ class Router(Struct): relay.consumers -= 1 + async def client_broadcast( + self, + msg: dict, + + ) -> None: + for client_stream in self.clients.copy(): + try: + await client_stream.send(msg) + except( + trio.ClosedResourceError, + trio.BrokenResourceError, + ): + self.clients.remove(client_stream) + log.warning( + f'client for {client_stream} was already closed?') + _router: Router = None @@ -452,7 +496,6 @@ async def open_brokerd_trades_dialogue( async with ( open_trades_endpoint as (brokerd_ctx, (positions, accounts,)), brokerd_ctx.open_stream() as brokerd_trades_stream, - ): # XXX: really we only want one stream per `emsd` actor # to relay global `brokerd` order events unless we're @@ -502,14 +545,9 @@ async def open_brokerd_trades_dialogue( task_status.started(relay) - await translate_and_relay_brokerd_events( - broker, - brokerd_trades_stream, - _router, - ) - # this context should block here indefinitely until # the ``brokerd`` task either dies or is cancelled + await trio.sleep_forever() finally: # parent context must have been closed @@ -561,15 +599,14 @@ async def translate_and_relay_brokerd_events( broker ems 'error' -> log it locally (for now) - 'status' -> relabel as 'broker_', if complete send 'executed' - 'fill' -> 'broker_filled' + ('status' | 'fill'} -> relayed through see ``Status`` msg type. Currently handled status values from IB: {'presubmitted', 'submitted', 'cancelled', 'inactive'} ''' - book = router.get_dark_book(broker) - relay = router.relays[broker] + book: _DarkBook = router.get_dark_book(broker) + relay: TradesRelay = router.relays[broker] assert relay.brokerd_dialogue == brokerd_trades_stream @@ -601,30 +638,16 @@ async def translate_and_relay_brokerd_events( # fan-out-relay position msgs immediately by # broadcasting updates on all client streams - for client_stream in router.clients.copy(): - try: - await client_stream.send(pos_msg) - except( - trio.ClosedResourceError, - trio.BrokenResourceError, - ): - router.clients.remove(client_stream) - log.warning( - f'client for {client_stream} was already closed?') - + await router.client_broadcast(pos_msg) continue # BrokerdOrderAck + # initial response to brokerd order request case { 'name': 'ack', 'reqid': reqid, # brokerd generated order-request id 'oid': oid, # ems order-dialog id - } if ( - entry := book._ems_entries.get(oid) - ): - # initial response to brokerd order request - # if name == 'ack': - + }: # register the brokerd request id (that was generated # / created internally by the broker backend) with our # local ems order id for reverse lookup later. @@ -639,23 +662,23 @@ async def translate_and_relay_brokerd_events( # new order which has not yet be registered into the # local ems book, insert it now and handle 2 cases: - # - the order has previously been requested to be + # 1. the order has previously been requested to be # cancelled by the ems controlling client before we # received this ack, in which case we relay that cancel # signal **asap** to the backend broker - action = getattr(entry, 'action', None) - if action and action == 'cancel': + status_msg = book._active[oid] + req = status_msg.req + if req and req.action == 'cancel': # assign newly providerd broker backend request id - entry.reqid = reqid + # and tell broker to cancel immediately + status_msg.reqid = reqid + await brokerd_trades_stream.send(req) - # tell broker to cancel immediately - await brokerd_trades_stream.send(entry) - - # - the order is now active and will be mirrored in + # 2. the order is now active and will be mirrored in # our book -> registered as live flow else: - # update the flow with the ack msg - book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg) + # TODO: should we relay this ack state? + status_msg.resp = 'pending' # no msg to client necessary continue @@ -666,11 +689,9 @@ async def translate_and_relay_brokerd_events( 'oid': oid, # ems order-dialog id 'reqid': reqid, # brokerd generated order-request id 'symbol': sym, - 'broker_details': details, - # 'reason': reason, - }: + } if status_msg := book._active.get(oid): + msg = BrokerdError(**brokerd_msg) - resp = 'broker_errored' log.error(pformat(msg)) # XXX make one when it's blank? # TODO: figure out how this will interact with EMS clients @@ -680,43 +701,61 @@ async def translate_and_relay_brokerd_events( # some unexpected failure - something we need to think more # about. In most default situations, with composed orders # (ex. brackets), most brokers seem to use a oca policy. + ems_client_order_stream = router.dialogues[oid] + status_msg.resp = 'error' + status_msg.brokerd_msg = msg + book._active[oid] = status_msg + await ems_client_order_stream.send(status_msg) # BrokerdStatus case { 'name': 'status', 'status': status, 'reqid': reqid, # brokerd generated order-request id - # TODO: feels like the wrong msg for this field? - 'remaining': remaining, } if ( - oid := book._ems2brokerd_ids.inverse.get(reqid) + (oid := book._ems2brokerd_ids.inverse.get(reqid)) + and status in ( + 'canceled', + 'open', + 'closed', + ) ): msg = BrokerdStatus(**brokerd_msg) - # TODO: should we flatten out these cases and/or should - # they maybe even eventually be separate messages? - if status == 'cancelled': - log.info(f'Cancellation for {oid} is complete!') + # TODO: maybe pack this into a composite type that + # contains both the IPC stream as well the + # msg-chain/dialog. + ems_client_order_stream = router.dialogues[oid] + status_msg = book._active[oid] + status_msg.resp = status - if status == 'filled': - # conditional execution is fully complete, no more - # fills for the noted order - if not remaining: + # retrieve existing live flow + old_reqid = status_msg.reqid + if old_reqid and old_reqid != reqid: + log.warning( + f'Brokerd order id change for {oid}:\n' + f'{old_reqid}:{type(old_reqid)} ->' + f' {reqid}{type(reqid)}' + ) - resp = 'broker_executed' + status_msg.reqid = reqid # THIS LINE IS CRITICAL! + status_msg.brokerd_msg = msg + status_msg.src = msg.broker_details['name'] + await ems_client_order_stream.send(status_msg) - # be sure to pop this stream from our dialogue set - # since the order dialogue should be done. - log.info(f'Execution for {oid} is complete!') + if status == 'closed': + log.info(f'Execution for {oid} is complete!') + status_msg = book._active.pop(oid) + elif status == 'canceled': + log.cancel(f'Cancellation for {oid} is complete!') + status_msg = book._active.pop(oid) + + else: # open + # relayed from backend but probably not handled so # just log it - else: - log.info(f'{broker} filled {msg}') - - else: - # one of {submitted, cancelled} - resp = 'broker_' + msg.status + log.info(f'{broker} opened order {msg}') # BrokerdFill case { @@ -728,82 +767,112 @@ async def translate_and_relay_brokerd_events( ): # proxy through the "fill" result(s) msg = BrokerdFill(**brokerd_msg) - resp = 'broker_filled' - log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') + log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}') - # unknown valid message case? - # case { - # 'name': name, - # 'symbol': sym, - # 'reqid': reqid, # brokerd generated order-request id - # # 'oid': oid, # ems order-dialog id - # 'broker_details': details, + ems_client_order_stream = router.dialogues[oid] - # } if ( - # book._ems2brokerd_ids.inverse.get(reqid) is None - # ): - # # TODO: pretty sure we can drop this now? + # XXX: bleh, a fill can come after 'closed' from `ib`? + # only send a late fill event we haven't already closed + # out the dialog status locally. + status_msg = book._active.get(oid) + if status_msg: + status_msg.resp = 'fill' + status_msg.reqid = reqid + status_msg.brokerd_msg = msg + await ems_client_order_stream.send(status_msg) - # # XXX: paper clearing special cases - # # paper engine race case: ``Client.submit_limit()`` hasn't - # # returned yet and provided an output reqid to register - # # locally, so we need to retreive the oid that was already - # # packed at submission since we already know it ahead of - # # time - # paper = details.get('paper_info') - # ext = details.get('external') + # ``Status`` containing an embedded order msg which + # should be loaded as a "pre-existing open order" from the + # brokerd backend. + case { + 'name': 'status', + 'resp': status, + 'reqid': reqid, # brokerd generated order-request id + }: + if ( + status != 'open' + ): + # TODO: check for an oid we might know since it was + # registered from a previous order/status load? + log.error( + f'Unknown/transient status msg:\n' + f'{pformat(brokerd_msg)}\n' + 'Unable to relay message to client side!?' + ) - # if paper: - # # paperboi keeps the ems id up front - # oid = paper['oid'] + # TODO: we probably want some kind of "tagging" system + # for external order submissions like this eventually + # to be able to more formally handle multi-player + # trading... + else: + # existing open backend order which we broadcast to + # all currently connected clients. + log.info( + f'Relaying existing open order:\n {brokerd_msg}' + ) - # elif ext: - # # may be an order msg specified as "external" to the - # # piker ems flow (i.e. generated by some other - # # external broker backend client (like tws for ib) - # log.error(f"External trade event {name}@{ext}") + # use backend request id as our ems id though this + # may end up with collisions? + status_msg = Status(**brokerd_msg) + order = Order(**status_msg.req) + assert order.price and order.size + status_msg.req = order - # else: - # # something is out of order, we don't have an oid for - # # this broker-side message. - # log.error( - # f'Unknown oid: {oid} for msg {name}:\n' - # f'{pformat(brokerd_msg)}\n' - # 'Unable to relay message to client side!?' - # ) + assert status_msg.src # source tag? + oid = str(status_msg.reqid) - # continue + # attempt to avoid collisions + status_msg.reqid = oid + assert status_msg.resp == 'open' + + # register this existing broker-side dialog + book._ems2brokerd_ids[oid] = reqid + book._active[oid] = status_msg + + # fan-out-relay position msgs immediately by + # broadcasting updates on all client streams + await router.client_broadcast(status_msg) + + # don't fall through + continue + + # brokerd error + case { + 'name': 'status', + 'status': 'error', + }: + log.error(f'Broker error:\n{pformat(brokerd_msg)}') + # XXX: we presume the brokerd cancels its own order + + # TOO FAST ``BrokerdStatus`` that arrives + # before the ``BrokerdAck``. + case { + # XXX: sometimes there is a race with the backend (like + # `ib` where the pending stauts will be related before + # the ack, in which case we just ignore the faster + # pending msg and wait for our expected ack to arrive + # later (i.e. the first block below should enter). + 'name': 'status', + 'status': status, + 'reqid': reqid, + }: + oid = book._ems2brokerd_ids.inverse.get(reqid) + msg = f'Unhandled broker status for dialog {reqid}:\n' + if oid: + status_msg = book._active[oid] + msg += ( + f'last status msg: {pformat(status_msg)}\n\n' + f'this msg:{pformat(brokerd_msg)}\n' + ) + + log.warning(msg) case _: raise ValueError(f'Brokerd message {brokerd_msg} is invalid') - # retrieve existing live flow - entry = book._ems_entries[oid] - assert entry.oid == oid - - old_reqid = entry.reqid - if old_reqid and old_reqid != reqid: - log.warning( - f'Brokerd order id change for {oid}:\n' - f'{old_reqid} -> {reqid}' - ) - - # Create and relay response status message - # to requesting EMS client - try: - ems_client_order_stream = router.dialogues[oid] - await ems_client_order_stream.send( - Status( - oid=oid, - resp=resp, - time_ns=time.time_ns(), - broker_reqid=reqid, - brokerd_msg=msg, - ) - ) - except KeyError: - log.error( - f'Received `brokerd` msg for unknown client with oid: {oid}') + # XXX: ugh sometimes we don't access it? + if status_msg: + del status_msg # TODO: do we want this to keep things cleaned up? # it might require a special status from brokerd to affirm the @@ -829,27 +898,36 @@ async def process_client_order_cmds( async for cmd in client_order_stream: log.info(f'Received order cmd:\n{pformat(cmd)}') - oid = cmd['oid'] + # CAWT DAMN we need struct support! + oid = str(cmd['oid']) + # register this stream as an active dialogue for this order id # such that translated message from the brokerd backend can be # routed (relayed) to **just** that client stream (and in theory # others who are registered for such order affiliated msgs). client_dialogues[oid] = client_order_stream reqid = dark_book._ems2brokerd_ids.inverse.get(oid) - live_entry = dark_book._ems_entries.get(oid) + + # any dark/live status which is current + status = dark_book._active.get(oid) match cmd: # existing live-broker order cancel case { 'action': 'cancel', 'oid': oid, - } if live_entry: - reqid = live_entry.reqid - msg = BrokerdCancel( + } if ( + (status := dark_book._active.get(oid)) + and status.resp in ('open', 'pending') + ): + reqid = status.reqid + order = status.req + to_brokerd_msg = BrokerdCancel( oid=oid, reqid=reqid, time_ns=time.time_ns(), - account=live_entry.account, + # account=live_entry.account, + account=order.account, ) # NOTE: cancel response will be relayed back in messages @@ -859,39 +937,53 @@ async def process_client_order_cmds( log.info( f'Submitting cancel for live order {reqid}' ) - await brokerd_order_stream.send(msg) + await brokerd_order_stream.send(to_brokerd_msg) else: # this might be a cancel for an order that hasn't been # acked yet by a brokerd, so register a cancel for when # the order ack does show up later such that the brokerd # order request can be cancelled at that time. - dark_book._ems_entries[oid] = msg + # dark_book._ems_entries[oid] = msg + # special case for now.. + status.req = to_brokerd_msg # dark trigger cancel case { 'action': 'cancel', 'oid': oid, - } if not live_entry: - try: - # remove from dark book clearing - dark_book.orders[symbol].pop(oid, None) + } if ( + status and status.resp == 'dark_open' + # or status and status.req + ): + # remove from dark book clearing + entry = dark_book.orders[symbol].pop(oid, None) + if entry: + ( + pred, + tickfilter, + cmd, + percent_away, + abs_diff_away + ) = entry # tell client side that we've cancelled the # dark-trigger order - await client_order_stream.send( - Status( - resp='dark_cancelled', - oid=oid, - time_ns=time.time_ns(), - ) - ) + status.resp = 'canceled' + status.req = cmd + + await client_order_stream.send(status) # de-register this client dialogue router.dialogues.pop(oid) + dark_book._active.pop(oid) - except KeyError: + else: log.exception(f'No dark order for {symbol}?') + # TODO: eventually we should be receiving + # this struct on the wire unpacked in a scoped protocol + # setup with ``tractor``. + # live order submission case { 'oid': oid, @@ -899,11 +991,9 @@ async def process_client_order_cmds( 'price': trigger_price, 'size': size, 'action': ('buy' | 'sell') as action, - 'exec_mode': 'live', + 'exec_mode': ('live' | 'paper'), }: - # TODO: eventually we should be receiving - # this struct on the wire unpacked in a scoped protocol - # setup with ``tractor``. + # TODO: relay this order msg directly? req = Order(**cmd) broker = req.brokers[0] @@ -912,13 +1002,13 @@ async def process_client_order_cmds( # aren't expectig their own name, but should they? sym = fqsn.replace(f'.{broker}', '') - if live_entry is not None: - # sanity check on emsd id - assert live_entry.oid == oid - reqid = live_entry.reqid + if status is not None: # if we already had a broker order id then # this is likely an order update commmand. log.info(f"Modifying live {broker} order: {reqid}") + reqid = status.reqid + status.req = req + status.resp = 'pending' msg = BrokerdOrder( oid=oid, # no ib support for oids... @@ -935,6 +1025,18 @@ async def process_client_order_cmds( account=req.account, ) + if status is None: + status = Status( + oid=oid, + reqid=reqid, + resp='pending', + time_ns=time.time_ns(), + brokerd_msg=msg, + req=req, + ) + + dark_book._active[oid] = status + # send request to backend # XXX: the trades data broker response loop # (``translate_and_relay_brokerd_events()`` above) will @@ -950,7 +1052,7 @@ async def process_client_order_cmds( # client, before that ack, when the ack does arrive we # immediately take the reqid from the broker and cancel # that live order asap. - dark_book._ems_entries[oid] = msg + # dark_book._msgflows[oid].maps.insert(0, msg.to_dict()) # dark-order / alert submission case { @@ -966,9 +1068,11 @@ async def process_client_order_cmds( # submit order to local EMS book and scan loop, # effectively a local clearing engine, which # scans for conditions and triggers matching executions - exec_mode in ('dark', 'paper') + exec_mode in ('dark',) or action == 'alert' ): + req = Order(**cmd) + # Auto-gen scanner predicate: # we automatically figure out what the alert check # condition should be based on the current first @@ -1015,23 +1119,25 @@ async def process_client_order_cmds( )[oid] = ( pred, tickfilter, - cmd, + req, percent_away, abs_diff_away ) - resp = 'dark_submitted' + resp = 'dark_open' # alerts have special msgs to distinguish - if action == 'alert': - resp = 'alert_submitted' + # if action == 'alert': + # resp = 'open' - await client_order_stream.send( - Status( - resp=resp, - oid=oid, - time_ns=time.time_ns(), - ) + status = Status( + resp=resp, + oid=oid, + time_ns=time.time_ns(), + req=req, + src='dark', ) + dark_book._active[oid] = status + await client_order_stream.send(status) @tractor.context @@ -1099,10 +1205,9 @@ async def _emsd_main( ): # XXX: this should be initial price quote from target provider - first_quote = feed.first_quotes[fqsn] - - book = _router.get_dark_book(broker) - book.lasts[fqsn] = first_quote['last'] + first_quote: dict = feed.first_quotes[fqsn] + book: _DarkBook = _router.get_dark_book(broker) + book.lasts[fqsn]: float = first_quote['last'] # open a stream with the brokerd backend for order # flow dialogue @@ -1129,12 +1234,25 @@ async def _emsd_main( await ems_ctx.started(( relay.positions, list(relay.accounts), + book._active, )) # establish 2-way stream with requesting order-client and # begin handling inbound order requests and updates async with ems_ctx.open_stream() as ems_client_order_stream: + # register the client side before startingn the + # brokerd-side relay task to ensure the client is + # delivered all exisiting open orders on startup. + _router.clients.add(ems_client_order_stream) + + n.start_soon( + translate_and_relay_brokerd_events, + broker, + brokerd_stream, + _router, + ) + # trigger scan and exec loop n.start_soon( clear_dark_triggers, @@ -1149,7 +1267,6 @@ async def _emsd_main( # start inbound (from attached client) order request processing try: - _router.clients.add(ems_client_order_stream) # main entrypoint, run here until cancelled. await process_client_order_cmds( diff --git a/piker/clearing/_messages.py b/piker/clearing/_messages.py index c30ada54..f8fd6937 100644 --- a/piker/clearing/_messages.py +++ b/piker/clearing/_messages.py @@ -18,24 +18,92 @@ Clearing sub-system message and protocols. """ -from typing import Optional, Union +# from collections import ( +# ChainMap, +# deque, +# ) +from typing import ( + Optional, + Literal, +) from ..data._source import Symbol from ..data.types import Struct +# TODO: a composite for tracking msg flow on 2-legged +# dialogs. +# class Dialog(ChainMap): +# ''' +# Msg collection abstraction to easily track the state changes of +# a msg flow in one high level, query-able and immutable construct. + +# The main use case is to query data from a (long-running) +# msg-transaction-sequence + + +# ''' +# def update( +# self, +# msg, +# ) -> None: +# self.maps.insert(0, msg.to_dict()) + +# def flatten(self) -> dict: +# return dict(self) + + # TODO: ``msgspec`` stuff worth paying attention to: -# - schema evolution: https://jcristharif.com/msgspec/usage.html#schema-evolution +# - schema evolution: +# https://jcristharif.com/msgspec/usage.html#schema-evolution +# - for eg. ``BrokerdStatus``, instead just have separate messages? # - use literals for a common msg determined by diff keys? # - https://jcristharif.com/msgspec/usage.html#literal -# - for eg. ``BrokerdStatus``, instead just have separate messages? # -------------- # Client -> emsd # -------------- +class Order(Struct): + + # TODO: ideally we can combine these 2 fields into + # 1 and just use the size polarity to determine a buy/sell. + # i would like to see this become more like + # https://jcristharif.com/msgspec/usage.html#literal + # action: Literal[ + # 'live', + # 'dark', + # 'alert', + # ] + + action: Literal[ + 'buy', + 'sell', + 'alert', + ] + # determines whether the create execution + # will be submitted to the ems or directly to + # the backend broker + exec_mode: Literal[ + 'dark', + 'live', + # 'paper', no right? + ] + + # internal ``emdsd`` unique "order id" + oid: str # uuid4 + symbol: str | Symbol + account: str # should we set a default as '' ? + + price: float + size: float # -ve is "sell", +ve is "buy" + + brokers: Optional[list[str]] = [] + + class Cancel(Struct): - '''Cancel msg for removing a dark (ems triggered) or + ''' + Cancel msg for removing a dark (ems triggered) or broker-submitted (live) trigger/order. ''' @@ -44,32 +112,6 @@ class Cancel(Struct): symbol: str -class Order(Struct): - - # TODO: use ``msgspec.Literal`` - # https://jcristharif.com/msgspec/usage.html#literal - action: str # {'buy', 'sell', 'alert'} - # internal ``emdsd`` unique "order id" - oid: str # uuid4 - symbol: Union[str, Symbol] - account: str # should we set a default as '' ? - - price: float - # TODO: could we drop the ``.action`` field above and instead just - # use +/- values here? Would make the msg smaller at the sake of a - # teensie fp precision? - size: float - brokers: list[str] - - # Assigned once initial ack is received - # ack_time_ns: Optional[int] = None - - # determines whether the create execution - # will be submitted to the ems or directly to - # the backend broker - exec_mode: str # {'dark', 'live', 'paper'} - - # -------------- # Client <- emsd # -------------- @@ -79,37 +121,39 @@ class Order(Struct): class Status(Struct): name: str = 'status' - oid: str # uuid4 time_ns: int + oid: str # uuid4 ems-order dialog id - # { - # 'dark_submitted', - # 'dark_cancelled', - # 'dark_triggered', - - # 'broker_submitted', - # 'broker_cancelled', - # 'broker_executed', - # 'broker_filled', - # 'broker_errored', - - # 'alert_submitted', - # 'alert_triggered', - - # } - resp: str # "response", see above - - # trigger info - trigger_price: Optional[float] = None - # price: float - - # broker: Optional[str] = None + resp: Literal[ + 'pending', # acked by broker but not yet open + 'open', + 'dark_open', # dark/algo triggered order is open in ems clearing loop + 'triggered', # above triggered order sent to brokerd, or an alert closed + 'closed', # fully cleared all size/units + 'fill', # partial execution + 'canceled', + 'error', + ] # this maps normally to the ``BrokerdOrder.reqid`` below, an id # normally allocated internally by the backend broker routing system - broker_reqid: Optional[Union[int, str]] = None + reqid: Optional[int | str] = None - # for relaying backend msg data "through" the ems layer + # the (last) source order/request msg if provided + # (eg. the Order/Cancel which causes this msg) and + # acts as a back-reference to the corresponding + # request message which was the source of this msg. + req: Optional[Order | Cancel] = None + + # XXX: better design/name here? + # flag that can be set to indicate a message for an order + # event that wasn't originated by piker's emsd (eg. some external + # trading system which does it's own order control but that you + # might want to "track" using piker UIs/systems). + src: Optional[str] = None + + # for relaying a boxed brokerd-dialog-side msg data "through" the + # ems layer to clients. brokerd_msg: dict = {} @@ -131,25 +175,28 @@ class BrokerdCancel(Struct): # for setting a unique order id then this value will be relayed back # on the emsd order request stream as the ``BrokerdOrderAck.reqid`` # field - reqid: Optional[Union[int, str]] = None + reqid: Optional[int | str] = None class BrokerdOrder(Struct): - action: str # {buy, sell} oid: str account: str time_ns: int + # TODO: if we instead rely on a +ve/-ve size to determine + # the action we more or less don't need this field right? + action: str = '' # {buy, sell} + # "broker request id": broker specific/internal order id if this is # None, creates a new order otherwise if the id is valid the backend # api must modify the existing matching order. If the broker allows # for setting a unique order id then this value will be relayed back # on the emsd order request stream as the ``BrokerdOrderAck.reqid`` # field - reqid: Optional[Union[int, str]] = None + reqid: Optional[int | str] = None - symbol: str # symbol. ? + symbol: str # fqsn price: float size: float @@ -170,7 +217,7 @@ class BrokerdOrderAck(Struct): name: str = 'ack' # defined and provided by backend - reqid: Union[int, str] + reqid: int | str # emsd id originally sent in matching request msg oid: str @@ -180,30 +227,22 @@ class BrokerdOrderAck(Struct): class BrokerdStatus(Struct): name: str = 'status' - reqid: Union[int, str] + reqid: int | str time_ns: int + status: Literal[ + 'open', + 'canceled', + 'fill', + 'pending', + 'error', + ] - # XXX: should be best effort set for every update - account: str = '' - - # TODO: instead (ack, pending, open, fill, clos(ed), cancelled) - # { - # 'submitted', - # 'cancelled', - # 'filled', - # } - status: str - + account: str filled: float = 0.0 reason: str = '' remaining: float = 0.0 - # XXX: better design/name here? - # flag that can be set to indicate a message for an order - # event that wasn't originated by piker's emsd (eg. some external - # trading system which does it's own order control but that you - # might want to "track" using piker UIs/systems). - external: bool = False + # external: bool = False # XXX: not required schema as of yet broker_details: dict = { @@ -218,7 +257,7 @@ class BrokerdFill(Struct): ''' name: str = 'fill' - reqid: Union[int, str] + reqid: int | str time_ns: int # order exeuction related @@ -248,7 +287,7 @@ class BrokerdError(Struct): # if no brokerd order request was actually submitted (eg. we errored # at the ``pikerd`` layer) then there will be ``reqid`` allocated. - reqid: Optional[Union[int, str]] = None + reqid: Optional[int | str] = None symbol: str reason: str diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 2160bfca..2936ff59 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -33,10 +33,10 @@ from bidict import bidict import pendulum import trio import tractor -from dataclasses import dataclass from .. import data from ..data._source import Symbol +from ..data.types import Struct from ..pp import ( Position, Transaction, @@ -45,16 +45,20 @@ from ..data._normalize import iterticks from ..data._source import unpack_fqsn from ..log import get_logger from ._messages import ( - BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, - BrokerdFill, BrokerdPosition, BrokerdError + BrokerdCancel, + BrokerdOrder, + BrokerdOrderAck, + BrokerdStatus, + BrokerdFill, + BrokerdPosition, + BrokerdError, ) log = get_logger(__name__) -@dataclass -class PaperBoi: +class PaperBoi(Struct): """ Emulates a broker order client providing the same API and delivering an order-event response stream but with methods for @@ -68,8 +72,8 @@ class PaperBoi: # map of paper "live" orders which be used # to simulate fills based on paper engine settings - _buys: bidict - _sells: bidict + _buys: dict + _sells: dict _reqids: bidict _positions: dict[str, Position] _trade_ledger: dict[str, Any] @@ -94,6 +98,10 @@ class PaperBoi: ''' is_modify: bool = False + if action == 'alert': + # bypass all fill simulation + return reqid + entry = self._reqids.get(reqid) if entry: # order is already existing, this is a modify @@ -104,10 +112,6 @@ class PaperBoi: # register order internally self._reqids[reqid] = (oid, symbol, action, price) - if action == 'alert': - # bypass all fill simulation - return reqid - # TODO: net latency model # we checkpoint here quickly particulalry # for dark orders since we want the dark_executed @@ -119,7 +123,9 @@ class PaperBoi: size = -size msg = BrokerdStatus( - status='submitted', + status='open', + # account=f'paper_{self.broker}', + account='paper', reqid=reqid, time_ns=time.time_ns(), filled=0.0, @@ -136,7 +142,14 @@ class PaperBoi: ) or ( action == 'sell' and (clear_price := self.last_bid[0]) >= price ): - await self.fake_fill(symbol, clear_price, size, action, reqid, oid) + await self.fake_fill( + symbol, + clear_price, + size, + action, + reqid, + oid, + ) else: # register this submissions as a paper live order @@ -178,7 +191,9 @@ class PaperBoi: await trio.sleep(0.05) msg = BrokerdStatus( - status='cancelled', + status='canceled', + # account=f'paper_{self.broker}', + account='paper', reqid=reqid, time_ns=time.time_ns(), broker_details={'name': 'paperboi'}, @@ -230,25 +245,14 @@ class PaperBoi: self._trade_ledger.update(fill_msg.to_dict()) if order_complete: - msg = BrokerdStatus( - reqid=reqid, time_ns=time.time_ns(), - - status='filled', + # account=f'paper_{self.broker}', + account='paper', + status='closed', filled=size, remaining=0 if order_complete else remaining, - - broker_details={ - 'paper_info': { - 'oid': oid, - }, - 'action': action, - 'size': size, - 'price': price, - 'name': self.broker, - }, ) await self.ems_trades_stream.send(msg) @@ -257,7 +261,10 @@ class PaperBoi: pp = self._positions.setdefault( token, Position( - Symbol(key=symbol), + Symbol( + key=symbol, + broker_info={self.broker: {}}, + ), size=size, ppu=price, bsuid=symbol, @@ -390,72 +397,75 @@ async def handle_order_requests( ) -> None: - # order_request: dict + request_msg: dict async for request_msg in ems_order_stream: + match request_msg: + case {'action': ('buy' | 'sell')}: + order = BrokerdOrder(**request_msg) + account = order.account + if account != 'paper': + log.error( + 'This is a paper account,' + ' only a `paper` selection is valid' + ) + await ems_order_stream.send(BrokerdError( + oid=order.oid, + symbol=order.symbol, + reason=f'Paper only. No account found: `{account}` ?', + )) + continue - action = request_msg['action'] + reqid = order.reqid or str(uuid.uuid4()) - if action in {'buy', 'sell'}: - - account = request_msg['account'] - if account != 'paper': - log.error( - 'This is a paper account,' - ' only a `paper` selection is valid' + # deliver ack that order has been submitted to broker routing + await ems_order_stream.send( + BrokerdOrderAck( + oid=order.oid, + reqid=reqid, + ) ) - await ems_order_stream.send(BrokerdError( - oid=request_msg['oid'], - symbol=request_msg['symbol'], - reason=f'Paper only. No account found: `{account}` ?', - )) - continue - # validate - order = BrokerdOrder(**request_msg) - - if order.reqid is None: - reqid = str(uuid.uuid4()) - else: - reqid = order.reqid - - # deliver ack that order has been submitted to broker routing - await ems_order_stream.send( - BrokerdOrderAck( - - # ems order request id + # call our client api to submit the order + reqid = await client.submit_limit( oid=order.oid, - - # broker specific request id + symbol=order.symbol, + price=order.price, + action=order.action, + size=order.size, + # XXX: by default 0 tells ``ib_insync`` methods that + # there is no existing order so ask the client to create + # a new one (which it seems to do by allocating an int + # counter - collision prone..) reqid=reqid, - ) - ) - # call our client api to submit the order - reqid = await client.submit_limit( + # elif action == 'cancel': + case {'action': 'cancel'}: + msg = BrokerdCancel(**request_msg) + await client.submit_cancel( + reqid=msg.reqid + ) - oid=order.oid, - symbol=order.symbol, - price=order.price, - action=order.action, - size=order.size, + case _: + log.error(f'Unknown order command: {request_msg}') - # XXX: by default 0 tells ``ib_insync`` methods that - # there is no existing order so ask the client to create - # a new one (which it seems to do by allocating an int - # counter - collision prone..) - reqid=reqid, - ) - elif action == 'cancel': - msg = BrokerdCancel(**request_msg) - - await client.submit_cancel( - reqid=msg.reqid - ) - - else: - log.error(f'Unknown order command: {request_msg}') +_reqids: bidict[str, tuple] = {} +_buys: dict[ + str, + dict[ + tuple[str, float], + tuple[float, str, str], + ] +] = {} +_sells: dict[ + str, + dict[ + tuple[str, float], + tuple[float, str, str], + ] +] = {} +_positions: dict[str, Position] = {} @tractor.context @@ -467,6 +477,7 @@ async def trades_dialogue( loglevel: str = None, ) -> None: + tractor.log.get_console_log(loglevel) async with ( @@ -476,10 +487,22 @@ async def trades_dialogue( ) as feed, ): + pp_msgs: list[BrokerdPosition] = [] + pos: Position + token: str # f'{symbol}.{self.broker}' + for token, pos in _positions.items(): + pp_msgs.append(BrokerdPosition( + broker=broker, + account='paper', + symbol=pos.symbol.front_fqsn(), + size=pos.size, + avg_price=pos.ppu, + )) + # TODO: load paper positions per broker from .toml config file # and pass as symbol to position data mapping: ``dict[str, dict]`` # await ctx.started(all_positions) - await ctx.started(({}, ['paper'])) + await ctx.started((pp_msgs, ['paper'])) async with ( ctx.open_stream() as ems_stream, @@ -488,13 +511,13 @@ async def trades_dialogue( client = PaperBoi( broker, ems_stream, - _buys={}, - _sells={}, + _buys=_buys, + _sells=_sells, - _reqids={}, + _reqids=_reqids, # TODO: load paper positions from ``positions.toml`` - _positions={}, + _positions=_positions, # TODO: load postions from ledger file _trade_ledger={}, diff --git a/piker/data/types.py b/piker/data/types.py index d8926610..c23f6266 100644 --- a/piker/data/types.py +++ b/piker/data/types.py @@ -18,6 +18,7 @@ Built-in (extension) types. """ +import sys from typing import Optional from pprint import pformat @@ -42,7 +43,15 @@ class Struct( } def __repr__(self): - return f'Struct({pformat(self.to_dict())})' + # only turn on pprint when we detect a python REPL + # at runtime B) + if ( + hasattr(sys, 'ps1') + # TODO: check if we're in pdb + ): + return f'Struct({pformat(self.to_dict())})' + + return super().__repr__() def copy( self, diff --git a/piker/ui/_editors.py b/piker/ui/_editors.py index 03fd208e..38d30da4 100644 --- a/piker/ui/_editors.py +++ b/piker/ui/_editors.py @@ -140,9 +140,9 @@ class LineEditor: ) -> LevelLine: - staged_line = self._active_staged_line - if not staged_line: - raise RuntimeError("No line is currently staged!?") + # staged_line = self._active_staged_line + # if not staged_line: + # raise RuntimeError("No line is currently staged!?") # for now, until submission reponse arrives line.hide_labels() diff --git a/piker/ui/_interaction.py b/piker/ui/_interaction.py index d8f65dd9..71797a33 100644 --- a/piker/ui/_interaction.py +++ b/piker/ui/_interaction.py @@ -221,6 +221,7 @@ async def handle_viewmode_kb_inputs( # TODO: show pp config mini-params in status bar widget # mode.pp_config.show() + trigger_type: str = 'dark' if ( # 's' for "submit" to activate "live" order Qt.Key_S in pressed or @@ -228,9 +229,6 @@ async def handle_viewmode_kb_inputs( ): trigger_type: str = 'live' - else: - trigger_type: str = 'dark' - # order mode trigger "actions" if Qt.Key_D in pressed: # for "damp eet" action = 'sell' @@ -397,8 +395,11 @@ class ChartView(ViewBox): ''' if self._ic is None: - self.chart.pause_all_feeds() - self._ic = trio.Event() + try: + self.chart.pause_all_feeds() + self._ic = trio.Event() + except RuntimeError: + pass def signal_ic( self, @@ -411,9 +412,12 @@ class ChartView(ViewBox): ''' if self._ic: - self._ic.set() - self._ic = None - self.chart.resume_all_feeds() + try: + self._ic.set() + self._ic = None + self.chart.resume_all_feeds() + except RuntimeError: + pass @asynccontextmanager async def open_async_input_handler( @@ -669,7 +673,10 @@ class ChartView(ViewBox): # XXX: WHY ev.accept() - self.start_ic() + try: + self.start_ic() + except RuntimeError: + pass # if self._ic is None: # self.chart.pause_all_feeds() # self._ic = trio.Event() diff --git a/piker/ui/_lines.py b/piker/ui/_lines.py index 421d4ec8..697e889f 100644 --- a/piker/ui/_lines.py +++ b/piker/ui/_lines.py @@ -421,6 +421,10 @@ class LevelLine(pg.InfiniteLine): return path + @property + def marker(self) -> LevelMarker: + return self._marker + def hoverEvent(self, ev): ''' Mouse hover callback. diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 41078e05..cbe1bf9f 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -49,16 +49,21 @@ from ._position import ( SettingsPane, ) from ._forms import FieldsForm -# from ._label import FormatLabel from ._window import MultiStatus -from ..clearing._messages import Order, BrokerdPosition +from ..clearing._messages import ( + Order, + Status, + # BrokerdOrder, + # BrokerdStatus, + BrokerdPosition, +) from ._forms import open_form_input_handling log = get_logger(__name__) -class OrderDialog(Struct): +class Dialog(Struct): ''' Trade dialogue meta-data describing the lifetime of an order submission to ``emsd`` from a chart. @@ -74,38 +79,6 @@ class OrderDialog(Struct): fills: Dict[str, Any] = {} -def on_level_change_update_next_order_info( - - level: float, - - # these are all ``partial``-ed in at callback assignment time. - line: LevelLine, - order: Order, - tracker: PositionTracker, - -) -> None: - ''' - A callback applied for each level change to the line - which will recompute the order size based on allocator - settings. this is assigned inside - ``OrderMode.line_from_order()`` - - ''' - # NOTE: the ``Order.account`` is set at order stage time - # inside ``OrderMode.line_from_order()``. - order_info = tracker.alloc.next_order_info( - startup_pp=tracker.startup_pp, - live_pp=tracker.live_pp, - price=level, - action=order.action, - ) - line.update_labels(order_info) - - # update bound-in staged order - order.price = level - order.size = order_info['size'] - - @dataclass class OrderMode: ''' @@ -141,7 +114,7 @@ class OrderMode: current_pp: Optional[PositionTracker] = None active: bool = False name: str = 'order' - dialogs: dict[str, OrderDialog] = field(default_factory=dict) + dialogs: dict[str, Dialog] = field(default_factory=dict) _colors = { 'alert': 'alert_yellow', @@ -150,12 +123,45 @@ class OrderMode: } _staged_order: Optional[Order] = None + def on_level_change_update_next_order_info( + self, + level: float, + + # these are all ``partial``-ed in at callback assignment time. + line: LevelLine, + order: Order, + tracker: PositionTracker, + + ) -> None: + ''' + A callback applied for each level change to the line + which will recompute the order size based on allocator + settings. this is assigned inside + ``OrderMode.line_from_order()`` + + ''' + # NOTE: the ``Order.account`` is set at order stage time inside + # ``OrderMode.line_from_order()`` or is inside ``Order`` msg + # field for loaded orders. + order_info = tracker.alloc.next_order_info( + startup_pp=tracker.startup_pp, + live_pp=tracker.live_pp, + price=level, + action=order.action, + ) + line.update_labels(order_info) + + # update bound-in staged order + order.price = level + order.size = order_info['size'] + + # when an order is changed we flip the settings side-pane to + # reflect the corresponding account and pos info. + self.pane.on_ui_settings_change('account', order.account) + def line_from_order( self, - order: Order, - symbol: Symbol, - **line_kwargs, ) -> LevelLine: @@ -173,8 +179,8 @@ class OrderMode: color=self._colors[order.action], dotted=True if ( - order.exec_mode == 'dark' and - order.action != 'alert' + order.exec_mode == 'dark' + and order.action != 'alert' ) else False, **line_kwargs, @@ -184,10 +190,12 @@ class OrderMode: # immediately if order.action != 'alert': line._on_level_change = partial( - on_level_change_update_next_order_info, + self.on_level_change_update_next_order_info, line=line, order=order, - tracker=self.current_pp, + # use the corresponding position tracker for the + # order's account. + tracker=self.trackers[order.account], ) else: @@ -236,8 +244,6 @@ class OrderMode: line = self.line_from_order( order, - symbol, - show_markers=True, # just for the stage line to avoid # flickering while moving the cursor @@ -249,7 +255,6 @@ class OrderMode: # prevent flickering of marker while moving/tracking cursor only_show_markers_on_hover=False, ) - line = self.lines.stage_line(line) # hide crosshair y-line and label @@ -262,25 +267,26 @@ class OrderMode: def submit_order( self, + send_msg: bool = True, + order: Optional[Order] = None, - ) -> OrderDialog: + ) -> Dialog: ''' Send execution order to EMS return a level line to represent the order on a chart. ''' - staged = self._staged_order - symbol: Symbol = staged.symbol - oid = str(uuid.uuid4()) + if not order: + staged = self._staged_order + # apply order fields for ems + oid = str(uuid.uuid4()) + order = staged.copy() + order.oid = oid - # format order data for ems - order = staged.copy() - order.oid = oid - order.symbol = symbol.front_fqsn() + order.symbol = order.symbol.front_fqsn() line = self.line_from_order( order, - symbol, show_markers=True, only_show_markers_on_hover=True, @@ -298,17 +304,17 @@ class OrderMode: # color once the submission ack arrives. self.lines.submit_line( line=line, - uuid=oid, + uuid=order.oid, ) - dialog = OrderDialog( - uuid=oid, + dialog = Dialog( + uuid=order.oid, order=order, - symbol=symbol, + symbol=order.symbol, line=line, last_status_close=self.multistatus.open_status( - f'submitting {self._trigger_type}-{order.action}', - final_msg=f'submitted {self._trigger_type}-{order.action}', + f'submitting {order.exec_mode}-{order.action}', + final_msg=f'submitted {order.exec_mode}-{order.action}', clear_on_next=True, ) ) @@ -318,14 +324,21 @@ class OrderMode: # enter submission which will be popped once a response # from the EMS is received to move the order to a different# status - self.dialogs[oid] = dialog + self.dialogs[order.oid] = dialog # hook up mouse drag handlers line._on_drag_start = self.order_line_modify_start line._on_drag_end = self.order_line_modify_complete # send order cmd to ems - self.book.send(order) + if send_msg: + self.book.send(order) + else: + # just register for control over this order + # TODO: some kind of mini-perms system here based on + # an out-of-band tagging/auth sub-sys for multiplayer + # order control? + self.book._sent_orders[order.oid] = order return dialog @@ -363,7 +376,7 @@ class OrderMode: self, uuid: str - ) -> OrderDialog: + ) -> Dialog: ''' Order submitted status event handler. @@ -418,7 +431,7 @@ class OrderMode: self, uuid: str, - msg: Dict[str, Any], + msg: Status, ) -> None: @@ -442,7 +455,7 @@ class OrderMode: # TODO: add in standard fill/exec info that maybe we # pack in a broker independent way? - f'{msg["resp"]}: {msg["trigger_price"]}', + f'{msg.resp}: {msg.req.price}', ], ) log.runtime(result) @@ -502,7 +515,7 @@ class OrderMode: oid = dialog.uuid cancel_status_close = self.multistatus.open_status( - f'cancelling order {oid[:6]}', + f'cancelling order {oid}', group_key=key, ) dialog.last_status_close = cancel_status_close @@ -512,6 +525,44 @@ class OrderMode: return ids + def load_unknown_dialog_from_msg( + self, + msg: Status, + + ) -> Dialog: + # NOTE: the `.order` attr **must** be set with the + # equivalent order msg in order to be loaded. + order = msg.req + oid = str(msg.oid) + symbol = order.symbol + + # TODO: MEGA UGGG ZONEEEE! + src = msg.src + if ( + src + and src not in ('dark', 'paperboi') + and src not in symbol + ): + fqsn = symbol + '.' + src + brokername = src + else: + fqsn = symbol + *head, brokername = fqsn.rsplit('.') + + # fill out complex fields + order.oid = str(order.oid) + order.brokers = [brokername] + order.symbol = Symbol.from_fqsn( + fqsn=fqsn, + info={}, + ) + dialog = self.submit_order( + send_msg=False, + order=order, + ) + assert self.dialogs[oid] == dialog + return dialog + @asynccontextmanager async def open_order_mode( @@ -549,6 +600,7 @@ async def open_order_mode( trades_stream, position_msgs, brokerd_accounts, + ems_dialog_msgs, ), trio.open_nursery() as tn, @@ -596,10 +648,10 @@ async def open_order_mode( sym = msg['symbol'] if ( - sym == symkey or - # mega-UGH, i think we need to fix the FQSN stuff sooner - # then later.. - sym == symkey.removesuffix(f'.{broker}') + (sym == symkey) or ( + # mega-UGH, i think we need to fix the FQSN + # stuff sooner then later.. + sym == symkey.removesuffix(f'.{broker}')) ): pps_by_account[acctid] = msg @@ -653,7 +705,7 @@ async def open_order_mode( # setup order mode sidepane widgets form: FieldsForm = chart.sidepane form.vbox.setSpacing( - int((1 + 5/8)*_font.px_size) + int((1 + 5 / 8) * _font.px_size) ) from ._feedstatus import mk_feed_label @@ -703,7 +755,7 @@ async def open_order_mode( order_pane.order_mode = mode # select a pp to track - tracker = trackers[pp_account] + tracker: PositionTracker = trackers[pp_account] mode.current_pp = tracker tracker.show() tracker.hide_info() @@ -755,151 +807,186 @@ async def open_order_mode( # to handle input since the ems connection is ready started.set() + for oid, msg in ems_dialog_msgs.items(): + + # HACK ALERT: ensure a resp field is filled out since + # techincally the call below expects a ``Status``. TODO: + # parse into proper ``Status`` equivalents ems-side? + # msg.setdefault('resp', msg['broker_details']['resp']) + # msg.setdefault('oid', msg['broker_details']['oid']) + msg['brokerd_msg'] = msg + + await process_trade_msg( + mode, + book, + msg, + ) + tn.start_soon( process_trades_and_update_ui, - tn, - feed, - mode, trades_stream, + mode, book, ) + yield mode async def process_trades_and_update_ui( - n: trio.Nursery, - feed: Feed, - mode: OrderMode, trades_stream: tractor.MsgStream, + mode: OrderMode, book: OrderBook, ) -> None: - get_index = mode.chart.get_index - global _pnl_tasks - # this is where we receive **back** messages # about executions **from** the EMS actor async for msg in trades_stream: + await process_trade_msg( + mode, + book, + msg, + ) - fmsg = pformat(msg) - log.info(f'Received order msg:\n{fmsg}') - name = msg['name'] - if name in ( - 'position', +async def process_trade_msg( + mode: OrderMode, + book: OrderBook, + msg: dict, + +) -> tuple[Dialog, Status]: + + get_index = mode.chart.get_index + fmsg = pformat(msg) + log.debug(f'Received order msg:\n{fmsg}') + name = msg['name'] + + if name in ( + 'position', + ): + sym = mode.chart.linked.symbol + pp_msg_symbol = msg['symbol'].lower() + fqsn = sym.front_fqsn() + broker, key = sym.front_feed() + if ( + pp_msg_symbol == fqsn + or pp_msg_symbol == fqsn.removesuffix(f'.{broker}') ): - sym = mode.chart.linked.symbol - pp_msg_symbol = msg['symbol'].lower() - fqsn = sym.front_fqsn() - broker, key = sym.front_feed() - if ( - pp_msg_symbol == fqsn - or pp_msg_symbol == fqsn.removesuffix(f'.{broker}') - ): - log.info(f'{fqsn} matched pp msg: {fmsg}') - tracker = mode.trackers[msg['account']] - tracker.live_pp.update_from_msg(msg) - # update order pane widgets - tracker.update_from_pp() - mode.pane.update_status_ui(tracker) + log.info(f'{fqsn} matched pp msg: {fmsg}') + tracker = mode.trackers[msg['account']] + tracker.live_pp.update_from_msg(msg) + # update order pane widgets + tracker.update_from_pp() + mode.pane.update_status_ui(tracker) - if tracker.live_pp.size: - # display pnl - mode.pane.display_pnl(tracker) + if tracker.live_pp.size: + # display pnl + mode.pane.display_pnl(tracker) - # short circuit to next msg to avoid - # unnecessary msg content lookups - continue + # short circuit to next msg to avoid + # unnecessary msg content lookups + return - resp = msg['resp'] - oid = msg['oid'] + msg = Status(**msg) + resp = msg.resp + oid = msg.oid + dialog: Dialog = mode.dialogs.get(oid) - dialog = mode.dialogs.get(oid) - if dialog is None: - log.warning(f'received msg for untracked dialog:\n{fmsg}') + match msg: + case Status(resp='dark_open' | 'open'): - # TODO: enable pure tracking / mirroring of dialogs - # is desired. - continue + if dialog is not None: + # show line label once order is live + mode.on_submit(oid) - # record message to dialog tracking - dialog.msgs[oid] = msg + else: + log.warning( + f'received msg for untracked dialog:\n{fmsg}' + ) + assert msg.resp in ('open', 'dark_open'), f'Unknown msg: {msg}' - # response to 'action' request (buy/sell) - if resp in ( - 'dark_submitted', - 'broker_submitted' - ): + sym = mode.chart.linked.symbol + fqsn = sym.front_fqsn() + order = Order(**msg.req) + if ( + ((order.symbol + f'.{msg.src}') == fqsn) - # show line label once order is live - mode.on_submit(oid) + # a existing dark order for the same symbol + or ( + order.symbol == fqsn + and ( + msg.src in ('dark', 'paperboi') + or (msg.src in fqsn) - # resp to 'cancel' request or error condition - # for action request - elif resp in ( - 'broker_inactive', - 'broker_errored', - ): + ) + ) + ): + msg.req = order + dialog = mode.load_unknown_dialog_from_msg(msg) + mode.on_submit(oid) + # return dialog, msg + + case Status(resp='error'): # delete level line from view mode.on_cancel(oid) - broker_msg = msg['brokerd_msg'] + broker_msg = msg.brokerd_msg log.error( f'Order {oid}->{resp} with:\n{pformat(broker_msg)}' ) - elif resp in ( - 'broker_cancelled', - 'dark_cancelled' - ): + case Status(resp='canceled'): # delete level line from view mode.on_cancel(oid) - broker_msg = msg['brokerd_msg'] - log.cancel( - f'Order {oid}->{resp} with:\n{pformat(broker_msg)}' - ) + req = Order(**msg.req) + log.cancel(f'Canceled {req.action}:{oid}') - elif resp in ( - 'dark_triggered' + case Status( + resp='triggered', + # req=Order(exec_mode='dark') # TODO: + req={'exec_mode': 'dark'}, ): + # TODO: UX for a "pending" clear/live order log.info(f'Dark order triggered for {fmsg}') - elif resp in ( - 'alert_triggered' + case Status( + resp='triggered', + # req=Order(exec_mode='live', action='alert') as req, # TODO + req={'exec_mode': 'live', 'action': 'alert'} as req, ): # should only be one "fill" for an alert # add a triangle and remove the level line + req = Order(**req) mode.on_fill( oid, - price=msg['trigger_price'], + price=req.price, arrow_index=get_index(time.time()), ) mode.lines.remove_line(uuid=oid) + msg.req = req await mode.on_exec(oid, msg) - # response to completed 'action' request for buy/sell - elif resp in ( - 'broker_executed', + # response to completed 'dialog' for order request + case Status( + resp='closed', + # req=Order() as req, # TODO + req=req, ): - # right now this is just triggering a system alert + msg.req = Order(**req) await mode.on_exec(oid, msg) - - if msg['brokerd_msg']['remaining'] == 0: - mode.lines.remove_line(uuid=oid) + mode.lines.remove_line(uuid=oid) # each clearing tick is responded individually - elif resp in ( - 'broker_filled', - ): + case Status(resp='fill'): + # handle out-of-piker fills reporting? known_order = book._sent_orders.get(oid) if not known_order: log.warning(f'order {oid} is unknown') - continue + return action = known_order.action - details = msg['brokerd_msg'] + details = msg.brokerd_msg # TODO: some kinda progress system mode.on_fill( @@ -914,3 +1001,9 @@ async def process_trades_and_update_ui( # TODO: how should we look this up? # tracker = mode.trackers[msg['account']] # tracker.live_pp.fills.append(msg) + + # record message to dialog tracking + if dialog: + dialog.msgs[oid] = msg + + return dialog, msg