From 44b59f3338f21ced4deeada08cd1af4837eed215 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 7 Oct 2022 12:45:15 -0400 Subject: [PATCH] Go back to a `global` single-ton nursery per actor Turns out the lifetime mgmt of separate nurseries per delegate manager is tricky; a new nursery can't be naively allocated on cache-misses since it may get closed by some early terminating task instead of by the "last using" consumer task. In theory if we allocate using the same logic as that used for the last-task-triggers-exit then this should work? For now just go back to a single global nursery per `_Cache` which still avoids use of the internal actor service nursery. --- tractor/trionics/_mngrs.py | 43 ++++++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index c54782b..09df201 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -35,6 +35,7 @@ from typing import ( import trio from trio_typing import TaskStatus +from .._state import current_actor from ..log import get_logger @@ -140,6 +141,7 @@ class _Cache: a kept-alive-while-in-use async resource. ''' + service_n: Optional[trio.Nursery] = None locks: dict[Hashable, trio.Lock] = {} users: int = 0 values: dict[Any, Any] = {} @@ -147,7 +149,7 @@ class _Cache: Hashable, tuple[trio.Nursery, trio.Event] ] = {} - nurseries: dict[int, trio.Nursery] = {} + # nurseries: dict[int, trio.Nursery] = {} no_more_users: Optional[trio.Event] = None @classmethod @@ -205,6 +207,18 @@ async def maybe_open_context( lock = _Cache.locks.setdefault(fid, trio.Lock()) await lock.acquire() + # XXX: one singleton nursery per actor and we want to + # have it not be closed until all consumers have exited (which is + # currently difficult to implement any other way besides using our + # pre-allocated runtime instance..) + service_n: trio.Nursery = current_actor()._service_n + + # TODO: is there any way to allocate + # a 'stays-open-till-last-task-finshed nursery? + # service_n: trio.Nursery + # async with maybe_open_nursery(_Cache.service_n) as service_n: + # _Cache.service_n = service_n + try: # **critical section** that should prevent other tasks from # checking the _Cache until complete otherwise the scheduler @@ -214,22 +228,19 @@ async def maybe_open_context( except KeyError: log.info(f'Allocating new {acm_func} for {ctx_key}') mngr = acm_func(**kwargs) - service_n: Optional[trio.Nursery] = _Cache.nurseries.get(fid) - async with maybe_open_nursery(service_n) as service_n: - _Cache.nurseries[fid] = service_n - resources = _Cache.resources - assert not resources.get(ctx_key), f'Resource exists? {ctx_key}' - resources[ctx_key] = (service_n, trio.Event()) + resources = _Cache.resources + assert not resources.get(ctx_key), f'Resource exists? {ctx_key}' + resources[ctx_key] = (service_n, trio.Event()) - # sync up to the mngr's yielded value - yielded = await service_n.start( - _Cache.run_ctx, - mngr, - ctx_key, - ) - _Cache.users += 1 - lock.release() - yield False, yielded + # sync up to the mngr's yielded value + yielded = await service_n.start( + _Cache.run_ctx, + mngr, + ctx_key, + ) + _Cache.users += 1 + lock.release() + yield False, yielded else: log.info(f'Reusing _Cached resource for {ctx_key}')