diff --git a/nooz/336.trivial.rst b/nooz/336.trivial.rst
new file mode 100644
index 0000000..63b5c19
--- /dev/null
+++ b/nooz/336.trivial.rst
@@ -0,0 +1,4 @@
+Add ``key: Callable[..., Hashable]`` support to ``.trionics.maybe_open_context()``
+
+Gives users finer grained control over cache hit behaviour using
+a callable which receives the input ``kwargs: dict``.
diff --git a/tractor/_portal.py b/tractor/_portal.py
index 94a285b..de2da45 100644
--- a/tractor/_portal.py
+++ b/tractor/_portal.py
@@ -35,6 +35,7 @@ import warnings
 
 import trio
 from async_generator import asynccontextmanager
+from .trionics import maybe_open_nursery
 from ._state import current_actor
 from ._ipc import Channel
 from .log import get_logger
@@ -50,25 +51,6 @@ from ._streaming import Context, ReceiveMsgStream
 
 log = get_logger(__name__)
 
 
-@asynccontextmanager
-async def maybe_open_nursery(
-    nursery: trio.Nursery = None,
-    shield: bool = False,
-) -> AsyncGenerator[trio.Nursery, Any]:
-    '''
-    Create a new nursery if None provided.
-
-    Blocks on exit as expected if no input nursery is provided.
-
-    '''
-    if nursery is not None:
-        yield nursery
-    else:
-        async with trio.open_nursery() as nursery:
-            nursery.cancel_scope.shield = shield
-            yield nursery
-
-
 def _unwrap_msg(
     msg: dict[str, Any],
diff --git a/tractor/trionics/__init__.py b/tractor/trionics/__init__.py
index 22b5bcd..31e49a9 100644
--- a/tractor/trionics/__init__.py
+++ b/tractor/trionics/__init__.py
@@ -21,6 +21,7 @@ Sugary patterns for trio + tractor designs.
 from ._mngrs import (
     gather_contexts,
     maybe_open_context,
+    maybe_open_nursery,
 )
 from ._broadcast import (
     broadcast_receiver,
@@ -35,4 +36,5 @@ __all__ = [
     'BroadcastReceiver',
     'Lagged',
     'maybe_open_context',
+    'maybe_open_nursery',
 ]
diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py
index 5473a04..09df201 100644
--- a/tractor/trionics/_mngrs.py
+++ b/tractor/trionics/_mngrs.py
@@ -19,6 +19,7 @@ Async context manager primitives with hard ``trio``-aware semantics
 
 '''
 from contextlib import asynccontextmanager as acm
+import inspect
 from typing import (
     Any,
     AsyncContextManager,
@@ -34,8 +35,8 @@ from typing import (
 import trio
 from trio_typing import TaskStatus
 
-from ..log import get_logger
 from .._state import current_actor
+from ..log import get_logger
 
 
 log = get_logger(__name__)
@@ -44,6 +45,25 @@ log = get_logger(__name__)
 
 T = TypeVar("T")
 
 
+@acm
+async def maybe_open_nursery(
+    nursery: trio.Nursery = None,
+    shield: bool = False,
+) -> AsyncGenerator[trio.Nursery, Any]:
+    '''
+    Create a new nursery if None provided.
+
+    Blocks on exit as expected if no input nursery is provided.
+
+    '''
+    if nursery is not None:
+        yield nursery
+    else:
+        async with trio.open_nursery() as nursery:
+            nursery.cancel_scope.shield = shield
+            yield nursery
+
+
 async def _enter_and_wait(
     mngr: AsyncContextManager[T],
@@ -121,6 +141,7 @@ class _Cache:
     a kept-alive-while-in-use async resource.
 
     '''
+    service_n: Optional[trio.Nursery] = None
     locks: dict[Hashable, trio.Lock] = {}
     users: int = 0
     values: dict[Any, Any] = {}
@@ -128,6 +149,7 @@ class _Cache:
         Hashable,
         tuple[trio.Nursery, trio.Event]
     ] = {}
+    # nurseries: dict[int, trio.Nursery] = {}
     no_more_users: Optional[trio.Event] = None
 
     @classmethod
@@ -158,7 +180,7 @@ async def maybe_open_context(
     # XXX: used as cache key after conversion to tuple
     # and all embedded values must also be hashable
     kwargs: dict = {},
-    key: Hashable = None,
+    key: Hashable | Callable[..., Hashable] = None,
 
 ) -> AsyncIterator[tuple[bool, T]]:
     '''
@@ -168,8 +190,14 @@ async def maybe_open_context(
 
     '''
     fid = id(acm_func)
-    ctx_key = (fid, key or tuple(kwargs.items()))
-    value = None
+
+    if inspect.isfunction(key):
+        ctx_key = (fid, key(**kwargs))
+    else:
+        ctx_key = (fid, key or tuple(kwargs.items()))
+
+    # yielded output
+    yielded: Any = None
 
     # Lock resource acquisition around task racing / ``trio``'s
     # scheduler protocol.
@@ -179,44 +207,51 @@ async def maybe_open_context(
     lock = _Cache.locks.setdefault(fid, trio.Lock())
     await lock.acquire()
 
+    # XXX: one singleton nursery per actor and we want to
+    # have it not be closed until all consumers have exited (which is
+    # currently difficult to implement any other way besides using our
+    # pre-allocated runtime instance..)
+    service_n: trio.Nursery = current_actor()._service_n
+
+    # TODO: is there any way to allocate
+    # a 'stays-open-till-last-task-finshed nursery?
+    # service_n: trio.Nursery
+    # async with maybe_open_nursery(_Cache.service_n) as service_n:
+    #     _Cache.service_n = service_n
+
     try:
         # **critical section** that should prevent other tasks from
         # checking the _Cache until complete otherwise the scheduler
         # may switch and by accident we create more then one resource.
-        value = _Cache.values[ctx_key]
+        yielded = _Cache.values[ctx_key]
 
     except KeyError:
         log.info(f'Allocating new {acm_func} for {ctx_key}')
-
         mngr = acm_func(**kwargs)
-        # TODO: avoid pulling from ``tractor`` internals and
-        # instead offer a "root nursery" in piker actors?
-        service_n = current_actor()._service_n
-
-        # TODO: does this need to be a tractor "root nursery"?
         resources = _Cache.resources
         assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
         resources[ctx_key] = (service_n, trio.Event())
 
-        value = await service_n.start(
+        # sync up to the mngr's yielded value
+        yielded = await service_n.start(
            _Cache.run_ctx,
            mngr,
            ctx_key,
        )
        _Cache.users += 1
        lock.release()
-        yield False, value
+        yield False, yielded
 
     else:
         log.info(f'Reusing _Cached resource for {ctx_key}')
         _Cache.users += 1
         lock.release()
-        yield True, value
+        yield True, yielded
 
     finally:
         _Cache.users -= 1
 
-        if value is not None:
+        if yielded is not None:
             # if no more consumers, teardown the client
             if _Cache.users <= 0:
                 log.info(f'De-allocating resource for {ctx_key}')
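Usage note: below is a minimal sketch (not part of this change set) of the new callable-``key`` behaviour. The ``open_resource()`` acm, the ``key_on_host_port()`` key function and the host/port kwargs are hypothetical names for illustration; only ``tractor.open_root_actor()`` and ``tractor.trionics.maybe_open_context()`` are real APIs. Without ``key``, the cache key defaults to ``tuple(kwargs.items())``, so any differing kwarg (e.g. a timeout) allocates a new resource; a key callable receives the input ``kwargs`` and returns the ``Hashable`` actually used for cache-hit lookups, letting callers ignore kwargs that shouldn't affect caching.

from contextlib import asynccontextmanager as acm

import trio
import tractor
from tractor.trionics import maybe_open_context


@acm
async def open_resource(host: str, port: int, timeout: float = 3.0):
    # stand-in for some expensive-to-allocate async resource;
    # ``timeout`` deliberately doesn't influence what gets cached
    yield (host, port)


def key_on_host_port(**kwargs) -> tuple:
    # cache purely on the (host, port) pair; differing ``timeout``
    # values should still hit the same cached entry
    return (kwargs['host'], kwargs['port'])


async def main():
    # the cache's service nursery comes from the actor runtime so
    # this must run inside an opened (root) actor
    async with tractor.open_root_actor():

        async with maybe_open_context(
            acm_func=open_resource,
            kwargs={'host': 'localhost', 'port': 8080, 'timeout': 1.0},
            key=key_on_host_port,
        ) as (first_was_cached, first):

            async with maybe_open_context(
                acm_func=open_resource,
                kwargs={'host': 'localhost', 'port': 8080, 'timeout': 9.0},
                key=key_on_host_port,
            ) as (second_was_cached, second):

                # the second entry reuses the cached resource despite
                # the differing ``timeout`` kwarg
                assert (first_was_cached, second_was_cached) == (False, True)
                assert second is first


if __name__ == '__main__':
    trio.run(main)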