From c437f9370a279d8665eb3c5724f4070ef5347cfb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 7 Oct 2022 13:16:01 -0400 Subject: [PATCH] Factor out all `maybe_open_context()` guff --- piker/clearing/_ems.py | 80 +++++++++++++++++++++++++----------------- 1 file changed, 48 insertions(+), 32 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 34486a91..3bada0c3 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -1275,13 +1275,48 @@ async def maybe_open_trade_relays( ) -> tuple: - relay, feed, client_ready = await _router.nursery.start( - _router.open_trade_relays, - fqsn, - exec_mode, - loglevel, - ) - yield relay, feed, client_ready + def cache_on_fqsn_unless_paper( + router: Router, + fqsn: str, + exec_mode: str, # ('paper', 'live') + loglevel: str = 'info', + ) -> Hashable: + if exec_mode == 'paper': + return f'paper_{fqsn}' + else: + return fqsn + + # XXX: closure to enable below use of + # ``tractor.trionics.maybe_open_context()`` + @acm + async def cached_mngr( + router: Router, + fqsn: str, + exec_mode: str, # ('paper', 'live') + loglevel: str = 'info', + ): + relay, feed, client_ready = await _router.nursery.start( + _router.open_trade_relays, + fqsn, + exec_mode, + loglevel, + ) + yield relay, feed, client_ready + + async with tractor.trionics.maybe_open_context( + acm_func=cached_mngr, + kwargs={ + 'router': _router, + 'fqsn': fqsn, + 'exec_mode': exec_mode, + 'loglevel': loglevel, + }, + key=cache_on_fqsn_unless_paper, + ) as ( + cache_hit, + (relay, feed, client_ready) + ): + yield relay, feed, client_ready @tractor.context @@ -1351,31 +1386,13 @@ async def _emsd_main( # NOTE: open a stream with the brokerd backend for order flow # dialogue and dark clearing but only open one: we try to keep as # few duplicate streams as necessary per ems actor. - def cache_on_fqsn_unless_paper( - router: Router, - fqsn: str, - exec_mode: str, # ('paper', 'live') - loglevel: str = 'info', - ) -> Hashable: - if exec_mode == 'paper': - return f'paper_{fqsn}' - else: - return fqsn + async with maybe_open_trade_relays( + _router, + fqsn, + exec_mode, + loglevel, + ) as (relay, feed, client_ready): - async with tractor.trionics.maybe_open_context( - acm_func=maybe_open_trade_relays, - kwargs={ - 'router': _router, - 'fqsn': fqsn, - 'exec_mode': exec_mode, - 'loglevel': loglevel, - }, - key=cache_on_fqsn_unless_paper, - - ) as ( - cache_hit, - (relay, feed, client_ready) - ): brokerd_stream = relay.brokerd_stream dark_book = _router.get_dark_book(broker) @@ -1412,7 +1429,6 @@ async def _emsd_main( dark_book, _router, ) - finally: # try to remove client from subscription registry _router.subscribers[fqsn].remove(client_stream)