diff --git a/piker/_cacheables.py b/piker/_cacheables.py index d86b477d..47ffa6a4 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -36,6 +36,7 @@ from contextlib import ( import trio from trio_typing import TaskStatus +import tractor from tractor._portal import maybe_open_nursery from .brokers import get_brokermod @@ -180,43 +181,49 @@ async def maybe_open_ctx( ctx_key = id(mngr) - # TODO: does this need to be a tractor "root nursery"? - async with maybe_open_nursery(cache.nurseries.get(ctx_key)) as n: - cache.nurseries[ctx_key] = n - value = None - try: - # lock feed acquisition around task racing / ``trio``'s - # scheduler protocol - value = cache.values[key] - log.info(f'Reusing cached feed for {key}') - cache.users += 1 - cache.lock.release() - yield True, value + value = None + try: + # lock feed acquisition around task racing / ``trio``'s + # scheduler protocol + value = cache.values[key] + log.info(f'Reusing cached feed for {key}') + cache.users += 1 + cache.lock.release() + yield True, value - except KeyError: - log.info(f'Allocating new feed for {key}') + except KeyError: + log.info(f'Allocating new feed for {key}') - # **critical section** that should prevent other tasks from - # checking the cache until complete otherwise the scheduler - # may switch and by accident we create more then one feed. + # **critical section** that should prevent other tasks from + # checking the cache until complete otherwise the scheduler + # may switch and by accident we create more then one feed. - value = await n.start(cache.run_ctx, mngr, key) - cache.users += 1 + # TODO: vaoid pulling from ``tractor`` internals and + # instead offer a "root nursery" in + service_n = tractor.current_actor()._service_n + + # TODO: does this need to be a tractor "root nursery"? + ln = cache.nurseries.get(ctx_key) + assert not ln + ln = cache.nurseries[ctx_key] = service_n + + value = await ln.start(cache.run_ctx, mngr, key) + cache.users += 1 + cache.lock.release() + + yield False, value + + finally: + cache.users -= 1 + + if cache.lock.locked(): cache.lock.release() - yield False, value + if value is not None: + # if no more consumers, teardown the client + if cache.users <= 0: + log.warning(f'De-allocating feed for {key}') - finally: - cache.users -= 1 - - if cache.lock.locked(): - cache.lock.release() - - if value is not None: - # if no more consumers, teardown the client - if cache.users <= 0: - log.warning(f'De-allocating feed for {key}') - - # terminate mngr nursery - cache.no_more_users.set() + # terminate mngr nursery + cache.no_more_users.set()