diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index af60c70..757af61 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -24,6 +24,7 @@ from typing import ( AsyncContextManager, AsyncGenerator, AsyncIterator, + Callable, Hashable, Optional, Sequence, @@ -131,69 +132,76 @@ class _Cache: async def run_ctx( cls, mng, - key, + ctx_key: tuple, task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED, ) -> None: async with mng as value: - - _, no_more_users = cls.resources[id(mng)] - cls.values[key] = value + _, no_more_users = cls.resources[ctx_key] + cls.values[ctx_key] = value task_status.started(value) try: await no_more_users.wait() finally: - value = cls.values.pop(key) - # discard nursery ref so it won't be re-used (an error) - cls.resources.pop(id(mng)) + # discard nursery ref so it won't be re-used (an error)? + value = cls.values.pop(ctx_key) + cls.resources.pop(ctx_key) @acm async def maybe_open_context( - key: Hashable, - mngr: AsyncContextManager[T], + acm_func: Callable[..., AsyncContextManager[T]], + + # XXX: used as cache key after conversion to tuple + # and all embedded values must also be hashable + kwargs: Optional[dict] = {}, + key: Hashable = None, ) -> AsyncIterator[tuple[bool, T]]: ''' Maybe open a context manager if there is not already a _Cached - version for the provided ``key``. Return the _Cached instance on - a _Cache hit. + version for the provided ``key`` for *this* actor. Return the + _Cached instance on a _Cache hit. ''' # lock resource acquisition around task racing / ``trio``'s # scheduler protocol await _Cache.lock.acquire() - ctx_key = id(mngr) + ctx_key = (id(acm_func), key or tuple(kwargs.items())) value = None try: - value = _Cache.values[key] - - except KeyError: - log.info(f'Allocating new resource for {key}') - # **critical section** that should prevent other tasks from # checking the _Cache until complete otherwise the scheduler - # may switch and by accident we create more then one feed. + # may switch and by accident we create more then one resource. + value = _Cache.values[ctx_key] + except KeyError: + log.info(f'Allocating new resource for {ctx_key}') + + mngr = acm_func(**kwargs) # TODO: avoid pulling from ``tractor`` internals and # instead offer a "root nursery" in piker actors? service_n = current_actor()._service_n # TODO: does this need to be a tractor "root nursery"? - assert not _Cache.resources.get(ctx_key), f'Resource exists? {ctx_key}' - ln, _ = _Cache.resources[ctx_key] = (service_n, trio.Event()) + resources = _Cache.resources + assert not resources.get(ctx_key), f'Resource exists? {ctx_key}' + ln, _ = resources[ctx_key] = (service_n, trio.Event()) - value = await ln.start(_Cache.run_ctx, mngr, key) + value = await ln.start( + _Cache.run_ctx, + mngr, + ctx_key, + ) _Cache.users += 1 _Cache.lock.release() - yield False, value else: - log.info(f'Reusing _Cached resource for {key}') + log.info(f'Reusing _Cached resource for {ctx_key}') _Cache.users += 1 _Cache.lock.release() yield True, value @@ -204,13 +212,8 @@ async def maybe_open_context( if value is not None: # if no more consumers, teardown the client if _Cache.users <= 0: - log.info(f'De-allocating resource for {key}') - - if _Cache.lock.locked(): - _Cache.lock.release() + log.info(f'De-allocating resource for {ctx_key}') # terminate mngr nursery - entry = _Cache.resources.get(ctx_key) - if entry: - _, no_more_users = entry - no_more_users.set() + _, no_more_users = _Cache.resources[ctx_key] + no_more_users.set()