From 94f81587abcfbc2ed0d2d25e4c3550d942b2b201 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Thu, 6 Oct 2022 17:37:05 -0400
Subject: [PATCH] Cache EMS trade relay tasks on feed fqsn

Except for paper accounts (in which case we need a trades dialog and
paper engine per symbol to enable simulated clearing) we can rely on
the instrument feed (symbol name) to be the caching key. Utilize
`tractor.trionics.maybe_open_context()` and the new key-as-callable
support in the paper case to ensure we have separate paper clearing
loops per symbol.

Requires https://github.com/goodboy/tractor/pull/329

---
 piker/clearing/_ems.py | 154 +++++++++++++++++++++++++----------------
 1 file changed, 95 insertions(+), 59 deletions(-)

diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py
index 6fe815dc..34486a91 100644
--- a/piker/clearing/_ems.py
+++ b/piker/clearing/_ems.py
@@ -31,6 +31,7 @@ from typing import (
     AsyncIterator,
     Any,
     Callable,
+    Hashable,
     Optional,
 )
 
@@ -1265,6 +1266,24 @@ async def process_client_order_cmds(
     )
 
 
+@acm
+async def maybe_open_trade_relays(
+    router: Router,
+    fqsn: str,
+    exec_mode: str,  # ('paper', 'live')
+    loglevel: str = 'info',
+
+) -> tuple:
+
+    relay, feed, client_ready = await _router.nursery.start(
+        _router.open_trade_relays,
+        fqsn,
+        exec_mode,
+        loglevel,
+    )
+    yield relay, feed, client_ready
+
+
 @tractor.context
 async def _emsd_main(
     ctx: tractor.Context,
@@ -1329,69 +1348,86 @@ async def _emsd_main(
     feed: Feed
     client_ready: trio.Event
 
-    # open a stream with the brokerd backend for order flow dialogue
-    # only open if one isn't already up: we try to keep as few duplicate
-    # streams as necessary.
-    # TODO: should we try using `tractor.trionics.maybe_open_context()`
-    # here?
-    relay, feed, client_ready = await _router.nursery.start(
-        _router.open_trade_relays,
-        fqsn,
-        exec_mode,
-        loglevel,
-    )
-    brokerd_stream = relay.brokerd_stream
-    dark_book = _router.get_dark_book(broker)
+    # NOTE: open a stream with the brokerd backend for order flow
+    # dialogue and dark clearing but only open one: we try to keep as
+    # few duplicate streams as necessary per ems actor.
+    def cache_on_fqsn_unless_paper(
+        router: Router,
+        fqsn: str,
+        exec_mode: str,  # ('paper', 'live')
+        loglevel: str = 'info',
+    ) -> Hashable:
+        if exec_mode == 'paper':
+            return f'paper_{fqsn}'
+        else:
+            return fqsn
 
-    # signal to client that we're started and deliver
-    # all known pps and accounts for this ``brokerd``.
-    await ems_ctx.started((
-        relay.positions,
-        list(relay.accounts),
-        dark_book._active,
-    ))
+    async with tractor.trionics.maybe_open_context(
+        acm_func=maybe_open_trade_relays,
+        kwargs={
+            'router': _router,
+            'fqsn': fqsn,
+            'exec_mode': exec_mode,
+            'loglevel': loglevel,
+        },
+        key=cache_on_fqsn_unless_paper,
 
-    # establish 2-way stream with requesting order-client and
-    # begin handling inbound order requests and updates
-    async with ems_ctx.open_stream() as client_stream:
+    ) as (
+        cache_hit,
+        (relay, feed, client_ready)
+    ):
+        brokerd_stream = relay.brokerd_stream
+        dark_book = _router.get_dark_book(broker)
 
-        # register the client side before starting the
-        # brokerd-side relay task to ensure the client is
-        # delivered all exisiting open orders on startup.
-        # TODO: instead of by fqsn we need a subscription
-        # system/schema here to limit what each new client is
-        # allowed to see in terms of broadcasted order flow
-        # updates per dialog.
-        _router.subscribers[fqsn].add(client_stream)
-        client_ready.set()
+        # signal to client that we're started and deliver
+        # all known pps and accounts for this ``brokerd``.
+        await ems_ctx.started((
+            relay.positions,
+            list(relay.accounts),
+            dark_book._active,
+        ))
 
-        # start inbound (from attached client) order request processing
-        # main entrypoint, run here until cancelled.
-        try:
-            await process_client_order_cmds(
-                client_stream,
-                brokerd_stream,
-                fqsn,
-                feed,
-                dark_book,
-                _router,
-            )
+        # establish 2-way stream with requesting order-client and
+        # begin handling inbound order requests and updates
+        async with ems_ctx.open_stream() as client_stream:
 
-        finally:
-            # try to remove client from subscription registry
-            _router.subscribers[fqsn].remove(client_stream)
+            # register the client side before starting the
+            # brokerd-side relay task to ensure the client is
+            # delivered all exisiting open orders on startup.
+            # TODO: instead of by fqsn we need a subscription
+            # system/schema here to limit what each new client is
+            # allowed to see in terms of broadcasted order flow
+            # updates per dialog.
+            _router.subscribers[fqsn].add(client_stream)
+            client_ready.set()
 
-            for oid, client_streams in _router.dialogs.items():
-                client_streams.discard(client_stream)
+            # start inbound (from attached client) order request processing
+            # main entrypoint, run here until cancelled.
+            try:
+                await process_client_order_cmds(
+                    client_stream,
+                    brokerd_stream,
+                    fqsn,
+                    feed,
+                    dark_book,
+                    _router,
+                )
 
-                # TODO: for order dialogs left "alive" in
-                # the ems this is where we should allow some
-                # system to take over management. Likely we
-                # want to allow the user to choose what kind
-                # of policy to use (eg. cancel all orders
-                # from client, run some algo, etc.)
-                if not client_streams:
-                    log.warning(
-                        f'Order dialog is not being monitored:\n'
-                        f'{oid} ->\n{client_stream._ctx.chan.uid}'
-                    )
+            finally:
+                # try to remove client from subscription registry
+                _router.subscribers[fqsn].remove(client_stream)
+
+                for oid, client_streams in _router.dialogs.items():
+                    client_streams.discard(client_stream)
+
+                    # TODO: for order dialogs left "alive" in
+                    # the ems this is where we should allow some
+                    # system to take over management. Likely we
+                    # want to allow the user to choose what kind
+                    # of policy to use (eg. cancel all orders
+                    # from client, run some algo, etc.)
+                    if not client_streams:
+                        log.warning(
+                            f'Order dialog is not being monitored:\n'
+                            f'{oid} ->\n{client_stream._ctx.chan.uid}'
+                        )
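For reference, a minimal standalone sketch of the callable-`key` caching
pattern used above, assuming the key-as-callable support from
https://github.com/goodboy/tractor/pull/329 is installed; `open_relay()`
and the instrument name here are hypothetical stand-ins for the real
trade-relay machinery in `piker.clearing._ems`:

    from contextlib import asynccontextmanager as acm

    import tractor
    import trio


    @acm
    async def open_relay(fqsn: str, exec_mode: str):
        # hypothetical stand-in for ``maybe_open_trade_relays()``:
        # allocate one relay handle per cache key and hand it back.
        yield f'relay<{fqsn}:{exec_mode}>'


    def key_on_fqsn_unless_paper(fqsn: str, exec_mode: str) -> str:
        # same keying policy as ``cache_on_fqsn_unless_paper()`` in the
        # patch: paper mode gets a per-symbol clearing loop, live mode
        # shares one relay per instrument feed.
        return f'paper_{fqsn}' if exec_mode == 'paper' else fqsn


    async def main():
        kwargs = {'fqsn': 'xbtusd.kraken', 'exec_mode': 'live'}
        async with tractor.open_root_actor():
            async with tractor.trionics.maybe_open_context(
                acm_func=open_relay,
                kwargs=kwargs,
                key=key_on_fqsn_unless_paper,
            ) as (cache_hit, relay):
                # a second entry under the same key should report a
                # cache hit and deliver the already-allocated handle.
                async with tractor.trionics.maybe_open_context(
                    acm_func=open_relay,
                    kwargs=kwargs,
                    key=key_on_fqsn_unless_paper,
                ) as (cache_hit_2, relay_2):
                    print(cache_hit, cache_hit_2, relay is relay_2)


    if __name__ == '__main__':
        trio.run(main)

Switching `exec_mode` to 'paper' changes the computed key to
'paper_xbtusd.kraken', so a paper dialog gets its own relay instead of
re-using the live one, which is the point of the keying policy above.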