diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 3853ef66..85a01c0f 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -23,7 +23,11 @@ from dataclasses import dataclass, field from math import isnan from pprint import pformat import time -from typing import AsyncIterator, Callable +from typing import ( + AsyncIterator, + Any, + Callable, +) from bidict import bidict import trio @@ -572,98 +576,57 @@ async def translate_and_relay_brokerd_events( assert relay.brokerd_dialogue == brokerd_trades_stream + brokerd_msg: dict[str, Any] async for brokerd_msg in brokerd_trades_stream: - - name = brokerd_msg['name'] - log.info( f'Received broker trade event:\n' f'{pformat(brokerd_msg)}' ) + match brokerd_msg: - if name == 'position': + # BrokerdPosition + case { + 'name': 'position', + 'symbol': sym, + 'broker': broker, + }: + pos_msg = BrokerdPosition(**brokerd_msg) - pos_msg = BrokerdPosition(**brokerd_msg) + # XXX: this will be useful for automatic strats yah? + # keep pps per account up to date locally in ``emsd`` mem + # sym, broker = pos_msg.symbol, pos_msg.broker - # XXX: this will be useful for automatic strats yah? - # keep pps per account up to date locally in ``emsd`` mem - sym, broker = pos_msg.symbol, pos_msg.broker + relay.positions.setdefault( + # NOTE: translate to a FQSN! + (broker, sym), + [] + ).append(pos_msg) - relay.positions.setdefault( - # NOTE: translate to a FQSN! - (broker, sym), - [] - ).append(pos_msg) - - # fan-out-relay position msgs immediately by - # broadcasting updates on all client streams - for client_stream in router.clients.copy(): - try: - await client_stream.send(pos_msg) - except( - trio.ClosedResourceError, - trio.BrokenResourceError, - ): - router.clients.remove(client_stream) - log.warning( - f'client for {client_stream} was already closed?') - - continue - - # Get the broker (order) request id, this **must** be normalized - # into messaging provided by the broker backend - reqid = brokerd_msg['reqid'] - - # all piker originated requests will have an ems generated oid field - oid = brokerd_msg.get( - 'oid', - book._ems2brokerd_ids.inverse.get(reqid) - ) - - if oid is None: - - # XXX: paper clearing special cases - # paper engine race case: ``Client.submit_limit()`` hasn't - # returned yet and provided an output reqid to register - # locally, so we need to retreive the oid that was already - # packed at submission since we already know it ahead of - # time - paper = brokerd_msg['broker_details'].get('paper_info') - ext = brokerd_msg['broker_details'].get('external') - if paper: - # paperboi keeps the ems id up front - oid = paper['oid'] - - elif ext: - # may be an order msg specified as "external" to the - # piker ems flow (i.e. generated by some other - # external broker backend client (like tws for ib) - log.error(f"External trade event {ext}") + # fan-out-relay position msgs immediately by + # broadcasting updates on all client streams + for client_stream in router.clients.copy(): + try: + await client_stream.send(pos_msg) + except( + trio.ClosedResourceError, + trio.BrokenResourceError, + ): + router.clients.remove(client_stream) + log.warning( + f'client for {client_stream} was already closed?') continue - else: - # something is out of order, we don't have an oid for - # this broker-side message. - log.error( - f'Unknown oid: {oid} for msg:\n' - f'{pformat(brokerd_msg)}\n' - 'Unable to relay message to client side!?' - ) - - else: - # check for existing live flow entry - entry = book._ems_entries.get(oid) - old_reqid = entry.reqid - - if old_reqid and old_reqid != reqid: - log.warning( - f'Brokerd order id change for {oid}:\n' - f'{old_reqid} -> {reqid}' - ) - - # initial response to brokerd order request - if name == 'ack': + # BrokerdOrderAck + case { + 'name': 'ack', + 'reqid': reqid, # brokerd generated order-request id + 'oid': oid, # ems order-dialog id + } if ( + entry := book._ems_entries.get(oid) + ): + # initial response to brokerd order request + # if name == 'ack': # register the brokerd request id (that was generated # / created internally by the broker backend) with our @@ -697,91 +660,136 @@ async def translate_and_relay_brokerd_events( # update the flow with the ack msg book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg) + # no msg to client necessary continue - # a live flow now exists - oid = entry.oid + # BrokerdOrderError + case { + 'name': 'error', + 'oid': oid, # ems order-dialog id + 'reqid': reqid, # brokerd generated order-request id + 'symbol': sym, + 'broker_details': details, + # 'reason': reason, + }: + msg = BrokerdError(**brokerd_msg) + resp = 'broker_errored' + log.error(pformat(msg)) # XXX make one when it's blank? - # TODO: instead this should be our status set. - # ack, open, fill, closed, cancelled' + # TODO: figure out how this will interact with EMS clients + # for ex. on an error do we react with a dark orders + # management response, like cancelling all dark orders? + # This looks like a supervision policy for pending orders on + # some unexpected failure - something we need to think more + # about. In most default situations, with composed orders + # (ex. brackets), most brokers seem to use a oca policy. - resp = None - broker_details = {} - if name in ( - 'error', - ): - # TODO: figure out how this will interact with EMS clients - # for ex. on an error do we react with a dark orders - # management response, like cancelling all dark orders? + # BrokerdStatus + case { + 'name': 'status', + 'status': status, + 'reqid': reqid, # brokerd generated order-request id + # TODO: feels like the wrong msg for this field? + 'remaining': remaining, - # This looks like a supervision policy for pending orders on - # some unexpected failure - something we need to think more - # about. In most default situations, with composed orders - # (ex. brackets), most brokers seem to use a oca policy. + } if ( + oid := book._ems2brokerd_ids.inverse.get(reqid) + ): + msg = BrokerdStatus(**brokerd_msg) - msg = BrokerdError(**brokerd_msg) + # TODO: should we flatten out these cases and/or should + # they maybe even eventually be separate messages? + if status == 'cancelled': + log.info(f'Cancellation for {oid} is complete!') - # XXX should we make one when it's blank? - log.error(pformat(msg)) + if status == 'filled': + # conditional execution is fully complete, no more + # fills for the noted order + if not remaining: - # TODO: getting this bs, prolly need to handle status messages - # 'Market data farm connection is OK:usfarm.nj' + resp = 'broker_executed' - # another stupid ib error to handle - # if 10147 in message: cancel + # be sure to pop this stream from our dialogue set + # since the order dialogue should be done. + log.info(f'Execution for {oid} is complete!') - resp = 'broker_errored' - broker_details = msg + # just log it + else: + log.info(f'{broker} filled {msg}') - # don't relay message to order requester client - # continue - - elif name in ( - 'status', - ): - msg = BrokerdStatus(**brokerd_msg) - - if msg.status == 'cancelled': - - log.info(f'Cancellation for {oid} is complete!') - - if msg.status == 'filled': - - # conditional execution is fully complete, no more - # fills for the noted order - if not msg.remaining: - - resp = 'broker_executed' - - # be sure to pop this stream from our dialogue set - # since the order dialogue should be done. - log.info(f'Execution for {oid} is complete!') - - # just log it else: - log.info(f'{broker} filled {msg}') + # one of {submitted, cancelled} + resp = 'broker_' + msg.status - else: - # one of {submitted, cancelled} - resp = 'broker_' + msg.status + # BrokerdFill + case { + 'name': 'fill', + 'reqid': reqid, # brokerd generated order-request id + # 'symbol': sym, # paper engine doesn't have this, nbd? + } if ( + oid := book._ems2brokerd_ids.inverse.get(reqid) + ): + # proxy through the "fill" result(s) + msg = BrokerdFill(**brokerd_msg) + resp = 'broker_filled' + log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') - # pass the BrokerdStatus msg inside the broker details field - broker_details = msg + # unknown valid message case? + case { + 'name': name, + 'symbol': sym, + 'reqid': reqid, # brokerd generated order-request id + # 'oid': oid, # ems order-dialog id + 'broker_details': details, - elif name in ( - 'fill', - ): - msg = BrokerdFill(**brokerd_msg) + } if ( + book._ems2brokerd_ids.inverse.get(reqid) is None + ): + # TODO: pretty sure we can drop this now? + # XXX: paper clearing special cases + # paper engine race case: ``Client.submit_limit()`` hasn't + # returned yet and provided an output reqid to register + # locally, so we need to retreive the oid that was already + # packed at submission since we already know it ahead of + # time + paper = details.get('paper_info') + ext = details.get('external') - # proxy through the "fill" result(s) - resp = 'broker_filled' - broker_details = msg + if paper: + # paperboi keeps the ems id up front + oid = paper['oid'] - log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') + elif ext: + # may be an order msg specified as "external" to the + # piker ems flow (i.e. generated by some other + # external broker backend client (like tws for ib) + log.error(f"External trade event {name}@{ext}") - else: - raise ValueError(f'Brokerd message {brokerd_msg} is invalid') + else: + # something is out of order, we don't have an oid for + # this broker-side message. + log.error( + f'Unknown oid: {oid} for msg {name}:\n' + f'{pformat(brokerd_msg)}\n' + 'Unable to relay message to client side!?' + ) + + continue + + case _: + raise ValueError(f'Brokerd message {brokerd_msg} is invalid') + + # retrieve existing live flow + entry = book._ems_entries[oid] + assert entry.oid == oid + + old_reqid = entry.reqid + if old_reqid and old_reqid != reqid: + log.warning( + f'Brokerd order id change for {oid}:\n' + f'{old_reqid} -> {reqid}' + ) # Create and relay response status message # to requesting EMS client @@ -793,7 +801,7 @@ async def translate_and_relay_brokerd_events( resp=resp, time_ns=time.time_ns(), broker_reqid=reqid, - brokerd_msg=broker_details, + brokerd_msg=msg, ) ) except KeyError: