From 7a719ac2a73bf3976d615d472b299f128390bf8c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 6 Oct 2022 14:41:56 -0400 Subject: [PATCH 1/3] Use one nursery per unique manager (signature) Instead of sticking all `trionics.maybe_open_context()` tasks inside the actor's (root) service nursery, open a unique one per manager function instance (id). Further, accept a callable for the `key` such that a user can have more flexible control on the caching logic and move the `maybe_open_nursery()` helper out of the portal mod and into this trionics "managers" module. --- tractor/_portal.py | 20 +--------- tractor/trionics/__init__.py | 2 + tractor/trionics/_mngrs.py | 72 ++++++++++++++++++++++++------------ 3 files changed, 51 insertions(+), 43 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index 94a285b..de2da45 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -35,6 +35,7 @@ import warnings import trio from async_generator import asynccontextmanager +from .trionics import maybe_open_nursery from ._state import current_actor from ._ipc import Channel from .log import get_logger @@ -50,25 +51,6 @@ from ._streaming import Context, ReceiveMsgStream log = get_logger(__name__) -@asynccontextmanager -async def maybe_open_nursery( - nursery: trio.Nursery = None, - shield: bool = False, -) -> AsyncGenerator[trio.Nursery, Any]: - ''' - Create a new nursery if None provided. - - Blocks on exit as expected if no input nursery is provided. - - ''' - if nursery is not None: - yield nursery - else: - async with trio.open_nursery() as nursery: - nursery.cancel_scope.shield = shield - yield nursery - - def _unwrap_msg( msg: dict[str, Any], diff --git a/tractor/trionics/__init__.py b/tractor/trionics/__init__.py index 22b5bcd..31e49a9 100644 --- a/tractor/trionics/__init__.py +++ b/tractor/trionics/__init__.py @@ -21,6 +21,7 @@ Sugary patterns for trio + tractor designs. from ._mngrs import ( gather_contexts, maybe_open_context, + maybe_open_nursery, ) from ._broadcast import ( broadcast_receiver, @@ -35,4 +36,5 @@ __all__ = [ 'BroadcastReceiver', 'Lagged', 'maybe_open_context', + 'maybe_open_nursery', ] diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 5473a04..c54782b 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -19,6 +19,7 @@ Async context manager primitives with hard ``trio``-aware semantics ''' from contextlib import asynccontextmanager as acm +import inspect from typing import ( Any, AsyncContextManager, @@ -35,7 +36,6 @@ import trio from trio_typing import TaskStatus from ..log import get_logger -from .._state import current_actor log = get_logger(__name__) @@ -44,6 +44,25 @@ log = get_logger(__name__) T = TypeVar("T") +@acm +async def maybe_open_nursery( + nursery: trio.Nursery = None, + shield: bool = False, +) -> AsyncGenerator[trio.Nursery, Any]: + ''' + Create a new nursery if None provided. + + Blocks on exit as expected if no input nursery is provided. + + ''' + if nursery is not None: + yield nursery + else: + async with trio.open_nursery() as nursery: + nursery.cancel_scope.shield = shield + yield nursery + + async def _enter_and_wait( mngr: AsyncContextManager[T], @@ -128,6 +147,7 @@ class _Cache: Hashable, tuple[trio.Nursery, trio.Event] ] = {} + nurseries: dict[int, trio.Nursery] = {} no_more_users: Optional[trio.Event] = None @classmethod @@ -158,7 +178,7 @@ async def maybe_open_context( # XXX: used as cache key after conversion to tuple # and all embedded values must also be hashable kwargs: dict = {}, - key: Hashable = None, + key: Hashable | Callable[..., Hashable] = None, ) -> AsyncIterator[tuple[bool, T]]: ''' @@ -168,8 +188,14 @@ async def maybe_open_context( ''' fid = id(acm_func) - ctx_key = (fid, key or tuple(kwargs.items())) - value = None + + if inspect.isfunction(key): + ctx_key = (fid, key(**kwargs)) + else: + ctx_key = (fid, key or tuple(kwargs.items())) + + # yielded output + yielded: Any = None # Lock resource acquisition around task racing / ``trio``'s # scheduler protocol. @@ -183,40 +209,38 @@ async def maybe_open_context( # **critical section** that should prevent other tasks from # checking the _Cache until complete otherwise the scheduler # may switch and by accident we create more then one resource. - value = _Cache.values[ctx_key] + yielded = _Cache.values[ctx_key] except KeyError: log.info(f'Allocating new {acm_func} for {ctx_key}') - mngr = acm_func(**kwargs) - # TODO: avoid pulling from ``tractor`` internals and - # instead offer a "root nursery" in piker actors? - service_n = current_actor()._service_n + service_n: Optional[trio.Nursery] = _Cache.nurseries.get(fid) + async with maybe_open_nursery(service_n) as service_n: + _Cache.nurseries[fid] = service_n + resources = _Cache.resources + assert not resources.get(ctx_key), f'Resource exists? {ctx_key}' + resources[ctx_key] = (service_n, trio.Event()) - # TODO: does this need to be a tractor "root nursery"? - resources = _Cache.resources - assert not resources.get(ctx_key), f'Resource exists? {ctx_key}' - resources[ctx_key] = (service_n, trio.Event()) - - value = await service_n.start( - _Cache.run_ctx, - mngr, - ctx_key, - ) - _Cache.users += 1 - lock.release() - yield False, value + # sync up to the mngr's yielded value + yielded = await service_n.start( + _Cache.run_ctx, + mngr, + ctx_key, + ) + _Cache.users += 1 + lock.release() + yield False, yielded else: log.info(f'Reusing _Cached resource for {ctx_key}') _Cache.users += 1 lock.release() - yield True, value + yield True, yielded finally: _Cache.users -= 1 - if value is not None: + if yielded is not None: # if no more consumers, teardown the client if _Cache.users <= 0: log.info(f'De-allocating resource for {ctx_key}') From 44b59f3338f21ced4deeada08cd1af4837eed215 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 7 Oct 2022 12:45:15 -0400 Subject: [PATCH 2/3] Go back to a `global` single-ton nursery per actor Turns out the lifetime mgmt of separate nurseries per delegate manager is tricky; a new nursery can't be naively allocated on cache-misses since it may get closed by some early terminating task instead of by the "last using" consumer task. In theory if we allocate using the same logic as that used for the last-task-triggers-exit then this should work? For now just go back to a single global nursery per `_Cache` which still avoids use of the internal actor service nursery. --- tractor/trionics/_mngrs.py | 43 ++++++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index c54782b..09df201 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -35,6 +35,7 @@ from typing import ( import trio from trio_typing import TaskStatus +from .._state import current_actor from ..log import get_logger @@ -140,6 +141,7 @@ class _Cache: a kept-alive-while-in-use async resource. ''' + service_n: Optional[trio.Nursery] = None locks: dict[Hashable, trio.Lock] = {} users: int = 0 values: dict[Any, Any] = {} @@ -147,7 +149,7 @@ class _Cache: Hashable, tuple[trio.Nursery, trio.Event] ] = {} - nurseries: dict[int, trio.Nursery] = {} + # nurseries: dict[int, trio.Nursery] = {} no_more_users: Optional[trio.Event] = None @classmethod @@ -205,6 +207,18 @@ async def maybe_open_context( lock = _Cache.locks.setdefault(fid, trio.Lock()) await lock.acquire() + # XXX: one singleton nursery per actor and we want to + # have it not be closed until all consumers have exited (which is + # currently difficult to implement any other way besides using our + # pre-allocated runtime instance..) + service_n: trio.Nursery = current_actor()._service_n + + # TODO: is there any way to allocate + # a 'stays-open-till-last-task-finshed nursery? + # service_n: trio.Nursery + # async with maybe_open_nursery(_Cache.service_n) as service_n: + # _Cache.service_n = service_n + try: # **critical section** that should prevent other tasks from # checking the _Cache until complete otherwise the scheduler @@ -214,22 +228,19 @@ async def maybe_open_context( except KeyError: log.info(f'Allocating new {acm_func} for {ctx_key}') mngr = acm_func(**kwargs) - service_n: Optional[trio.Nursery] = _Cache.nurseries.get(fid) - async with maybe_open_nursery(service_n) as service_n: - _Cache.nurseries[fid] = service_n - resources = _Cache.resources - assert not resources.get(ctx_key), f'Resource exists? {ctx_key}' - resources[ctx_key] = (service_n, trio.Event()) + resources = _Cache.resources + assert not resources.get(ctx_key), f'Resource exists? {ctx_key}' + resources[ctx_key] = (service_n, trio.Event()) - # sync up to the mngr's yielded value - yielded = await service_n.start( - _Cache.run_ctx, - mngr, - ctx_key, - ) - _Cache.users += 1 - lock.release() - yield False, yielded + # sync up to the mngr's yielded value + yielded = await service_n.start( + _Cache.run_ctx, + mngr, + ctx_key, + ) + _Cache.users += 1 + lock.release() + yield False, yielded else: log.info(f'Reusing _Cached resource for {ctx_key}') From b892bc74f6ef8a9f0b37b1d472a7c0b7427407fe Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 8 Oct 2022 14:31:29 -0400 Subject: [PATCH 3/3] Add trivial news snippet --- nooz/336.trivial.rst | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 nooz/336.trivial.rst diff --git a/nooz/336.trivial.rst b/nooz/336.trivial.rst new file mode 100644 index 0000000..63b5c19 --- /dev/null +++ b/nooz/336.trivial.rst @@ -0,0 +1,4 @@ +Add ``key: Callable[..., Hashable]`` support to ``.trionics.maybe_open_context()`` + +Gives users finer grained control over cache hit behaviour using +a callable which receives the input ``kwargs: dict``.