diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index b9cb0aad..81288899 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -192,10 +192,11 @@ async def clear_dark_triggers( ttype not in tf or not pred(price) ): - log.debug( - f'skipping quote for {sym} ' - f'{pred}, {ttype} not in {tf}?, {pred(price)}' - ) + # log.runtime( + # f'skipping quote for {sym} ' + # f'{pred} -> {pred(price)}\n' + # f'{ttype} not in {tf}?' + # ) # majority of iterations will be non-matches continue @@ -731,46 +732,47 @@ async def translate_and_relay_brokerd_events( log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') # unknown valid message case? - case { - 'name': name, - 'symbol': sym, - 'reqid': reqid, # brokerd generated order-request id - # 'oid': oid, # ems order-dialog id - 'broker_details': details, + # case { + # 'name': name, + # 'symbol': sym, + # 'reqid': reqid, # brokerd generated order-request id + # # 'oid': oid, # ems order-dialog id + # 'broker_details': details, - } if ( - book._ems2brokerd_ids.inverse.get(reqid) is None - ): - # TODO: pretty sure we can drop this now? - # XXX: paper clearing special cases - # paper engine race case: ``Client.submit_limit()`` hasn't - # returned yet and provided an output reqid to register - # locally, so we need to retreive the oid that was already - # packed at submission since we already know it ahead of - # time - paper = details.get('paper_info') - ext = details.get('external') + # } if ( + # book._ems2brokerd_ids.inverse.get(reqid) is None + # ): + # # TODO: pretty sure we can drop this now? - if paper: - # paperboi keeps the ems id up front - oid = paper['oid'] + # # XXX: paper clearing special cases + # # paper engine race case: ``Client.submit_limit()`` hasn't + # # returned yet and provided an output reqid to register + # # locally, so we need to retreive the oid that was already + # # packed at submission since we already know it ahead of + # # time + # paper = details.get('paper_info') + # ext = details.get('external') - elif ext: - # may be an order msg specified as "external" to the - # piker ems flow (i.e. generated by some other - # external broker backend client (like tws for ib) - log.error(f"External trade event {name}@{ext}") + # if paper: + # # paperboi keeps the ems id up front + # oid = paper['oid'] - else: - # something is out of order, we don't have an oid for - # this broker-side message. - log.error( - f'Unknown oid: {oid} for msg {name}:\n' - f'{pformat(brokerd_msg)}\n' - 'Unable to relay message to client side!?' - ) + # elif ext: + # # may be an order msg specified as "external" to the + # # piker ems flow (i.e. generated by some other + # # external broker backend client (like tws for ib) + # log.error(f"External trade event {name}@{ext}") - continue + # else: + # # something is out of order, we don't have an oid for + # # this broker-side message. + # log.error( + # f'Unknown oid: {oid} for msg {name}:\n' + # f'{pformat(brokerd_msg)}\n' + # 'Unable to relay message to client side!?' + # ) + + # continue case _: raise ValueError(f'Brokerd message {brokerd_msg} is invalid') @@ -811,11 +813,11 @@ async def translate_and_relay_brokerd_events( async def process_client_order_cmds( - client_order_stream: tractor.MsgStream, # noqa + client_order_stream: tractor.MsgStream, brokerd_order_stream: tractor.MsgStream, symbol: str, - feed: Feed, # noqa + feed: Feed, dark_book: _DarkBook, router: Router, @@ -837,7 +839,6 @@ async def process_client_order_cmds( live_entry = dark_book._ems_entries.get(oid) match cmd: - # existing live-broker order cancel case { 'action': 'cancel', @@ -1035,17 +1036,16 @@ async def process_client_order_cmds( @tractor.context async def _emsd_main( - ctx: tractor.Context, fqsn: str, exec_mode: str, # ('paper', 'live') - loglevel: str = 'info', ) -> None: - '''EMS (sub)actor entrypoint providing the - execution management (micro)service which conducts broker - order clearing control on behalf of clients. + ''' + EMS (sub)actor entrypoint providing the execution management + (micro)service which conducts broker order clearing control on + behalf of clients. This is the daemon (child) side routine which starts an EMS runtime task (one per broker-feed) and and begins streaming back alerts from @@ -1089,9 +1089,8 @@ async def _emsd_main( # tractor.Context instead of strictly requiring a ctx arg. ems_ctx = ctx - feed: Feed - # spawn one task per broker feed + feed: Feed async with ( maybe_open_feed( [fqsn],