diff --git a/tractor/_portal.py b/tractor/_portal.py index 94a285b..de2da45 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -35,6 +35,7 @@ import warnings import trio from async_generator import asynccontextmanager +from .trionics import maybe_open_nursery from ._state import current_actor from ._ipc import Channel from .log import get_logger @@ -50,25 +51,6 @@ from ._streaming import Context, ReceiveMsgStream log = get_logger(__name__) -@asynccontextmanager -async def maybe_open_nursery( - nursery: trio.Nursery = None, - shield: bool = False, -) -> AsyncGenerator[trio.Nursery, Any]: - ''' - Create a new nursery if None provided. - - Blocks on exit as expected if no input nursery is provided. - - ''' - if nursery is not None: - yield nursery - else: - async with trio.open_nursery() as nursery: - nursery.cancel_scope.shield = shield - yield nursery - - def _unwrap_msg( msg: dict[str, Any], diff --git a/tractor/trionics/__init__.py b/tractor/trionics/__init__.py index 22b5bcd..31e49a9 100644 --- a/tractor/trionics/__init__.py +++ b/tractor/trionics/__init__.py @@ -21,6 +21,7 @@ Sugary patterns for trio + tractor designs. from ._mngrs import ( gather_contexts, maybe_open_context, + maybe_open_nursery, ) from ._broadcast import ( broadcast_receiver, @@ -35,4 +36,5 @@ __all__ = [ 'BroadcastReceiver', 'Lagged', 'maybe_open_context', + 'maybe_open_nursery', ] diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 5473a04..c54782b 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -19,6 +19,7 @@ Async context manager primitives with hard ``trio``-aware semantics ''' from contextlib import asynccontextmanager as acm +import inspect from typing import ( Any, AsyncContextManager, @@ -35,7 +36,6 @@ import trio from trio_typing import TaskStatus from ..log import get_logger -from .._state import current_actor log = get_logger(__name__) @@ -44,6 +44,25 @@ log = get_logger(__name__) T = TypeVar("T") +@acm +async def maybe_open_nursery( + nursery: trio.Nursery = None, + shield: bool = False, +) -> AsyncGenerator[trio.Nursery, Any]: + ''' + Create a new nursery if None provided. + + Blocks on exit as expected if no input nursery is provided. + + ''' + if nursery is not None: + yield nursery + else: + async with trio.open_nursery() as nursery: + nursery.cancel_scope.shield = shield + yield nursery + + async def _enter_and_wait( mngr: AsyncContextManager[T], @@ -128,6 +147,7 @@ class _Cache: Hashable, tuple[trio.Nursery, trio.Event] ] = {} + nurseries: dict[int, trio.Nursery] = {} no_more_users: Optional[trio.Event] = None @classmethod @@ -158,7 +178,7 @@ async def maybe_open_context( # XXX: used as cache key after conversion to tuple # and all embedded values must also be hashable kwargs: dict = {}, - key: Hashable = None, + key: Hashable | Callable[..., Hashable] = None, ) -> AsyncIterator[tuple[bool, T]]: ''' @@ -168,8 +188,14 @@ async def maybe_open_context( ''' fid = id(acm_func) - ctx_key = (fid, key or tuple(kwargs.items())) - value = None + + if inspect.isfunction(key): + ctx_key = (fid, key(**kwargs)) + else: + ctx_key = (fid, key or tuple(kwargs.items())) + + # yielded output + yielded: Any = None # Lock resource acquisition around task racing / ``trio``'s # scheduler protocol. @@ -183,40 +209,38 @@ async def maybe_open_context( # **critical section** that should prevent other tasks from # checking the _Cache until complete otherwise the scheduler # may switch and by accident we create more then one resource. - value = _Cache.values[ctx_key] + yielded = _Cache.values[ctx_key] except KeyError: log.info(f'Allocating new {acm_func} for {ctx_key}') - mngr = acm_func(**kwargs) - # TODO: avoid pulling from ``tractor`` internals and - # instead offer a "root nursery" in piker actors? - service_n = current_actor()._service_n + service_n: Optional[trio.Nursery] = _Cache.nurseries.get(fid) + async with maybe_open_nursery(service_n) as service_n: + _Cache.nurseries[fid] = service_n + resources = _Cache.resources + assert not resources.get(ctx_key), f'Resource exists? {ctx_key}' + resources[ctx_key] = (service_n, trio.Event()) - # TODO: does this need to be a tractor "root nursery"? - resources = _Cache.resources - assert not resources.get(ctx_key), f'Resource exists? {ctx_key}' - resources[ctx_key] = (service_n, trio.Event()) - - value = await service_n.start( - _Cache.run_ctx, - mngr, - ctx_key, - ) - _Cache.users += 1 - lock.release() - yield False, value + # sync up to the mngr's yielded value + yielded = await service_n.start( + _Cache.run_ctx, + mngr, + ctx_key, + ) + _Cache.users += 1 + lock.release() + yield False, yielded else: log.info(f'Reusing _Cached resource for {ctx_key}') _Cache.users += 1 lock.release() - yield True, value + yield True, yielded finally: _Cache.users -= 1 - if value is not None: + if yielded is not None: # if no more consumers, teardown the client if _Cache.users <= 0: log.info(f'De-allocating resource for {ctx_key}')