Factor out all `maybe_open_context()` guff

offline_dark_clearing
Tyler Goodlet 2022-10-07 13:16:01 -04:00
parent 94f81587ab
commit c437f9370a
1 changed files with 48 additions and 32 deletions

View File

@ -1275,6 +1275,26 @@ async def maybe_open_trade_relays(
) -> tuple: ) -> tuple:
def cache_on_fqsn_unless_paper(
router: Router,
fqsn: str,
exec_mode: str, # ('paper', 'live')
loglevel: str = 'info',
) -> Hashable:
if exec_mode == 'paper':
return f'paper_{fqsn}'
else:
return fqsn
# XXX: closure to enable below use of
# ``tractor.trionics.maybe_open_context()``
@acm
async def cached_mngr(
router: Router,
fqsn: str,
exec_mode: str, # ('paper', 'live')
loglevel: str = 'info',
):
relay, feed, client_ready = await _router.nursery.start( relay, feed, client_ready = await _router.nursery.start(
_router.open_trade_relays, _router.open_trade_relays,
fqsn, fqsn,
@ -1283,6 +1303,21 @@ async def maybe_open_trade_relays(
) )
yield relay, feed, client_ready yield relay, feed, client_ready
async with tractor.trionics.maybe_open_context(
acm_func=cached_mngr,
kwargs={
'router': _router,
'fqsn': fqsn,
'exec_mode': exec_mode,
'loglevel': loglevel,
},
key=cache_on_fqsn_unless_paper,
) as (
cache_hit,
(relay, feed, client_ready)
):
yield relay, feed, client_ready
@tractor.context @tractor.context
async def _emsd_main( async def _emsd_main(
@ -1351,31 +1386,13 @@ async def _emsd_main(
# NOTE: open a stream with the brokerd backend for order flow # NOTE: open a stream with the brokerd backend for order flow
# dialogue and dark clearing but only open one: we try to keep as # dialogue and dark clearing but only open one: we try to keep as
# few duplicate streams as necessary per ems actor. # few duplicate streams as necessary per ems actor.
def cache_on_fqsn_unless_paper( async with maybe_open_trade_relays(
router: Router, _router,
fqsn: str, fqsn,
exec_mode: str, # ('paper', 'live') exec_mode,
loglevel: str = 'info', loglevel,
) -> Hashable: ) as (relay, feed, client_ready):
if exec_mode == 'paper':
return f'paper_{fqsn}'
else:
return fqsn
async with tractor.trionics.maybe_open_context(
acm_func=maybe_open_trade_relays,
kwargs={
'router': _router,
'fqsn': fqsn,
'exec_mode': exec_mode,
'loglevel': loglevel,
},
key=cache_on_fqsn_unless_paper,
) as (
cache_hit,
(relay, feed, client_ready)
):
brokerd_stream = relay.brokerd_stream brokerd_stream = relay.brokerd_stream
dark_book = _router.get_dark_book(broker) dark_book = _router.get_dark_book(broker)
@ -1412,7 +1429,6 @@ async def _emsd_main(
dark_book, dark_book,
_router, _router,
) )
finally: finally:
# try to remove client from subscription registry # try to remove client from subscription registry
_router.subscribers[fqsn].remove(client_stream) _router.subscribers[fqsn].remove(client_stream)