Port to new `tractor.trionics.maybe_open_context()` api

fspd_cluster
Tyler Goodlet 2021-12-17 13:34:04 -05:00
parent 5b368992f6
commit 422977d27a
3 changed files with 25 additions and 22 deletions

View File

@ -66,14 +66,14 @@ def async_lifo_cache(maxsize=128):
async def open_cached_client( async def open_cached_client(
brokername: str, brokername: str,
) -> 'Client': # noqa ) -> 'Client': # noqa
'''Get a cached broker client from the current actor's local vars. '''
Get a cached broker client from the current actor's local vars.
If one has not been setup do it and cache it. If one has not been setup do it and cache it.
''' '''
brokermod = get_brokermod(brokername) brokermod = get_brokermod(brokername)
async with maybe_open_context( async with maybe_open_context(
key=brokername, acm_func=brokermod.get_client,
mngr=brokermod.get_client(),
) as (cache_hit, client): ) as (cache_hit, client):
yield client yield client

View File

@ -27,7 +27,6 @@ from types import ModuleType
from typing import ( from typing import (
Any, Sequence, Any, Sequence,
AsyncIterator, Optional, AsyncIterator, Optional,
Awaitable, Callable,
) )
import trio import trio
@ -372,11 +371,12 @@ async def open_sample_step_stream(
# a lone broker-daemon per provider should be # a lone broker-daemon per provider should be
# created for all practical purposes # created for all practical purposes
async with maybe_open_context( async with maybe_open_context(
key=delay_s, acm_func=partial(
mngr=portal.open_stream_from( portal.open_stream_from,
iter_ohlc_periods, iter_ohlc_periods,
delay_s=delay_s, # must be kwarg
), ),
kwargs={'delay_s': delay_s},
) as (cache_hit, istream): ) as (cache_hit, istream):
if cache_hit: if cache_hit:
# add a new broadcast subscription for the quote stream # add a new broadcast subscription for the quote stream
@ -524,7 +524,7 @@ async def open_feed(
) as (ctx, (init_msg, first_quotes)), ) as (ctx, (init_msg, first_quotes)),
ctx.open_stream( ctx.open_stream(
# XXX: be explicit about stream backpressure since we should # XXX: be explicit about stream backpressure since we should
# **never** overrun on feeds being too fast, which will # **never** overrun on feeds being too fast, which will
# pretty much always happen with HFT XD # pretty much always happen with HFT XD
backpressure=True backpressure=True
@ -574,6 +574,7 @@ async def open_feed(
feed._max_sample_rate = max(ohlc_sample_rates) feed._max_sample_rate = max(ohlc_sample_rates)
# yield feed
try: try:
yield feed yield feed
finally: finally:
@ -599,16 +600,18 @@ async def maybe_open_feed(
sym = symbols[0].lower() sym = symbols[0].lower()
async with maybe_open_context( async with maybe_open_context(
key=(brokername, sym), acm_func=open_feed,
mngr=open_feed( kwargs={
brokername, 'brokername': brokername,
[sym], 'symbols': [sym],
loglevel=loglevel, 'loglevel': loglevel,
**kwargs, 'tick_throttle': kwargs.get('tick_throttle'),
), },
key=sym,
) as (cache_hit, feed): ) as (cache_hit, feed):
if cache_hit: if cache_hit:
print('USING CACHED FEED')
# add a new broadcast subscription for the quote stream # add a new broadcast subscription for the quote stream
# if this feed is likely already in use # if this feed is likely already in use
async with feed.stream.subscribe() as bstream: async with feed.stream.subscribe() as bstream:

View File

@ -572,14 +572,14 @@ async def maybe_open_fsp_cluster(
**kwargs, **kwargs,
) -> AsyncGenerator[int, dict[str, tractor.Portal]]: ) -> AsyncGenerator[int, dict[str, tractor.Portal]]:
uid = tractor.current_actor().uid kwargs.update(
{'workers': workers}
)
async with maybe_open_context( async with maybe_open_context(
key=uid, # for now make a cluster per client? # for now make a cluster per client?
mngr=open_fsp_cluster( acm_func=open_fsp_cluster,
workers, kwargs=kwargs,
# loglevel=loglevel,
**kwargs,
),
) as (cache_hit, cluster_map): ) as (cache_hit, cluster_map):
if cache_hit: if cache_hit:
log.info('re-using existing fsp cluster') log.info('re-using existing fsp cluster')