diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 5f08cfa5..81288899 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -23,7 +23,11 @@ from dataclasses import dataclass, field from math import isnan from pprint import pformat import time -from typing import AsyncIterator, Callable +from typing import ( + AsyncIterator, + Any, + Callable, +) from bidict import bidict import trio @@ -170,10 +174,9 @@ async def clear_dark_triggers( ) ): price = tick.get('price') - ttype = tick['type'] - # update to keep new cmds informed book.lasts[sym] = price + ttype = tick['type'] for oid, ( pred, @@ -189,65 +192,62 @@ async def clear_dark_triggers( ttype not in tf or not pred(price) ): - log.debug( - f'skipping quote for {sym} ' - f'{pred}, {ttype} not in {tf}?, {pred(price)}' - ) + # log.runtime( + # f'skipping quote for {sym} ' + # f'{pred} -> {pred(price)}\n' + # f'{ttype} not in {tf}?' + # ) # majority of iterations will be non-matches continue - action: str = cmd['action'] - symbol: str = cmd['symbol'] - bfqsn: str = symbol.replace(f'.{broker}', '') + match cmd: + # alert: nothing to do but relay a status + # back to the requesting ems client + case { + 'action': 'alert', + }: + resp = 'alert_triggered' - if action == 'alert': - # nothing to do but relay a status - # message back to the requesting ems client - resp = 'alert_triggered' + # executable order submission + case { + 'action': action, + 'symbol': symbol, + 'account': account, + 'size': size, + }: + bfqsn: str = symbol.replace(f'.{broker}', '') + submit_price = price + abs_diff_away + resp = 'dark_triggered' # hidden on client-side - else: # executable order submission + log.info( + f'Dark order triggered for price {price}\n' + f'Submitting order @ price {submit_price}') - # submit_price = price + price*percent_away - submit_price = price + abs_diff_away + live_req = BrokerdOrder( + action=action, + oid=oid, + account=account, + time_ns=time.time_ns(), + symbol=bfqsn, + price=submit_price, + size=size, + ) + await brokerd_orders_stream.send(live_req) - log.info( - f'Dark order triggered for price {price}\n' - f'Submitting order @ price {submit_price}') + # mark this entry as having sent an order + # request. the entry will be replaced once the + # target broker replies back with + # a ``BrokerdOrderAck`` msg including the + # allocated unique ``BrokerdOrderAck.reqid`` key + # generated by the broker's own systems. + book._ems_entries[oid] = live_req - msg = BrokerdOrder( - action=cmd['action'], - oid=oid, - account=cmd['account'], - time_ns=time.time_ns(), + case _: + raise ValueError(f'Invalid dark book entry: {cmd}') - # this **creates** new order request for the - # underlying broker so we set a "broker - # request id" (``reqid`` kwarg) to ``None`` - # so that the broker client knows that we - # aren't trying to modify an existing - # order-request and instead create a new one. - reqid=None, - - symbol=bfqsn, - price=submit_price, - size=cmd['size'], - ) - await brokerd_orders_stream.send(msg) - - # mark this entry as having sent an order - # request. the entry will be replaced once the - # target broker replies back with - # a ``BrokerdOrderAck`` msg including the - # allocated unique ``BrokerdOrderAck.reqid`` key - # generated by the broker's own systems. - book._ems_entries[oid] = msg - - # our internal status value for client-side - # triggered "dark orders" - resp = 'dark_triggered' - - msg = Status( - oid=oid, # ems order id + # fallthrough logic + resp = Status( + oid=oid, # ems dialog id time_ns=time.time_ns(), resp=resp, trigger_price=price, @@ -262,13 +262,14 @@ async def clear_dark_triggers( f'pred for {oid} was already removed!?' ) + # send response to client-side try: - await ems_client_order_stream.send(msg) + await ems_client_order_stream.send(resp) except ( trio.ClosedResourceError, ): log.warning( - f'client {ems_client_order_stream} stream is broke' + f'{ems_client_order_stream} stream broke?' ) break @@ -572,98 +573,57 @@ async def translate_and_relay_brokerd_events( assert relay.brokerd_dialogue == brokerd_trades_stream + brokerd_msg: dict[str, Any] async for brokerd_msg in brokerd_trades_stream: - - name = brokerd_msg['name'] - log.info( f'Received broker trade event:\n' f'{pformat(brokerd_msg)}' ) + match brokerd_msg: - if name == 'position': + # BrokerdPosition + case { + 'name': 'position', + 'symbol': sym, + 'broker': broker, + }: + pos_msg = BrokerdPosition(**brokerd_msg) - pos_msg = BrokerdPosition(**brokerd_msg) + # XXX: this will be useful for automatic strats yah? + # keep pps per account up to date locally in ``emsd`` mem + # sym, broker = pos_msg.symbol, pos_msg.broker - # XXX: this will be useful for automatic strats yah? - # keep pps per account up to date locally in ``emsd`` mem - sym, broker = pos_msg.symbol, pos_msg.broker + relay.positions.setdefault( + # NOTE: translate to a FQSN! + (broker, sym), + [] + ).append(pos_msg) - relay.positions.setdefault( - # NOTE: translate to a FQSN! - (broker, sym), - [] - ).append(pos_msg) - - # fan-out-relay position msgs immediately by - # broadcasting updates on all client streams - for client_stream in router.clients.copy(): - try: - await client_stream.send(pos_msg) - except( - trio.ClosedResourceError, - trio.BrokenResourceError, - ): - router.clients.remove(client_stream) - log.warning( - f'client for {client_stream} was already closed?') - - continue - - # Get the broker (order) request id, this **must** be normalized - # into messaging provided by the broker backend - reqid = brokerd_msg['reqid'] - - # all piker originated requests will have an ems generated oid field - oid = brokerd_msg.get( - 'oid', - book._ems2brokerd_ids.inverse.get(reqid) - ) - - if oid is None: - - # XXX: paper clearing special cases - # paper engine race case: ``Client.submit_limit()`` hasn't - # returned yet and provided an output reqid to register - # locally, so we need to retreive the oid that was already - # packed at submission since we already know it ahead of - # time - paper = brokerd_msg['broker_details'].get('paper_info') - ext = brokerd_msg['broker_details'].get('external') - if paper: - # paperboi keeps the ems id up front - oid = paper['oid'] - - elif ext: - # may be an order msg specified as "external" to the - # piker ems flow (i.e. generated by some other - # external broker backend client (like tws for ib) - log.error(f"External trade event {ext}") + # fan-out-relay position msgs immediately by + # broadcasting updates on all client streams + for client_stream in router.clients.copy(): + try: + await client_stream.send(pos_msg) + except( + trio.ClosedResourceError, + trio.BrokenResourceError, + ): + router.clients.remove(client_stream) + log.warning( + f'client for {client_stream} was already closed?') continue - else: - # something is out of order, we don't have an oid for - # this broker-side message. - log.error( - f'Unknown oid: {oid} for msg:\n' - f'{pformat(brokerd_msg)}\n' - 'Unable to relay message to client side!?' - ) - - else: - # check for existing live flow entry - entry = book._ems_entries.get(oid) - old_reqid = entry.reqid - - if old_reqid and old_reqid != reqid: - log.warning( - f'Brokerd order id change for {oid}:\n' - f'{old_reqid} -> {reqid}' - ) - - # initial response to brokerd order request - if name == 'ack': + # BrokerdOrderAck + case { + 'name': 'ack', + 'reqid': reqid, # brokerd generated order-request id + 'oid': oid, # ems order-dialog id + } if ( + entry := book._ems_entries.get(oid) + ): + # initial response to brokerd order request + # if name == 'ack': # register the brokerd request id (that was generated # / created internally by the broker backend) with our @@ -697,91 +657,136 @@ async def translate_and_relay_brokerd_events( # update the flow with the ack msg book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg) + # no msg to client necessary continue - # a live flow now exists - oid = entry.oid + # BrokerdOrderError + case { + 'name': 'error', + 'oid': oid, # ems order-dialog id + 'reqid': reqid, # brokerd generated order-request id + 'symbol': sym, + 'broker_details': details, + # 'reason': reason, + }: + msg = BrokerdError(**brokerd_msg) + resp = 'broker_errored' + log.error(pformat(msg)) # XXX make one when it's blank? - # TODO: instead this should be our status set. - # ack, open, fill, closed, cancelled' + # TODO: figure out how this will interact with EMS clients + # for ex. on an error do we react with a dark orders + # management response, like cancelling all dark orders? + # This looks like a supervision policy for pending orders on + # some unexpected failure - something we need to think more + # about. In most default situations, with composed orders + # (ex. brackets), most brokers seem to use a oca policy. - resp = None - broker_details = {} + # BrokerdStatus + case { + 'name': 'status', + 'status': status, + 'reqid': reqid, # brokerd generated order-request id + # TODO: feels like the wrong msg for this field? + 'remaining': remaining, - if name in ( - 'error', - ): - # TODO: figure out how this will interact with EMS clients - # for ex. on an error do we react with a dark orders - # management response, like cancelling all dark orders? + } if ( + oid := book._ems2brokerd_ids.inverse.get(reqid) + ): + msg = BrokerdStatus(**brokerd_msg) - # This looks like a supervision policy for pending orders on - # some unexpected failure - something we need to think more - # about. In most default situations, with composed orders - # (ex. brackets), most brokers seem to use a oca policy. + # TODO: should we flatten out these cases and/or should + # they maybe even eventually be separate messages? + if status == 'cancelled': + log.info(f'Cancellation for {oid} is complete!') - msg = BrokerdError(**brokerd_msg) + if status == 'filled': + # conditional execution is fully complete, no more + # fills for the noted order + if not remaining: - # XXX should we make one when it's blank? - log.error(pformat(msg)) + resp = 'broker_executed' - # TODO: getting this bs, prolly need to handle status messages - # 'Market data farm connection is OK:usfarm.nj' + # be sure to pop this stream from our dialogue set + # since the order dialogue should be done. + log.info(f'Execution for {oid} is complete!') - # another stupid ib error to handle - # if 10147 in message: cancel + # just log it + else: + log.info(f'{broker} filled {msg}') - resp = 'broker_errored' - broker_details = msg - - # don't relay message to order requester client - # continue - - elif name in ( - 'status', - ): - msg = BrokerdStatus(**brokerd_msg) - - if msg.status == 'cancelled': - - log.info(f'Cancellation for {oid} is complete!') - - if msg.status == 'filled': - - # conditional execution is fully complete, no more - # fills for the noted order - if not msg.remaining: - - resp = 'broker_executed' - - # be sure to pop this stream from our dialogue set - # since the order dialogue should be done. - log.info(f'Execution for {oid} is complete!') - - # just log it else: - log.info(f'{broker} filled {msg}') + # one of {submitted, cancelled} + resp = 'broker_' + msg.status - else: - # one of {submitted, cancelled} - resp = 'broker_' + msg.status + # BrokerdFill + case { + 'name': 'fill', + 'reqid': reqid, # brokerd generated order-request id + # 'symbol': sym, # paper engine doesn't have this, nbd? + } if ( + oid := book._ems2brokerd_ids.inverse.get(reqid) + ): + # proxy through the "fill" result(s) + msg = BrokerdFill(**brokerd_msg) + resp = 'broker_filled' + log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') - # pass the BrokerdStatus msg inside the broker details field - broker_details = msg + # unknown valid message case? + # case { + # 'name': name, + # 'symbol': sym, + # 'reqid': reqid, # brokerd generated order-request id + # # 'oid': oid, # ems order-dialog id + # 'broker_details': details, - elif name in ( - 'fill', - ): - msg = BrokerdFill(**brokerd_msg) + # } if ( + # book._ems2brokerd_ids.inverse.get(reqid) is None + # ): + # # TODO: pretty sure we can drop this now? - # proxy through the "fill" result(s) - resp = 'broker_filled' - broker_details = msg + # # XXX: paper clearing special cases + # # paper engine race case: ``Client.submit_limit()`` hasn't + # # returned yet and provided an output reqid to register + # # locally, so we need to retreive the oid that was already + # # packed at submission since we already know it ahead of + # # time + # paper = details.get('paper_info') + # ext = details.get('external') - log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') + # if paper: + # # paperboi keeps the ems id up front + # oid = paper['oid'] - else: - raise ValueError(f'Brokerd message {brokerd_msg} is invalid') + # elif ext: + # # may be an order msg specified as "external" to the + # # piker ems flow (i.e. generated by some other + # # external broker backend client (like tws for ib) + # log.error(f"External trade event {name}@{ext}") + + # else: + # # something is out of order, we don't have an oid for + # # this broker-side message. + # log.error( + # f'Unknown oid: {oid} for msg {name}:\n' + # f'{pformat(brokerd_msg)}\n' + # 'Unable to relay message to client side!?' + # ) + + # continue + + case _: + raise ValueError(f'Brokerd message {brokerd_msg} is invalid') + + # retrieve existing live flow + entry = book._ems_entries[oid] + assert entry.oid == oid + + old_reqid = entry.reqid + if old_reqid and old_reqid != reqid: + log.warning( + f'Brokerd order id change for {oid}:\n' + f'{old_reqid} -> {reqid}' + ) # Create and relay response status message # to requesting EMS client @@ -793,7 +798,7 @@ async def translate_and_relay_brokerd_events( resp=resp, time_ns=time.time_ns(), broker_reqid=reqid, - brokerd_msg=broker_details, + brokerd_msg=msg, ) ) except KeyError: @@ -808,11 +813,11 @@ async def translate_and_relay_brokerd_events( async def process_client_order_cmds( - client_order_stream: tractor.MsgStream, # noqa + client_order_stream: tractor.MsgStream, brokerd_order_stream: tractor.MsgStream, symbol: str, - feed: Feed, # noqa + feed: Feed, dark_book: _DarkBook, router: Router, @@ -822,34 +827,24 @@ async def process_client_order_cmds( # cmd: dict async for cmd in client_order_stream: - log.info(f'Received order cmd:\n{pformat(cmd)}') - action = cmd['action'] oid = cmd['oid'] - - # TODO: make ``tractor.MsgStream`` a frozen type again such that it - # can be stored in sets like the old context was. - # wait, maybe this **is** already working thanks to our parent - # `trio` type? - # register this stream as an active dialogue for this order id # such that translated message from the brokerd backend can be # routed (relayed) to **just** that client stream (and in theory # others who are registered for such order affiliated msgs). client_dialogues[oid] = client_order_stream - reqid = dark_book._ems2brokerd_ids.inverse.get(oid) live_entry = dark_book._ems_entries.get(oid) - # TODO: can't wait for this stuff to land in 3.10 - # https://www.python.org/dev/peps/pep-0636/#going-to-the-cloud-mappings - if action in ('cancel',): - - # check for live-broker order - if live_entry: + match cmd: + # existing live-broker order cancel + case { + 'action': 'cancel', + 'oid': oid, + } if live_entry: reqid = live_entry.reqid - msg = BrokerdCancel( oid=oid, reqid=reqid, @@ -860,12 +855,10 @@ async def process_client_order_cmds( # NOTE: cancel response will be relayed back in messages # from corresponding broker if reqid is not None: - # send cancel to brokerd immediately! log.info( f'Submitting cancel for live order {reqid}' ) - await brokerd_order_stream.send(msg) else: @@ -876,7 +869,10 @@ async def process_client_order_cmds( dark_book._ems_entries[oid] = msg # dark trigger cancel - else: + case { + 'action': 'cancel', + 'oid': oid, + } if not live_entry: try: # remove from dark book clearing dark_book.orders[symbol].pop(oid, None) @@ -896,25 +892,27 @@ async def process_client_order_cmds( except KeyError: log.exception(f'No dark order for {symbol}?') - # TODO: 3.10 struct-pattern matching and unpacking here - elif action in ('alert', 'buy', 'sell',): + # live order submission + case { + 'oid': oid, + 'symbol': fqsn, + 'price': trigger_price, + 'size': size, + 'action': ('buy' | 'sell') as action, + 'exec_mode': 'live', + }: + # TODO: eventually we should be receiving + # this struct on the wire unpacked in a scoped protocol + # setup with ``tractor``. + req = Order(**cmd) + broker = req.brokers[0] - msg = Order(**cmd) - - fqsn = msg.symbol - trigger_price = msg.price - size = msg.size - exec_mode = msg.exec_mode - broker = msg.brokers[0] - # remove the broker part before creating a message - # to send to the specific broker since they probably - # aren't expectig their own name, but should they? - sym = fqsn.replace(f'.{broker}', '') - - if exec_mode == 'live' and action in ('buy', 'sell',): + # remove the broker part before creating a message + # to send to the specific broker since they probably + # aren't expectig their own name, but should they? + sym = fqsn.replace(f'.{broker}', '') if live_entry is not None: - # sanity check on emsd id assert live_entry.oid == oid reqid = live_entry.reqid @@ -934,7 +932,7 @@ async def process_client_order_cmds( action=action, price=trigger_price, size=size, - account=msg.account, + account=req.account, ) # send request to backend @@ -954,12 +952,22 @@ async def process_client_order_cmds( # that live order asap. dark_book._ems_entries[oid] = msg - # "DARK" triggers - # submit order to local EMS book and scan loop, - # effectively a local clearing engine, which - # scans for conditions and triggers matching executions - elif exec_mode in ('dark', 'paper') or ( - action in ('alert') + # dark-order / alert submission + case { + 'oid': oid, + 'symbol': fqsn, + 'price': trigger_price, + 'size': size, + 'exec_mode': exec_mode, + 'action': action, + 'brokers': brokers, # list + } if ( + # "DARK" triggers + # submit order to local EMS book and scan loop, + # effectively a local clearing engine, which + # scans for conditions and triggers matching executions + exec_mode in ('dark', 'paper') + or action == 'alert' ): # Auto-gen scanner predicate: # we automatically figure out what the alert check @@ -977,6 +985,7 @@ async def process_client_order_cmds( pred = mk_check(trigger_price, last, action) spread_slap: float = 5 + sym = fqsn.replace(f'.{brokers[0]}', '') min_tick = feed.symbols[sym].tick_size if action == 'buy': @@ -999,10 +1008,8 @@ async def process_client_order_cmds( abs_diff_away = 0 # submit execution/order to EMS scan loop - # NOTE: this may result in an override of an existing # dark book entry if the order id already exists - dark_book.orders.setdefault( fqsn, {} )[oid] = ( @@ -1029,17 +1036,16 @@ async def process_client_order_cmds( @tractor.context async def _emsd_main( - ctx: tractor.Context, fqsn: str, exec_mode: str, # ('paper', 'live') - loglevel: str = 'info', ) -> None: - '''EMS (sub)actor entrypoint providing the - execution management (micro)service which conducts broker - order clearing control on behalf of clients. + ''' + EMS (sub)actor entrypoint providing the execution management + (micro)service which conducts broker order clearing control on + behalf of clients. This is the daemon (child) side routine which starts an EMS runtime task (one per broker-feed) and and begins streaming back alerts from @@ -1083,9 +1089,8 @@ async def _emsd_main( # tractor.Context instead of strictly requiring a ctx arg. ems_ctx = ctx - feed: Feed - # spawn one task per broker feed + feed: Feed async with ( maybe_open_feed( [fqsn], diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 2c7c8603..2160bfca 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -312,50 +312,54 @@ async def simulate_fills( # this stream may eventually contain multiple symbols async for quotes in quote_stream: - for sym, quote in quotes.items(): - for tick in iterticks( quote, # dark order price filter(s) types=('ask', 'bid', 'trade', 'last') ): # print(tick) - tick_price = tick.get('price') - ttype = tick['type'] + match tick: + case { + 'price': tick_price, + 'type': 'ask', + }: + client.last_ask = ( + tick_price, + tick.get('size', client.last_ask[1]), + ) - if ttype in ('ask',): + orders = client._buys.get(sym, {}) + book_sequence = reversed( + sorted(orders.keys(), key=itemgetter(1))) - client.last_ask = ( - tick_price, - tick.get('size', client.last_ask[1]), - ) + def pred(our_price): + return tick_price <= our_price - orders = client._buys.get(sym, {}) + case { + 'price': tick_price, + 'type': 'bid', + }: + client.last_bid = ( + tick_price, + tick.get('size', client.last_bid[1]), + ) + orders = client._sells.get(sym, {}) + book_sequence = sorted( + orders.keys(), + key=itemgetter(1) + ) - book_sequence = reversed( - sorted(orders.keys(), key=itemgetter(1))) + def pred(our_price): + return tick_price >= our_price - def pred(our_price): - return tick_price < our_price - - elif ttype in ('bid',): - - client.last_bid = ( - tick_price, - tick.get('size', client.last_bid[1]), - ) - - orders = client._sells.get(sym, {}) - book_sequence = sorted(orders.keys(), key=itemgetter(1)) - - def pred(our_price): - return tick_price > our_price - - elif ttype in ('trade', 'last'): - # TODO: simulate actual book queues and our orders - # place in it, might require full L2 data? - continue + case { + 'price': tick_price, + 'type': ('trade' | 'last'), + }: + # TODO: simulate actual book queues and our orders + # place in it, might require full L2 data? + continue # iterate book prices descending for oid, our_price in book_sequence: