diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 96f8572d..b7c121e7 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -60,6 +60,8 @@ from piker.pp import ( ) from piker.log import get_console_log from piker.clearing._messages import ( + Order, + Status, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, @@ -184,7 +186,7 @@ async def handle_order_requests( ) ) - elif action == 'cancel': + if action == 'cancel': msg = BrokerdCancel(**request_msg) client.submit_cancel(reqid=int(msg.reqid)) @@ -491,43 +493,43 @@ async def trades_dialogue( order = trade.order quant = trade.order.totalQuantity + action = order.action.lower() size = { - 'SELL': -1, - 'BUY': 1, - }[order.action] * quant - fqsn, _ = con2fqsn(trade.contract) + 'sell': -1, + 'buy': 1, + }[action] * quant + con = trade.contract + + # TODO: in the case of the SMART venue (aka ib's + # router-clearing sys) we probably should handle + # showing such orders overtop of the fqsn for the + # primary exchange, how to map this easily is going + # to be a bit tricky though? + deats = await proxy.con_deats(contracts=[con]) + fqsn = list(deats)[0] + reqid = order.orderId # TODO: maybe embed a ``BrokerdOrder`` instead # since then we can directly load it on the client # side in the order mode loop? - msg = BrokerdStatus( + msg = Status( + time_ns=time.time_ns(), + resp='open', + oid=str(reqid), reqid=reqid, - time_ns=(ts := time.time_ns()), - status='submitted', - account=accounts_def.inverse[order.account], - filled=0, - reason='Existing live order', - # this seems to not be necessarily up to date in - # the execDetails event.. so we have to send it - # here I guess? - remaining=quant, - broker_details={ - 'name': 'ib', - 'fqsn': fqsn, - # this is a embedded/boxed order - # msg that can be loaded by the ems - # and for relay to clients. - 'order': BrokerdOrder( - symbol=fqsn, - account=accounts_def.inverse[order.account], - oid=reqid, - time_ns=ts, - size=size, - price=order.lmtPrice, - ), - }, + # embedded order info + req=Order( + action=action, + exec_mode='live', + oid=str(reqid), + symbol=fqsn, + account=accounts_def.inverse[order.account], + price=order.lmtPrice, + size=size, + ), + src='ib', ) order_msgs.append(msg) @@ -686,6 +688,7 @@ async def trades_dialogue( # allocate event relay tasks for each client connection n.start_soon( deliver_trade_events, + n, trade_event_stream, ems_stream, accounts_def, @@ -779,6 +782,7 @@ _statuses: dict[str, str] = { async def deliver_trade_events( + nurse: trio.Nursery, trade_event_stream: trio.MemoryReceiveChannel, ems_stream: tractor.MsgStream, accounts_def: dict[str, str], # eg. `'ib.main'` -> `'DU999999'` @@ -834,14 +838,35 @@ async def deliver_trade_events( # unwrap needed data from ib_insync internal types trade: Trade = item status: OrderStatus = trade.orderStatus - status_key = status.status.lower() + ib_status_key = status.status.lower() + + acctid = accounts_def.inverse[trade.order.account] # double check there is no error when # cancelling.. gawwwd - if status_key == 'cancelled': + if ib_status_key == 'cancelled': last_log = trade.log[-1] if last_log.message: - status_key = trade.log[-2].status + ib_status_key = trade.log[-2].status + + elif ib_status_key == 'inactive': + async def sched_cancel(): + log.warning( + 'OH GAWD an inactive order..scheduling a cancel\n' + f'{pformat(item)}' + ) + proxy = proxies[acctid] + await proxy.submit_cancel(reqid=trade.order.orderId) + await trio.sleep(1) + nurse.start_soon(sched_cancel) + + nurse.start_soon(sched_cancel) + + status_key = _statuses.get(ib_status_key) or ib_status_key + + remaining = status.remaining + if remaining == 0: + status_key = 'closed' # skip duplicate filled updates - we get the deats # from the execution details event @@ -859,7 +884,7 @@ async def deliver_trade_events( # this seems to not be necessarily up to date in the # execDetails event.. so we have to send it here I guess? - remaining=status.remaining, + remaining=remaining, broker_details={'name': 'ib'}, ) @@ -1002,9 +1027,8 @@ async def deliver_trade_events( cid, msg = pack_position(item) log.info(f'New IB position msg: {msg}') - # acctid = msg.account = accounts_def.inverse[msg.account] # cuck ib and it's shitty fifo sys for pps! - # await ems_stream.send(msg) + continue case 'event': diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 857460d6..0411f026 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -27,6 +27,7 @@ from typing import ( AsyncIterator, Any, Callable, + Optional, ) from bidict import bidict @@ -43,9 +44,10 @@ from . import _paper_engine as paper from ._messages import ( Order, Status, + # Cancel, BrokerdCancel, BrokerdOrder, - BrokerdOrderAck, + # BrokerdOrderAck, BrokerdStatus, BrokerdFill, BrokerdError, @@ -130,6 +132,7 @@ class _DarkBook(Struct): ] = {} # _ems_entries: dict[str, str] = {} + _active: dict = {} # mapping of ems dialog ids to msg flow history _msgflows: defaultdict[ @@ -192,6 +195,7 @@ async def clear_dark_triggers( for oid, ( pred, tf, + # TODO: send this msg instead? cmd, percent_away, abs_diff_away @@ -211,30 +215,29 @@ async def clear_dark_triggers( # majority of iterations will be non-matches continue + brokerd_msg: Optional[BrokerdOrder] = None match cmd: # alert: nothing to do but relay a status # back to the requesting ems client - case { - 'action': 'alert', - }: - resp = 'alert_triggered' + case Order(action='alert'): + resp = 'triggered' # executable order submission - case { - 'action': action, - 'symbol': symbol, - 'account': account, - 'size': size, - }: + case Order( + action=action, + symbol=symbol, + account=account, + size=size, + ): bfqsn: str = symbol.replace(f'.{broker}', '') submit_price = price + abs_diff_away - resp = 'dark_triggered' # hidden on client-side + resp = 'triggered' # hidden on client-side log.info( f'Dark order triggered for price {price}\n' f'Submitting order @ price {submit_price}') - live_req = BrokerdOrder( + brokerd_msg = BrokerdOrder( action=action, oid=oid, account=account, @@ -243,7 +246,8 @@ async def clear_dark_triggers( price=submit_price, size=size, ) - await brokerd_orders_stream.send(live_req) + + await brokerd_orders_stream.send(brokerd_msg) # mark this entry as having sent an order # request. the entry will be replaced once the @@ -252,18 +256,18 @@ async def clear_dark_triggers( # allocated unique ``BrokerdOrderAck.reqid`` key # generated by the broker's own systems. # book._ems_entries[oid] = live_req - book._msgflows[oid].append(live_req) + # book._msgflows[oid].maps.insert(0, live_req) case _: raise ValueError(f'Invalid dark book entry: {cmd}') # fallthrough logic - resp = Status( + status = Status( oid=oid, # ems dialog id time_ns=time.time_ns(), resp=resp, - trigger_price=price, - brokerd_msg=cmd, + req=cmd, + brokerd_msg=brokerd_msg, ) # remove exec-condition from set @@ -274,9 +278,18 @@ async def clear_dark_triggers( f'pred for {oid} was already removed!?' ) + # update actives + if cmd.action == 'alert': + # don't register the alert status (so it won't + # be reloaded by clients) since it's now + # complete / closed. + book._active.pop(oid) + else: + book._active[oid] = status + # send response to client-side try: - await ems_client_order_stream.send(resp) + await ems_client_order_stream.send(status) except ( trio.ClosedResourceError, ): @@ -396,6 +409,22 @@ class Router(Struct): relay.consumers -= 1 + async def client_broadcast( + self, + msg: dict, + + ) -> None: + for client_stream in self.clients.copy(): + try: + await client_stream.send(msg) + except( + trio.ClosedResourceError, + trio.BrokenResourceError, + ): + self.clients.remove(client_stream) + log.warning( + f'client for {client_stream} was already closed?') + _router: Router = None @@ -570,8 +599,7 @@ async def translate_and_relay_brokerd_events( broker ems 'error' -> log it locally (for now) - 'status' -> relabel as 'broker_', if complete send 'executed' - 'fill' -> 'broker_filled' + ('status' | 'fill'} -> relayed through see ``Status`` msg type. Currently handled status values from IB: {'presubmitted', 'submitted', 'cancelled', 'inactive'} @@ -610,31 +638,16 @@ async def translate_and_relay_brokerd_events( # fan-out-relay position msgs immediately by # broadcasting updates on all client streams - for client_stream in router.clients.copy(): - try: - await client_stream.send(pos_msg) - except( - trio.ClosedResourceError, - trio.BrokenResourceError, - ): - router.clients.remove(client_stream) - log.warning( - f'client for {client_stream} was already closed?') - + await router.client_broadcast(pos_msg) continue # BrokerdOrderAck + # initial response to brokerd order request case { 'name': 'ack', 'reqid': reqid, # brokerd generated order-request id 'oid': oid, # ems order-dialog id - } if ( - # entry := book._ems_entries.get(oid) - flow := book._msgflows.get(oid) - ): - # initial response to brokerd order request - # if name == 'ack': - + }: # register the brokerd request id (that was generated # / created internally by the broker backend) with our # local ems order id for reverse lookup later. @@ -649,31 +662,24 @@ async def translate_and_relay_brokerd_events( # new order which has not yet be registered into the # local ems book, insert it now and handle 2 cases: - # - the order has previously been requested to be + # 1. the order has previously been requested to be # cancelled by the ems controlling client before we # received this ack, in which case we relay that cancel # signal **asap** to the backend broker - action = flow.get('action') - # action = getattr(entry, 'action', None) - if action and action == 'cancel': + # status = book._active.get(oid) + status = book._active[oid] + req = status.req + if req and req.action == 'cancel': # assign newly providerd broker backend request id - flow['reqid'] = reqid - # entry.reqid = reqid + # and tell broker to cancel immediately + status.reqid = reqid + await brokerd_trades_stream.send(req) - entry = flow.maps[0] - - # tell broker to cancel immediately - await brokerd_trades_stream.send(entry) - - # - the order is now active and will be mirrored in + # 2. the order is now active and will be mirrored in # our book -> registered as live flow else: - # update the flow with the ack msg - # book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg) - flow.maps.insert( - 0, - BrokerdOrderAck(**brokerd_msg).to_dict() - ) + # TODO: should we relay this ack state? + status.resp = 'pending' # no msg to client necessary continue @@ -684,13 +690,10 @@ async def translate_and_relay_brokerd_events( 'oid': oid, # ems order-dialog id 'reqid': reqid, # brokerd generated order-request id 'symbol': sym, - 'broker_details': details, - # 'reason': reason, - }: + } if status_msg := book._active.get(oid): + msg = BrokerdError(**brokerd_msg) - resp = 'broker_errored' log.error(pformat(msg)) # XXX make one when it's blank? - book._msgflows[oid].maps.insert(0, msg.to_dict()) # TODO: figure out how this will interact with EMS clients # for ex. on an error do we react with a dark orders @@ -699,141 +702,132 @@ async def translate_and_relay_brokerd_events( # some unexpected failure - something we need to think more # about. In most default situations, with composed orders # (ex. brackets), most brokers seem to use a oca policy. + ems_client_order_stream = router.dialogues[oid] + status_msg.resp = 'error' + status_msg.brokerd_msg = msg + book._active[oid] = status_msg + await ems_client_order_stream.send(status_msg) # BrokerdStatus case { 'name': 'status', 'status': status, 'reqid': reqid, # brokerd generated order-request id - # TODO: feels like the wrong msg for this field? - 'remaining': remaining, } if ( - oid := book._ems2brokerd_ids.inverse.get(reqid) + (oid := book._ems2brokerd_ids.inverse.get(reqid)) + and status in ( + 'canceled', + 'open', + 'closed', + ) ): - - # ack = book._ems_entries[oid] - # ack = book._msgflows[oid].maps[0] msg = BrokerdStatus(**brokerd_msg) - # TODO: should we flatten out these cases and/or should - # they maybe even eventually be separate messages? - if status == 'cancelled': + # TODO: maybe pack this into a composite type that + # contains both the IPC stream as well the + # msg-chain/dialog. + ems_client_order_stream = router.dialogues[oid] + status_msg = book._active[oid] + status_msg.resp = status + + # retrieve existing live flow + old_reqid = status_msg.reqid + if old_reqid and old_reqid != reqid: + log.warning( + f'Brokerd order id change for {oid}:\n' + f'{old_reqid} -> {reqid}' + ) + + status_msg.reqid = reqid # THIS LINE IS CRITICAL! + status_msg.brokerd_msg = msg + status_msg.src = msg.broker_details['name'] + await ems_client_order_stream.send(status_msg) + + if status == 'closed': + log.info(f'Execution for {oid} is complete!') + status_msg = book._active.pop(oid) + + elif status == 'canceled': log.info(f'Cancellation for {oid} is complete!') - if status == 'filled': - # conditional execution is fully complete, no more - # fills for the noted order - if not remaining: - - resp = 'broker_executed' - - # be sure to pop this stream from our dialogue set - # since the order dialogue should be done. - log.info(f'Execution for {oid} is complete!') - - # remove from active flows - book._msgflows.pop(oid) - + else: # open + # relayed from backend but probably not handled so # just log it - else: - log.info(f'{broker} filled {msg}') + log.info(f'{broker} opened order {msg}') - else: - # one of {submitted, cancelled} - resp = 'broker_' + msg.status - - - # book._ems_entries[oid] = msg - book._msgflows[oid].maps.insert(0, msg.to_dict()) - - # TODO: i wonder if we should just support receiving an - # actual ``BrokerdOrder`` msg here? Is it a bad idea to - # presume that inbound orders on the backend dialog can be - # used to drive order tracking/tracing in the EMS *over* - # a set of backends from some other non-ems owner? - # this will likely feel better once we get open_msg_scope() - # or wtv finished. - - # BrokerdStatus containing an embedded order msg which + # ``Status`` containing an embedded order msg which # should be loaded as a "pre-existing open order" from the # brokerd backend. case { 'name': 'status', - 'status': status, + 'resp': status, 'reqid': reqid, # brokerd generated order-request id - 'broker_details': details, }: - # TODO: we probably want some kind of "tagging" system - # for external order submissions like this eventually - # to be able to more formally handle multi-player - # trading... - - if status != 'submitted': + if ( + status != 'open' + ): + # TODO: check for an oid we might know since it was + # registered from a previous order/status load? log.error( - f'Unknown status msg:\n' + f'Unknown/transient status msg:\n' f'{pformat(brokerd_msg)}\n' 'Unable to relay message to client side!?' ) + # TODO: we probably want some kind of "tagging" system + # for external order submissions like this eventually + # to be able to more formally handle multi-player + # trading... else: # existing open backend order which we broadcast to # all currently connected clients. - order_dict = brokerd_msg['broker_details'].pop('order') - order = BrokerdOrder(**order_dict) - msg = BrokerdStatus(**brokerd_msg) log.info( f'Relaying existing open order:\n {brokerd_msg}' ) # use backend request id as our ems id though this # may end up with collisions? - broker = details['name'] - oid = str(reqid) - # attempt to avoid collisions - msg.reqid = oid + status_msg = Status(**brokerd_msg) + order = Order(**status_msg.req) + assert order.price and order.size + status_msg.req = order - # XXX: MEGA HACK ALERT FOR the dialog entries delivery - # on client connect... - # TODO: fix this garbage.. - msg.broker_details['resp'] = resp = 'broker_submitted' + assert status_msg.src # source tag? + oid = str(status_msg.reqid) + + # attempt to avoid collisions + status_msg.reqid = oid + assert status_msg.resp == 'open' # register this existing broker-side dialog book._ems2brokerd_ids[oid] = reqid - # book._ems_entries[oid] = msg - - # fill in approximate msg flow history - flow = book._msgflows[oid] - flow.maps.insert(0, order.to_dict()) - flow.maps.insert(0, msg.to_dict()) - flow.maps.insert(0, details) - flattened = dict(flow) - # await tractor.breakpoint() + book._active[oid] = status_msg # fan-out-relay position msgs immediately by # broadcasting updates on all client streams - for client_stream in router.clients.copy(): - try: - await client_stream.send(flattened) - # Status( - # oid=oid, - # resp=resp, - # time_ns=time.time_ns(), - # broker_reqid=reqid, - # brokerd_msg=flattened, - # ) - # ) - except( - trio.ClosedResourceError, - trio.BrokenResourceError, - ): - router.clients.remove(client_stream) - log.warning( - f'client for {client_stream} was already closed?') + await router.client_broadcast(status_msg) # don't fall through continue + # TOO FAST ``BrokerdStatus`` that arrives + # before the ``BrokerdAck``. + case { + # XXX: sometimes there is a race with the backend (like + # `ib` where the pending stauts will be related before + # the ack, in which case we just ignore the faster + # pending msg and wait for our expected ack to arrive + # later (i.e. the first block below should enter). + 'name': 'status', + 'status': status, + 'reqid': reqid, + }: + log.warning( + 'Unhandled broker status:\n' + f'{pformat(brokerd_msg)}\n' + ) + # BrokerdFill case { 'name': 'fill', @@ -843,40 +837,18 @@ async def translate_and_relay_brokerd_events( oid := book._ems2brokerd_ids.inverse.get(reqid) ): # proxy through the "fill" result(s) + log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}') msg = BrokerdFill(**brokerd_msg) - resp = 'broker_filled' - log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') + ems_client_order_stream = router.dialogues[oid] + status_msg = book._active[oid] + status_msg.resp = 'fill' + status_msg.reqid = reqid + status_msg.brokerd_msg = msg + await ems_client_order_stream.send(status_msg) case _: raise ValueError(f'Brokerd message {brokerd_msg} is invalid') - # retrieve existing live flow - # entry = book._ems_entries[oid] - # assert entry.oid == oid # from when we only stored the first ack - # old_reqid = entry.reqid - # if old_reqid and old_reqid != reqid: - # log.warning( - # f'Brokerd order id change for {oid}:\n' - # f'{old_reqid} -> {reqid}' - # ) - - # Create and relay response status message - # to requesting EMS client - try: - ems_client_order_stream = router.dialogues[oid] - await ems_client_order_stream.send( - Status( - oid=oid, - resp=resp, - time_ns=time.time_ns(), - broker_reqid=reqid, - brokerd_msg=msg, - ) - ) - except KeyError: - log.error( - f'Received `brokerd` msg for unknown client oid: {oid}') - # TODO: do we want this to keep things cleaned up? # it might require a special status from brokerd to affirm the # flow is complete? @@ -910,23 +882,27 @@ async def process_client_order_cmds( # others who are registered for such order affiliated msgs). client_dialogues[oid] = client_order_stream reqid = dark_book._ems2brokerd_ids.inverse.get(oid) - # live_entry = dark_book._ems_entries.get(oid) - live_entry = dark_book._msgflows.get(oid) + + # any dark/live status which is current + status = dark_book._active.get(oid) match cmd: # existing live-broker order cancel case { 'action': 'cancel', 'oid': oid, - } if live_entry: - # reqid = live_entry.reqid - reqid = live_entry['reqid'] - msg = BrokerdCancel( + } if ( + (status := dark_book._active.get(oid)) + and status.resp in ('open', 'pending') + ): + reqid = status.reqid + order = status.req + to_brokerd_msg = BrokerdCancel( oid=oid, reqid=reqid, time_ns=time.time_ns(), # account=live_entry.account, - account=live_entry['account'], + account=order.account, ) # NOTE: cancel response will be relayed back in messages @@ -936,39 +912,52 @@ async def process_client_order_cmds( log.info( f'Submitting cancel for live order {reqid}' ) - await brokerd_order_stream.send(msg) + await brokerd_order_stream.send(to_brokerd_msg) else: # this might be a cancel for an order that hasn't been # acked yet by a brokerd, so register a cancel for when # the order ack does show up later such that the brokerd # order request can be cancelled at that time. - dark_book._ems_entries[oid] = msg - live_entry.maps.insert(0, msg.to_dict()) + # dark_book._ems_entries[oid] = msg + # special case for now.. + status.req = to_brokerd_msg # dark trigger cancel case { 'action': 'cancel', 'oid': oid, - } if not live_entry: - # try: + } if ( + status and status.resp == 'dark_open' + # or status and status.req + ): # remove from dark book clearing - dark_book.orders[symbol].pop(oid, None) + entry = dark_book.orders[symbol].pop(oid, None) + if entry: + ( + pred, + tickfilter, + cmd, + percent_away, + abs_diff_away + ) = entry - # tell client side that we've cancelled the - # dark-trigger order - await client_order_stream.send( - Status( - resp='dark_cancelled', - oid=oid, - time_ns=time.time_ns(), - ) - ) - # de-register this client dialogue - router.dialogues.pop(oid) + # tell client side that we've cancelled the + # dark-trigger order + status.resp = 'canceled' + status.req = cmd - # except KeyError: - # log.exception(f'No dark order for {symbol}?') + await client_order_stream.send(status) + # de-register this client dialogue + router.dialogues.pop(oid) + dark_book._active.pop(oid) + + else: + log.exception(f'No dark order for {symbol}?') + + # TODO: eventually we should be receiving + # this struct on the wire unpacked in a scoped protocol + # setup with ``tractor``. # live order submission case { @@ -977,11 +966,9 @@ async def process_client_order_cmds( 'price': trigger_price, 'size': size, 'action': ('buy' | 'sell') as action, - 'exec_mode': 'live', + 'exec_mode': ('live' | 'paper'), }: - # TODO: eventually we should be receiving - # this struct on the wire unpacked in a scoped protocol - # setup with ``tractor``. + # TODO: relay this order msg directly? req = Order(**cmd) broker = req.brokers[0] @@ -990,17 +977,13 @@ async def process_client_order_cmds( # aren't expectig their own name, but should they? sym = fqsn.replace(f'.{broker}', '') - if live_entry is not None: - # sanity check on emsd id, but it won't work - # for pre-existing orders that we load since - # the only msg will be a ``BrokerdStatus`` - # assert live_entry.oid == oid - - # reqid = live_entry.reqid - reqid = live_entry['reqid'] + if status is not None: # if we already had a broker order id then # this is likely an order update commmand. log.info(f"Modifying live {broker} order: {reqid}") + reqid = status.reqid + status.req = req + status.resp = 'pending' msg = BrokerdOrder( oid=oid, # no ib support for oids... @@ -1017,6 +1000,18 @@ async def process_client_order_cmds( account=req.account, ) + if status is None: + status = Status( + oid=oid, + reqid=reqid, + resp='pending', + time_ns=time.time_ns(), + brokerd_msg=msg, + req=req, + ) + + dark_book._active[oid] = status + # send request to backend # XXX: the trades data broker response loop # (``translate_and_relay_brokerd_events()`` above) will @@ -1032,8 +1027,7 @@ async def process_client_order_cmds( # client, before that ack, when the ack does arrive we # immediately take the reqid from the broker and cancel # that live order asap. - # dark_book._ems_entries[oid] = msg - dark_book._msgflows[oid].maps.insert(0, msg.to_dict()) + # dark_book._msgflows[oid].maps.insert(0, msg.to_dict()) # dark-order / alert submission case { @@ -1049,9 +1043,11 @@ async def process_client_order_cmds( # submit order to local EMS book and scan loop, # effectively a local clearing engine, which # scans for conditions and triggers matching executions - exec_mode in ('dark', 'paper') + exec_mode in ('dark',) or action == 'alert' ): + req = Order(**cmd) + # Auto-gen scanner predicate: # we automatically figure out what the alert check # condition should be based on the current first @@ -1098,23 +1094,25 @@ async def process_client_order_cmds( )[oid] = ( pred, tickfilter, - cmd, + req, percent_away, abs_diff_away ) - resp = 'dark_submitted' + resp = 'dark_open' # alerts have special msgs to distinguish - if action == 'alert': - resp = 'alert_submitted' + # if action == 'alert': + # resp = 'open' - await client_order_stream.send( - Status( - resp=resp, - oid=oid, - time_ns=time.time_ns(), - ) + status = Status( + resp=resp, + oid=oid, + time_ns=time.time_ns(), + req=req, + src='dark', ) + dark_book._active[oid] = status + await client_order_stream.send(status) @tractor.context @@ -1206,35 +1204,12 @@ async def _emsd_main( brokerd_stream = relay.brokerd_dialogue # .clone() - # convert dialogs to status msgs for client delivery - statuses = {} - # for oid, msg in book._ems_entries.items(): - for oid, msgflow in book._msgflows.items(): - # we relay to the client side a msg that contains - # all data flattened from the message history. - # status = msgflow['status'] - flattened = dict(msgflow) - # status = flattened['status'] - flattened.pop('brokerd_msg', None) - statuses[oid] = flattened - # Status( - # oid=oid, - # time_ns=flattened['time_ns'], - # # time_ns=msg.time_ns, - # # resp=f'broker_{msg.status}', - # resp=f'broker_{status}', - # # trigger_price=msg.order.price, - # trigger_price=flattened['price'], - # brokerd_msg=flattened, - # ) - # await tractor.breakpoint() - # signal to client that we're started and deliver # all known pps and accounts for this ``brokerd``. await ems_ctx.started(( relay.positions, list(relay.accounts), - statuses, + book._active, )) # establish 2-way stream with requesting order-client and diff --git a/piker/clearing/_messages.py b/piker/clearing/_messages.py index ffd46ff2..2c0d95c8 100644 --- a/piker/clearing/_messages.py +++ b/piker/clearing/_messages.py @@ -18,56 +18,99 @@ Clearing sub-system message and protocols. """ -from typing import Optional, Union +from collections import ( + ChainMap, + deque, +) +from typing import ( + Optional, + Literal, + Union, +) from ..data._source import Symbol from ..data.types import Struct +# TODO: a composite for tracking msg flow on 2-legged +# dialogs. +# class Dialog(ChainMap): +# ''' +# Msg collection abstraction to easily track the state changes of +# a msg flow in one high level, query-able and immutable construct. + +# The main use case is to query data from a (long-running) +# msg-transaction-sequence + + +# ''' +# def update( +# self, +# msg, +# ) -> None: +# self.maps.insert(0, msg.to_dict()) + +# def flatten(self) -> dict: +# return dict(self) + + # TODO: ``msgspec`` stuff worth paying attention to: # - schema evolution: https://jcristharif.com/msgspec/usage.html#schema-evolution +# - for eg. ``BrokerdStatus``, instead just have separate messages? # - use literals for a common msg determined by diff keys? # - https://jcristharif.com/msgspec/usage.html#literal -# - for eg. ``BrokerdStatus``, instead just have separate messages? # -------------- # Client -> emsd # -------------- -class Cancel(Struct): - '''Cancel msg for removing a dark (ems triggered) or - broker-submitted (live) trigger/order. - - ''' - action: str = 'cancel' - oid: str # uuid4 - symbol: str - - class Order(Struct): - # TODO: use ``msgspec.Literal`` + # TODO: ideally we can combine these 2 fields into + # 1 and just use the size polarity to determine a buy/sell. + # i would like to see this become more like # https://jcristharif.com/msgspec/usage.html#literal - action: str # {'buy', 'sell', 'alert'} + # action: Literal[ + # 'live', + # 'dark', + # 'alert', + # ] + + action: Literal[ + 'buy', + 'sell', + 'alert', + ] + # determines whether the create execution + # will be submitted to the ems or directly to + # the backend broker + exec_mode: Literal[ + 'dark', + 'live', + # 'paper', no right? + ] + # internal ``emdsd`` unique "order id" oid: str # uuid4 symbol: Union[str, Symbol] account: str # should we set a default as '' ? price: float - # TODO: could we drop the ``.action`` field above and instead just - # use +/- values here? Would make the msg smaller at the sake of a - # teensie fp precision? - size: float - brokers: list[str] + size: float # -ve is "sell", +ve is "buy" - # Assigned once initial ack is received - # ack_time_ns: Optional[int] = None + brokers: Optional[list[str]] = [] - # determines whether the create execution - # will be submitted to the ems or directly to - # the backend broker - exec_mode: str # {'dark', 'live'} + +class Cancel(Struct): + ''' + Cancel msg for removing a dark (ems triggered) or + broker-submitted (live) trigger/order. + + ''' + action: str = 'cancel' + oid: str # uuid4 + symbol: str + req: Optional[Order] = None # -------------- @@ -79,35 +122,30 @@ class Order(Struct): class Status(Struct): name: str = 'status' - oid: str # uuid4 time_ns: int - # { - # 'dark_submitted', - # 'dark_cancelled', - # 'dark_triggered', + resp: Literal[ + 'pending', # acked but not yet open + 'open', + 'dark_open', # live in dark loop + 'triggered', # dark-submitted to brokerd-backend + 'closed', # fully cleared all size/units + 'fill', # partial execution + 'canceled', + 'error', + ] - # 'broker_submitted', - # 'broker_cancelled', - # 'broker_executed', - # 'broker_filled', - # 'broker_errored', - - # 'alert_submitted', - # 'alert_triggered', - - # } - resp: str # "response", see above - - # trigger info - trigger_price: Optional[float] = None - # price: float - - # broker: Optional[str] = None + oid: str # uuid4 # this maps normally to the ``BrokerdOrder.reqid`` below, an id # normally allocated internally by the backend broker routing system - broker_reqid: Optional[Union[int, str]] = None + reqid: Optional[Union[int, str]] = None + + # the (last) source order/request msg if provided + # (eg. the Order/Cancel which causes this msg) + req: Optional[Union[Order, Cancel]] = None + + src: Optional[str] = None # for relaying backend msg data "through" the ems layer brokerd_msg: dict = {} @@ -185,20 +223,19 @@ class BrokerdStatus(Struct): name: str = 'status' reqid: Union[int, str] time_ns: int + status: Literal[ + 'open', + 'canceled', + 'fill', + 'pending', + ] - # TODO: instead (ack, pending, open, fill, clos(ed), cancelled) - # { - # 'submitted', # open - # 'cancelled', # canceled - # 'filled', # closed - # } - status: str account: str filled: float = 0.0 reason: str = '' remaining: float = 0.0 - external: bool = False + # external: bool = False # order: Optional[BrokerdOrder] = None # XXX: better design/name here? @@ -206,7 +243,7 @@ class BrokerdStatus(Struct): # event that wasn't originated by piker's emsd (eg. some external # trading system which does it's own order control but that you # might want to "track" using piker UIs/systems). - external: bool = False + # external: bool = False # XXX: not required schema as of yet broker_details: dict = { diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 2160bfca..e4fbb1bb 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -45,8 +45,13 @@ from ..data._normalize import iterticks from ..data._source import unpack_fqsn from ..log import get_logger from ._messages import ( - BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, - BrokerdFill, BrokerdPosition, BrokerdError + BrokerdCancel, + BrokerdOrder, + BrokerdOrderAck, + BrokerdStatus, + BrokerdFill, + BrokerdPosition, + BrokerdError, ) @@ -94,6 +99,10 @@ class PaperBoi: ''' is_modify: bool = False + if action == 'alert': + # bypass all fill simulation + return reqid + entry = self._reqids.get(reqid) if entry: # order is already existing, this is a modify @@ -104,10 +113,6 @@ class PaperBoi: # register order internally self._reqids[reqid] = (oid, symbol, action, price) - if action == 'alert': - # bypass all fill simulation - return reqid - # TODO: net latency model # we checkpoint here quickly particulalry # for dark orders since we want the dark_executed @@ -119,7 +124,9 @@ class PaperBoi: size = -size msg = BrokerdStatus( - status='submitted', + status='open', + # account=f'paper_{self.broker}', + account='paper', reqid=reqid, time_ns=time.time_ns(), filled=0.0, @@ -136,7 +143,14 @@ class PaperBoi: ) or ( action == 'sell' and (clear_price := self.last_bid[0]) >= price ): - await self.fake_fill(symbol, clear_price, size, action, reqid, oid) + await self.fake_fill( + symbol, + clear_price, + size, + action, + reqid, + oid, + ) else: # register this submissions as a paper live order @@ -178,7 +192,9 @@ class PaperBoi: await trio.sleep(0.05) msg = BrokerdStatus( - status='cancelled', + status='canceled', + # account=f'paper_{self.broker}', + account='paper', reqid=reqid, time_ns=time.time_ns(), broker_details={'name': 'paperboi'}, @@ -230,25 +246,23 @@ class PaperBoi: self._trade_ledger.update(fill_msg.to_dict()) if order_complete: - msg = BrokerdStatus( - reqid=reqid, time_ns=time.time_ns(), - - status='filled', + # account=f'paper_{self.broker}', + account='paper', + status='closed', filled=size, remaining=0 if order_complete else remaining, - - broker_details={ - 'paper_info': { - 'oid': oid, - }, - 'action': action, - 'size': size, - 'price': price, - 'name': self.broker, - }, + # broker_details={ + # 'paper_info': { + # 'oid': oid, + # }, + # 'action': action, + # 'size': size, + # 'price': price, + # 'name': self.broker, + # }, ) await self.ems_trades_stream.send(msg) @@ -393,69 +407,72 @@ async def handle_order_requests( # order_request: dict async for request_msg in ems_order_stream: - action = request_msg['action'] - - if action in {'buy', 'sell'}: - - account = request_msg['account'] - if account != 'paper': - log.error( - 'This is a paper account,' - ' only a `paper` selection is valid' - ) - await ems_order_stream.send(BrokerdError( - oid=request_msg['oid'], - symbol=request_msg['symbol'], - reason=f'Paper only. No account found: `{account}` ?', - )) - continue + # action = request_msg['action'] + match request_msg: + # if action in {'buy', 'sell'}: + case {'action': ('buy' | 'sell')}: + order = BrokerdOrder(**request_msg) + account = order.account + if account != 'paper': + log.error( + 'This is a paper account,' + ' only a `paper` selection is valid' + ) + await ems_order_stream.send(BrokerdError( + # oid=request_msg['oid'], + oid=order.oid, + # symbol=request_msg['symbol'], + symbol=order.symbol, + reason=f'Paper only. No account found: `{account}` ?', + )) + continue # validate - order = BrokerdOrder(**request_msg) + # order = BrokerdOrder(**request_msg) - if order.reqid is None: - reqid = str(uuid.uuid4()) - else: - reqid = order.reqid + # if order.reqid is None: + # reqid = + # else: + reqid = order.reqid or str(uuid.uuid4()) - # deliver ack that order has been submitted to broker routing - await ems_order_stream.send( - BrokerdOrderAck( + # deliver ack that order has been submitted to broker routing + await ems_order_stream.send( + BrokerdOrderAck( - # ems order request id - oid=order.oid, + # ems order request id + oid=order.oid, - # broker specific request id - reqid=reqid, + # broker specific request id + reqid=reqid, + ) ) - ) - # call our client api to submit the order - reqid = await client.submit_limit( + # call our client api to submit the order + reqid = await client.submit_limit( - oid=order.oid, - symbol=order.symbol, - price=order.price, - action=order.action, - size=order.size, + oid=order.oid, + symbol=order.symbol, + price=order.price, + action=order.action, + size=order.size, - # XXX: by default 0 tells ``ib_insync`` methods that - # there is no existing order so ask the client to create - # a new one (which it seems to do by allocating an int - # counter - collision prone..) - reqid=reqid, - ) + # XXX: by default 0 tells ``ib_insync`` methods that + # there is no existing order so ask the client to create + # a new one (which it seems to do by allocating an int + # counter - collision prone..) + reqid=reqid, + ) - elif action == 'cancel': - msg = BrokerdCancel(**request_msg) + # elif action == 'cancel': + case {'action': 'cancel'}: + msg = BrokerdCancel(**request_msg) + await client.submit_cancel( + reqid=msg.reqid + ) - await client.submit_cancel( - reqid=msg.reqid - ) - - else: - log.error(f'Unknown order command: {request_msg}') + case _: + log.error(f'Unknown order command: {request_msg}') @tractor.context diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index b9d23ab3..4e8d9e66 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -63,7 +63,7 @@ from ._forms import open_form_input_handling log = get_logger(__name__) -class OrderDialog(Struct): +class Dialog(Struct): ''' Trade dialogue meta-data describing the lifetime of an order submission to ``emsd`` from a chart. @@ -146,7 +146,7 @@ class OrderMode: current_pp: Optional[PositionTracker] = None active: bool = False name: str = 'order' - dialogs: dict[str, OrderDialog] = field(default_factory=dict) + dialogs: dict[str, Dialog] = field(default_factory=dict) _colors = { 'alert': 'alert_yellow', @@ -163,6 +163,7 @@ class OrderMode: ) -> LevelLine: level = order.price + print(f'SIZE: {order.size}') line = order_line( self.chart, @@ -175,7 +176,8 @@ class OrderMode: color=self._colors[order.action], dotted=True if ( - order.exec_mode == 'dark' and order.action != 'alert' + order.exec_mode == 'dark' + and order.action != 'alert' ) else False, **line_kwargs, @@ -265,7 +267,7 @@ class OrderMode: send_msg: bool = True, order: Optional[Order] = None, - ) -> OrderDialog: + ) -> Dialog: ''' Send execution order to EMS return a level line to represent the order on a chart. @@ -304,7 +306,7 @@ class OrderMode: uuid=order.oid, ) - dialog = OrderDialog( + dialog = Dialog( uuid=order.oid, order=order, symbol=order.symbol, @@ -373,7 +375,7 @@ class OrderMode: self, uuid: str - ) -> OrderDialog: + ) -> Dialog: ''' Order submitted status event handler. @@ -428,7 +430,7 @@ class OrderMode: self, uuid: str, - msg: Dict[str, Any], + msg: Status, ) -> None: @@ -452,7 +454,7 @@ class OrderMode: # TODO: add in standard fill/exec info that maybe we # pack in a broker independent way? - f'{msg["resp"]}: {msg["trigger_price"]}', + f'{msg.resp}: {msg.req.price}', ], ) log.runtime(result) @@ -524,53 +526,36 @@ class OrderMode: def load_unknown_dialog_from_msg( self, - # status: Status, - msg: dict, + msg: Status, - ) -> OrderDialog: + ) -> Dialog: - oid = str(msg['oid']) - # oid = str(status.oid) - - # bstatus = BrokerdStatus(**msg.brokerd_msg) # NOTE: the `.order` attr **must** be set with the # equivalent order msg in order to be loaded. - # border = BrokerdOrder(**bstatus.broker_details['order']) - # msg = msg['brokerd_msg'] + order = Order(**msg.req) + oid = str(msg.oid) + symbol = order.symbol - # size = border.size - size = msg['size'] - if size >= 0: - action = 'buy' + # TODO: MEGA UGGG ZONEEEE! + src = msg.src + if ( + src + and src != 'dark' + and src not in symbol + ): + fqsn = symbol + '.' + src + brokername = src else: - action = 'sell' + fqsn = symbol + *head, brokername = fqsn.rsplit('.') - # acct = border.account - # price = border.price - # price = msg['brokerd_msg']['price'] - symbol = msg['symbol'] - deats = msg['broker_details'] - brokername = deats['name'] - fqsn = ( - # deats['fqsn'] + '.' + deats['name'] - symbol + '.' + brokername - ) - symbol = Symbol.from_fqsn( + # fill out complex fields + order.oid = str(order.oid) + order.brokers = [brokername] + order.symbol = Symbol.from_fqsn( fqsn=fqsn, info={}, ) - # map to order composite-type - order = Order( - action=action, - price=msg['price'], - account=msg['account'], - size=size, - symbol=symbol, - brokers=[brokername], - oid=oid, - exec_mode='live', # dark or live - ) - dialog = self.submit_order( send_msg=False, order=order, @@ -770,7 +755,7 @@ async def open_order_mode( order_pane.order_mode = mode # select a pp to track - tracker = trackers[pp_account] + tracker: PositionTracker = trackers[pp_account] mode.current_pp = tracker tracker.show() tracker.hide_info() @@ -870,12 +855,13 @@ async def process_trade_msg( book: OrderBook, msg: dict, -) -> None: +) -> tuple[Dialog, Status]: get_index = mode.chart.get_index fmsg = pformat(msg) log.info(f'Received order msg:\n{fmsg}') name = msg['name'] + if name in ( 'position', ): @@ -901,105 +887,117 @@ async def process_trade_msg( # short circuit to next msg to avoid # unnecessary msg content lookups return - # continue - resp = msg['resp'] - oid = str(msg['oid']) - dialog = mode.dialogs.get(oid) + msg = Status(**msg) + resp = msg.resp + oid = msg.oid + dialog: Dialog = mode.dialogs.get(oid) - if dialog is None: - log.warning( - f'received msg for untracked dialog:\n{fmsg}' - ) - # dialog = mode.load_unknown_dialog_from_msg(Status(**msg)) - dialog = mode.load_unknown_dialog_from_msg(msg) + match msg: + case Status(resp='dark_open' | 'open'): - # record message to dialog tracking - dialog.msgs[oid] = msg + if dialog is not None: + # show line label once order is live + mode.on_submit(oid) - # response to 'action' request (buy/sell) - if resp in ( - 'dark_submitted', - 'broker_submitted' - ): - # show line label once order is live - mode.on_submit(oid) + else: + log.warning( + f'received msg for untracked dialog:\n{fmsg}' + ) + assert msg.resp in ('open', 'dark_open'), f'Unknown msg: {msg}' - # resp to 'cancel' request or error condition - # for action request - elif resp in ( - 'broker_inactive', - 'broker_errored', - ): - # delete level line from view - mode.on_cancel(oid) - broker_msg = msg['brokerd_msg'] - log.error( - f'Order {oid}->{resp} with:\n{pformat(broker_msg)}' - ) + sym = mode.chart.linked.symbol + fqsn = sym.front_fqsn() + order = Order(**msg.req) + if ( + ((order.symbol + f'.{msg.src}') == fqsn) - elif resp in ( - 'broker_cancelled', - 'dark_cancelled' - ): - # delete level line from view - mode.on_cancel(oid) - broker_msg = msg['brokerd_msg'] - log.cancel( - f'Order {oid}->{resp} with:\n{pformat(broker_msg)}' - ) + # a existing dark order for the same symbol + or ( + order.symbol == fqsn + and (msg.src == 'dark') or (msg.src in fqsn) + ) + ): + dialog = mode.load_unknown_dialog_from_msg(msg) + mode.on_submit(oid) - elif resp in ( - 'dark_triggered' - ): - log.info(f'Dark order triggered for {fmsg}') + case Status(resp='error'): + # delete level line from view + mode.on_cancel(oid) + broker_msg = msg.brokerd_msg + log.error( + f'Order {oid}->{resp} with:\n{pformat(broker_msg)}' + ) - elif resp in ( - 'alert_triggered' - ): - # should only be one "fill" for an alert - # add a triangle and remove the level line - mode.on_fill( - oid, - price=msg['trigger_price'], - arrow_index=get_index(time.time()), - ) - mode.lines.remove_line(uuid=oid) - await mode.on_exec(oid, msg) + case Status(resp='canceled'): + # delete level line from view + mode.on_cancel(oid) + req = msg.req + log.cancel( + f'Canceled order {oid}:\n{pformat(req)}' + ) - # response to completed 'action' request for buy/sell - elif resp in ( - 'broker_executed', - ): - # right now this is just triggering a system alert - await mode.on_exec(oid, msg) + case Status( + resp='triggered', + # req=Order(exec_mode='dark') # TODO: + req={'exec_mode': 'dark'}, + ): + log.info(f'Dark order triggered for {fmsg}') - if msg['brokerd_msg']['remaining'] == 0: + case Status( + resp='triggered', + # req=Order(exec_mode='live', action='alert') as req, # TODO + req={'exec_mode': 'live', 'action': 'alert'} as req, + ): + # should only be one "fill" for an alert + # add a triangle and remove the level line + mode.on_fill( + oid, + price=req.price, + arrow_index=get_index(time.time()), + ) + mode.lines.remove_line(uuid=oid) + msg.req = Order(**req) + await mode.on_exec(oid, msg) + + # response to completed 'dialog' for order request + case Status( + resp='closed', + # req=Order() as req, # TODO + req=req, + ): + # right now this is just triggering a system alert + msg.req = Order(**req) + await mode.on_exec(oid, msg) mode.lines.remove_line(uuid=oid) - # each clearing tick is responded individually - elif resp in ( - 'broker_filled', - ): - known_order = book._sent_orders.get(oid) - if not known_order: - log.warning(f'order {oid} is unknown') - return - # continue + # each clearing tick is responded individually + case Status(resp='fill'): + known_order = book._sent_orders.get(oid) + if not known_order: + log.warning(f'order {oid} is unknown') + return + # continue - action = known_order.action - details = msg['brokerd_msg'] + action = known_order.action + details = msg.brokerd_msg - # TODO: some kinda progress system - mode.on_fill( - oid, - price=details['price'], - pointing='up' if action == 'buy' else 'down', + # TODO: some kinda progress system + mode.on_fill( + oid, + price=details['price'], + pointing='up' if action == 'buy' else 'down', - # TODO: put the actual exchange timestamp - arrow_index=get_index(details['broker_time']), - ) + # TODO: put the actual exchange timestamp + arrow_index=get_index(details['broker_time']), + ) - # TODO: how should we look this up? - # tracker = mode.trackers[msg['account']] - # tracker.live_pp.fills.append(msg) + # TODO: how should we look this up? + # tracker = mode.trackers[msg['account']] + # tracker.live_pp.fills.append(msg) + + # record message to dialog tracking + if dialog: + dialog.msgs[oid] = msg + + return dialog, msg