diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py
index c54782b..09df201 100644
--- a/tractor/trionics/_mngrs.py
+++ b/tractor/trionics/_mngrs.py
@@ -35,6 +35,7 @@ from typing import (
 import trio
 from trio_typing import TaskStatus
 
+from .._state import current_actor
 from ..log import get_logger
 
 
@@ -140,6 +141,7 @@ class _Cache:
     a kept-alive-while-in-use async resource.
 
     '''
+    service_n: Optional[trio.Nursery] = None
     locks: dict[Hashable, trio.Lock] = {}
     users: int = 0
     values: dict[Any, Any] = {}
@@ -147,7 +149,7 @@
         Hashable,
         tuple[trio.Nursery, trio.Event]
     ] = {}
-    nurseries: dict[int, trio.Nursery] = {}
+    # nurseries: dict[int, trio.Nursery] = {}
     no_more_users: Optional[trio.Event] = None
 
     @classmethod
@@ -205,6 +207,18 @@ async def maybe_open_context(
     lock = _Cache.locks.setdefault(fid, trio.Lock())
     await lock.acquire()
 
+    # XXX: one singleton nursery per actor and we want to
+    # have it not be closed until all consumers have exited (which is
+    # currently difficult to implement any other way besides using our
+    # pre-allocated runtime instance..)
+    service_n: trio.Nursery = current_actor()._service_n
+
+    # TODO: is there any way to allocate
+    # a 'stays-open-till-last-task-finshed nursery?
+    # service_n: trio.Nursery
+    # async with maybe_open_nursery(_Cache.service_n) as service_n:
+    #     _Cache.service_n = service_n
+
     try:
         # **critical section** that should prevent other tasks from
         # checking the _Cache until complete otherwise the scheduler
@@ -214,22 +228,19 @@
     except KeyError:
         log.info(f'Allocating new {acm_func} for {ctx_key}')
         mngr = acm_func(**kwargs)
-        service_n: Optional[trio.Nursery] = _Cache.nurseries.get(fid)
-        async with maybe_open_nursery(service_n) as service_n:
-            _Cache.nurseries[fid] = service_n
-            resources = _Cache.resources
-            assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
-            resources[ctx_key] = (service_n, trio.Event())
+        resources = _Cache.resources
+        assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
+        resources[ctx_key] = (service_n, trio.Event())
 
-            # sync up to the mngr's yielded value
-            yielded = await service_n.start(
-                _Cache.run_ctx,
-                mngr,
-                ctx_key,
-            )
-            _Cache.users += 1
-            lock.release()
-            yield False, yielded
+        # sync up to the mngr's yielded value
+        yielded = await service_n.start(
+            _Cache.run_ctx,
+            mngr,
+            ctx_key,
+        )
+        _Cache.users += 1
+        lock.release()
+        yield False, yielded
 
     else:
         log.info(f'Reusing _Cached resource for {ctx_key}')
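
For context, below is a minimal usage sketch (not part of the patch) of what the change enables: two tasks in the same actor entering the same wrapped async context manager, where the first entrant now starts the manager's keeper task in the actor's pre-allocated _service_n nursery (instead of a per-call nursery looked up from _Cache.nurseries), so the resource stays alive until the last consumer exits. The keyword names other than acm_func/kwargs, the (cache_hit, value) yield shape, and the need for a running actor (so current_actor() resolves) are inferred from the hunks above; treat this as an assumption-laden sketch, not the library's documented API.

    import trio
    import tractor
    from contextlib import asynccontextmanager
    from tractor.trionics import maybe_open_context


    @asynccontextmanager
    async def open_expensive_thing(name: str):
        # stand-in for a costly, shareable resource (db pool, data feed, ...)
        print(f'allocating {name}')
        try:
            yield f'<{name}-handle>'
        finally:
            print(f'tearing down {name}')


    async def consumer(label: str):
        async with maybe_open_context(
            acm_func=open_expensive_thing,
            kwargs={'name': 'db'},
        ) as (cache_hit, handle):
            # only the first entrant should see cache_hit == False; later
            # entrants re-use the value kept alive by the service nursery.
            print(label, cache_hit, handle)
            await trio.sleep(0.1)


    async def main():
        # current_actor()._service_n (used by the patch) requires a running
        # actor runtime, hence the root actor context here.
        async with tractor.open_root_actor():
            async with trio.open_nursery() as tn:
                tn.start_soon(consumer, 'task-1')
                tn.start_soon(consumer, 'task-2')


    if __name__ == '__main__':
        trio.run(main)

The trade-off noted in the diff's XXX/TODO comments: the actor's service nursery is simply the one pre-allocated object guaranteed to outlive every consumer, whereas a 'stays-open-till-last-task-finished' nursery (the commented-out _Cache.service_n path) would avoid reaching into runtime internals but is not straightforward to build.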