Use one nursery per unique manager (signature)

Instead of sticking all `trionics.maybe_open_context()` tasks inside the
actor's (root) service nursery, open a unique one per manager function
instance (id).

Further, accept a callable for `key` so that a user has more flexible
control over the caching logic, and move the `maybe_open_nursery()`
helper out of the portal mod and into this trionics "managers" module.
Branch: callable_key_maybe_open_context
Author: Tyler Goodlet, 2022-10-06 14:41:56 -04:00
Parent: 9e6266dda3
Commit: 7a719ac2a7
3 changed files with 51 additions and 43 deletions
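
For context, a sketch of how the new callable `key` is meant to be used at
a call-site; the `open_feed` manager, its params, and the `consumer` task
are hypothetical (not part of this commit):

    from contextlib import asynccontextmanager as acm
    from tractor.trionics import maybe_open_context

    @acm
    async def open_feed(symbol: str, depth: int = 10):
        # stand-in for an expensive, shareable resource
        yield f'feed-for-{symbol}'

    async def consumer(symbol: str):
        async with maybe_open_context(
            acm_func=open_feed,
            kwargs={'symbol': symbol, 'depth': 10},
            # derive the cache key from only the fields that matter so
            # unrelated kwargs (like `depth`) don't defeat caching
            key=lambda **kwargs: kwargs['symbol'],
        ) as (cached, feed):
            # the first entrant allocates (cached=False); any concurrent
            # entrant re-uses the same instance (cached=True)
            ...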

tractor/_portal.py

@@ -35,6 +35,7 @@ import warnings
 import trio
 from async_generator import asynccontextmanager
+from .trionics import maybe_open_nursery
 from ._state import current_actor
 from ._ipc import Channel
 from .log import get_logger
@@ -50,25 +51,6 @@ from ._streaming import Context, ReceiveMsgStream
 log = get_logger(__name__)

-@asynccontextmanager
-async def maybe_open_nursery(
-    nursery: trio.Nursery = None,
-    shield: bool = False,
-) -> AsyncGenerator[trio.Nursery, Any]:
-    '''
-    Create a new nursery if None provided.
-    Blocks on exit as expected if no input nursery is provided.
-    '''
-    if nursery is not None:
-        yield nursery
-    else:
-        async with trio.open_nursery() as nursery:
-            nursery.cancel_scope.shield = shield
-            yield nursery

 def _unwrap_msg(
     msg: dict[str, Any],

tractor/trionics/__init__.py

@@ -21,6 +21,7 @@ Sugary patterns for trio + tractor designs.
 from ._mngrs import (
     gather_contexts,
     maybe_open_context,
+    maybe_open_nursery,
 )
 from ._broadcast import (
     broadcast_receiver,
@@ -35,4 +36,5 @@ __all__ = [
     'BroadcastReceiver',
     'Lagged',
     'maybe_open_context',
+    'maybe_open_nursery',
 ]

tractor/trionics/_mngrs.py

@@ -19,6 +19,7 @@ Async context manager primitives with hard ``trio``-aware semantics
 '''
 from contextlib import asynccontextmanager as acm
+import inspect
 from typing import (
     Any,
     AsyncContextManager,
@@ -35,7 +36,6 @@ import trio
 from trio_typing import TaskStatus

 from ..log import get_logger
-from .._state import current_actor

 log = get_logger(__name__)
@@ -44,6 +44,25 @@ log = get_logger(__name__)
 T = TypeVar("T")

+
+@acm
+async def maybe_open_nursery(
+    nursery: trio.Nursery = None,
+    shield: bool = False,
+) -> AsyncGenerator[trio.Nursery, Any]:
+    '''
+    Create a new nursery if None provided.
+    Blocks on exit as expected if no input nursery is provided.
+    '''
+    if nursery is not None:
+        yield nursery
+    else:
+        async with trio.open_nursery() as nursery:
+            nursery.cancel_scope.shield = shield
+            yield nursery
+

 async def _enter_and_wait(
     mngr: AsyncContextManager[T],
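
Since the helper now ships from `tractor.trionics`, here is a minimal
standalone sketch (not from this commit) of its two behaviors: pass-through
when handed a live nursery, self-managed otherwise:

    import trio
    from tractor.trionics import maybe_open_nursery

    async def serve(task_n: trio.Nursery = None):
        # re-use the caller's nursery if provided, else open (and on
        # exit block on) a private one
        async with maybe_open_nursery(task_n) as n:
            n.start_soon(trio.sleep, 0.1)

    async def main():
        await serve()  # helper opens and closes its own nursery
        async with trio.open_nursery() as n:
            await serve(n)  # helper yields the caller's nursery through

    trio.run(main)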
@@ -128,6 +147,7 @@ class _Cache:
         Hashable,
         tuple[trio.Nursery, trio.Event]
     ] = {}
+    nurseries: dict[int, trio.Nursery] = {}
     no_more_users: Optional[trio.Event] = None

     @classmethod
@@ -158,7 +178,7 @@ async def maybe_open_context(
     # XXX: used as cache key after conversion to tuple
     # and all embedded values must also be hashable
     kwargs: dict = {},
-    key: Hashable = None,
+    key: Hashable | Callable[..., Hashable] = None,

 ) -> AsyncIterator[tuple[bool, T]]:
     '''
@@ -168,8 +188,14 @@ async def maybe_open_context(
     '''
     fid = id(acm_func)

-    ctx_key = (fid, key or tuple(kwargs.items()))
-    value = None
+    if inspect.isfunction(key):
+        ctx_key = (fid, key(**kwargs))
+    else:
+        ctx_key = (fid, key or tuple(kwargs.items()))
+
+    # yielded output
+    yielded: Any = None

     # Lock resource acquisition around task racing / ``trio``'s
     # scheduler protocol.
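
Note that `inspect.isfunction()` matches plain functions and lambdas but
not callable objects such as `functools.partial` wrappers or bound methods,
which instead fall through to the literal-hashable-key branch. A quick
check of that dispatch:

    import functools
    import inspect

    def by_symbol(**kwargs):
        return kwargs['symbol']

    assert inspect.isfunction(by_symbol)             # dispatched as key-maker
    assert inspect.isfunction(lambda **kw: kw['a'])  # lambdas too
    assert not inspect.isfunction(functools.partial(by_symbol))  # literal key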
@@ -183,40 +209,38 @@ async def maybe_open_context(
         # **critical section** that should prevent other tasks from
         # checking the _Cache until complete otherwise the scheduler
         # may switch and by accident we create more then one resource.
-        value = _Cache.values[ctx_key]
+        yielded = _Cache.values[ctx_key]

     except KeyError:
         log.info(f'Allocating new {acm_func} for {ctx_key}')
         mngr = acm_func(**kwargs)
-        # TODO: avoid pulling from ``tractor`` internals and
-        # instead offer a "root nursery" in piker actors?
-        service_n = current_actor()._service_n
-        resources = _Cache.resources
-        assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
-        resources[ctx_key] = (service_n, trio.Event())
-        value = await service_n.start(
-            _Cache.run_ctx,
-            mngr,
-            ctx_key,
-        )
-        _Cache.users += 1
-        lock.release()
-        yield False, value
+        service_n: Optional[trio.Nursery] = _Cache.nurseries.get(fid)
+        async with maybe_open_nursery(service_n) as service_n:
+            _Cache.nurseries[fid] = service_n
+
+            # TODO: does this need to be a tractor "root nursery"?
+            resources = _Cache.resources
+            assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
+            resources[ctx_key] = (service_n, trio.Event())
+
+            # sync up to the mngr's yielded value
+            yielded = await service_n.start(
+                _Cache.run_ctx,
+                mngr,
+                ctx_key,
+            )
+            _Cache.users += 1
+            lock.release()
+            yield False, yielded

     else:
         log.info(f'Reusing _Cached resource for {ctx_key}')
         _Cache.users += 1
         lock.release()
-        yield True, value
+        yield True, yielded

     finally:
         _Cache.users -= 1

-        if value is not None:
+        if yielded is not None:
             # if no more consumers, teardown the client
             if _Cache.users <= 0:
                 log.info(f'De-allocating resource for {ctx_key}')
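
Net effect of the refcounting in the `finally` block: every entrant bumps
the `_Cache.users` count (note it is module-wide, not tracked per
`ctx_key`) and the last one out triggers de-allocation, which in turn lets
`run_ctx` and the per-manager nursery exit. A toy, `trio`-free model of
just that logic (illustrative only, not tractor code):

    class ToyCache:
        users: int = 0
        values: dict = {}

    def enter(key, make):
        # return (was_cached, value), allocating on first use
        if key in ToyCache.values:
            hit = True
        else:
            ToyCache.values[key] = make()
            hit = False
        ToyCache.users += 1
        return hit, ToyCache.values[key]

    def leave(key):
        ToyCache.users -= 1
        if ToyCache.users <= 0:
            ToyCache.values.pop(key, None)  # "de-allocate resource"

    hit, v = enter('feed', object)
    hit2, v2 = enter('feed', object)
    assert (hit, hit2) == (False, True) and v is v2
    leave('feed'); leave('feed')
    assert 'feed' not in ToyCache.values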