From 422977d27ace39b01e4cbce4c50410e3410d90f7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 17 Dec 2021 13:34:04 -0500 Subject: [PATCH] Port to new `tractor.trionics.maybe_open_context()` api --- piker/_cacheables.py | 6 +++--- piker/data/feed.py | 27 +++++++++++++++------------ piker/ui/_display.py | 14 +++++++------- 3 files changed, 25 insertions(+), 22 deletions(-) diff --git a/piker/_cacheables.py b/piker/_cacheables.py index 02ac9240..ba7361c3 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -66,14 +66,14 @@ def async_lifo_cache(maxsize=128): async def open_cached_client( brokername: str, ) -> 'Client': # noqa - '''Get a cached broker client from the current actor's local vars. + ''' + Get a cached broker client from the current actor's local vars. If one has not been setup do it and cache it. ''' brokermod = get_brokermod(brokername) async with maybe_open_context( - key=brokername, - mngr=brokermod.get_client(), + acm_func=brokermod.get_client, ) as (cache_hit, client): yield client diff --git a/piker/data/feed.py b/piker/data/feed.py index 0ee94ad9..1e0c55b2 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -27,7 +27,6 @@ from types import ModuleType from typing import ( Any, Sequence, AsyncIterator, Optional, - Awaitable, Callable, ) import trio @@ -372,11 +371,12 @@ async def open_sample_step_stream( # a lone broker-daemon per provider should be # created for all practical purposes async with maybe_open_context( - key=delay_s, - mngr=portal.open_stream_from( + acm_func=partial( + portal.open_stream_from, iter_ohlc_periods, - delay_s=delay_s, # must be kwarg ), + + kwargs={'delay_s': delay_s}, ) as (cache_hit, istream): if cache_hit: # add a new broadcast subscription for the quote stream @@ -524,7 +524,7 @@ async def open_feed( ) as (ctx, (init_msg, first_quotes)), ctx.open_stream( - # XXX: be explicit about stream backpressure since we should + # XXX: be explicit about stream backpressure since we should # **never** overrun on feeds being too fast, which will # pretty much always happen with HFT XD backpressure=True @@ -574,6 +574,7 @@ async def open_feed( feed._max_sample_rate = max(ohlc_sample_rates) + # yield feed try: yield feed finally: @@ -599,16 +600,18 @@ async def maybe_open_feed( sym = symbols[0].lower() async with maybe_open_context( - key=(brokername, sym), - mngr=open_feed( - brokername, - [sym], - loglevel=loglevel, - **kwargs, - ), + acm_func=open_feed, + kwargs={ + 'brokername': brokername, + 'symbols': [sym], + 'loglevel': loglevel, + 'tick_throttle': kwargs.get('tick_throttle'), + }, + key=sym, ) as (cache_hit, feed): if cache_hit: + print('USING CACHED FEED') # add a new broadcast subscription for the quote stream # if this feed is likely already in use async with feed.stream.subscribe() as bstream: diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 85e7ac60..241cd370 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -572,14 +572,14 @@ async def maybe_open_fsp_cluster( **kwargs, ) -> AsyncGenerator[int, dict[str, tractor.Portal]]: - uid = tractor.current_actor().uid + kwargs.update( + {'workers': workers} + ) + async with maybe_open_context( - key=uid, # for now make a cluster per client? - mngr=open_fsp_cluster( - workers, - # loglevel=loglevel, - **kwargs, - ), + # for now make a cluster per client? + acm_func=open_fsp_cluster, + kwargs=kwargs, ) as (cache_hit, cluster_map): if cache_hit: log.info('re-using existing fsp cluster')