diff --git a/newsfragments/257.feature.rst b/newsfragments/257.feature.rst
new file mode 100644
index 0000000..1183632
--- /dev/null
+++ b/newsfragments/257.feature.rst
@@ -0,0 +1,9 @@
+Add ``trionics.maybe_open_context()``, an actor-scoped, multi-task
+async context manager resource caching API.
+
+Adds an SC-safe caching async context manager API that only enters on
+the *first* task entry and only exits on the *last* task exit, while in
+between delivering the same cached value per input key. Keys can be
+either an explicit ``key`` named arg provided by the user or a
+hashable ``kwargs`` dict (converted to a tuple of its items) which is
+passed to the underlying manager function as input.
diff --git a/tests/test_resource_cache.py b/tests/test_resource_cache.py
new file mode 100644
index 0000000..0e7ad74
--- /dev/null
+++ b/tests/test_resource_cache.py
@@ -0,0 +1,182 @@
+'''
+Async context manager cache API testing: ``trionics.maybe_open_context()``.
+
+'''
+from contextlib import asynccontextmanager as acm
+import platform
+from typing import Awaitable
+
+import pytest
+import trio
+import tractor
+
+
+_resource: int = 0
+
+
+@acm
+async def maybe_increment_counter(task_name: str):
+    global _resource
+
+    _resource += 1
+    await trio.lowlevel.checkpoint()
+    yield _resource
+    await trio.lowlevel.checkpoint()
+    _resource -= 1
+
+
+@pytest.mark.parametrize(
+    'key_on',
+    ['key_value', 'kwargs'],
+    ids="key_on={}".format,
+)
+def test_resource_only_entered_once(key_on):
+    global _resource
+    _resource = 0
+
+    kwargs = {}
+    key = None
+    if key_on == 'key_value':
+        key = 'some_common_key'
+
+    async def main():
+        cache_active: bool = False
+
+        async def enter_cached_mngr(name: str):
+            nonlocal cache_active
+
+            if key_on == 'kwargs':
+                # make a common kwargs input to key on it
+                kwargs = {'task_name': 'same_task_name'}
+                assert key is None
+            else:
+                # different task names per task will be used
+                kwargs = {'task_name': name}
+
+            async with tractor.trionics.maybe_open_context(
+                maybe_increment_counter,
+                kwargs=kwargs,
+                key=key,
+
+            ) as (cache_hit, resource):
+                if cache_hit:
+                    try:
+                        cache_active = True
+                        assert resource == 1
+                        await trio.sleep_forever()
+                    finally:
+                        cache_active = False
+                else:
+                    assert resource == 1
+                    await trio.sleep_forever()
+
+        with trio.move_on_after(0.5):
+            async with (
+                tractor.open_root_actor(),
+                trio.open_nursery() as n,
+            ):
+
+                for i in range(10):
+                    n.start_soon(enter_cached_mngr, f'task_{i}')
+                    await trio.sleep(0.001)
+
+    trio.run(main)
+
+
+@tractor.context
+async def streamer(
+    ctx: tractor.Context,
+    seq: list[int] = list(range(1000)),
+) -> None:
+
+    await ctx.started()
+    async with ctx.open_stream() as stream:
+        for val in seq:
+            await stream.send(val)
+            await trio.sleep(0.001)
+
+    print('producer finished')
+
+
+@acm
+async def open_stream() -> Awaitable[tractor.MsgStream]:
+
+    async with tractor.open_nursery() as tn:
+        portal = await tn.start_actor('streamer', enable_modules=[__name__])
+        async with (
+            portal.open_context(streamer) as (ctx, first),
+            ctx.open_stream() as stream,
+        ):
+            yield stream
+
+        await portal.cancel_actor()
+        print('CANCELLED STREAMER')
+
+
+@acm
+async def maybe_open_stream(taskname: str):
+    async with tractor.trionics.maybe_open_context(
+        # NOTE: all secondary tasks should cache hit on the same key
+        acm_func=open_stream,
+    ) as (cache_hit, stream):
+
+        if cache_hit:
+            print(f'{taskname} loaded from cache')
+
+            # add a new broadcast subscription for the quote stream
+            # if this feed is already allocated by the first
+            # task that entered
+            async with stream.subscribe() as bstream:
+                yield bstream
+        else:
+            # yield the actual stream
+            yield stream
+
+
+def test_open_local_sub_to_stream():
+    '''
+    Verify a single inter-actor stream can be fanned out and shared by
+    N local tasks using ``trionics.maybe_open_context()``.
+
+    '''
+    timeout = 3 if platform.system() != "Windows" else 10
+
+    async def main():
+
+        full = list(range(1000))
+
+        async def get_sub_and_pull(taskname: str):
+            async with (
+                maybe_open_stream(taskname) as stream,
+            ):
+                if '0' in taskname:
+                    assert isinstance(stream, tractor.MsgStream)
+                else:
+                    assert isinstance(
+                        stream,
+                        tractor.trionics.BroadcastReceiver
+                    )
+
+                first = await stream.receive()
+                print(f'{taskname} started with value {first}')
+                seq = []
+                async for msg in stream:
+                    seq.append(msg)
+
+                assert set(seq).issubset(set(full))
+            print(f'{taskname} finished')
+
+        with trio.fail_after(timeout):
+            # TODO: turns out this isn't multi-task entrant XD
+            # We probably need an idempotent entry semantic?
+            async with tractor.open_root_actor():
+                async with (
+                    trio.open_nursery() as nurse,
+                ):
+                    for i in range(10):
+                        nurse.start_soon(get_sub_and_pull, f'task_{i}')
+                        await trio.sleep(0.001)
+
+                print('all consumer tasks finished')
+
+    trio.run(main)
diff --git a/tractor/_root.py b/tractor/_root.py
index fc727d4..3468a25 100644
--- a/tractor/_root.py
+++ b/tractor/_root.py
@@ -215,6 +215,14 @@ async def open_root_actor(
             raise
 
         finally:
+            # NOTE: not sure if we'll ever need this but it's
+            # possibly better for even more determinism?
+            # logger.cancel(f'Waiting on {len(nurseries)} nurseries in root..')
+            # nurseries = actor._actoruid2nursery.values()
+            # async with trio.open_nursery() as tempn:
+            #     for an in nurseries:
+            #         tempn.start_soon(an.exited.wait)
+
             logger.cancel("Shutting down root actor")
             await actor.cancel()
     finally:
diff --git a/tractor/trionics/__init__.py b/tractor/trionics/__init__.py
index 2090a6e..22b5bcd 100644
--- a/tractor/trionics/__init__.py
+++ b/tractor/trionics/__init__.py
@@ -18,8 +18,15 @@
 Sugary patterns for trio + tractor designs.
 
 '''
-from ._mngrs import gather_contexts
-from ._broadcast import broadcast_receiver, BroadcastReceiver, Lagged
+from ._mngrs import (
+    gather_contexts,
+    maybe_open_context,
+)
+from ._broadcast import (
+    broadcast_receiver,
+    BroadcastReceiver,
+    Lagged,
+)
 
 
 __all__ = [
@@ -27,4 +34,5 @@ __all__ = [
     'broadcast_receiver',
     'BroadcastReceiver',
     'Lagged',
+    'maybe_open_context',
 ]
diff --git a/tractor/trionics/_broadcast.py b/tractor/trionics/_broadcast.py
index 35711b2..77ab6d0 100644
--- a/tractor/trionics/_broadcast.py
+++ b/tractor/trionics/_broadcast.py
@@ -331,10 +331,16 @@ class BroadcastReceiver(ReceiveChannel):
 
         if self._closed:
             return
 
+        # if there are sleeping consumers, wake
+        # them on closure.
+        rr = self._state.recv_ready
+        if rr:
+            _, event = rr
+            event.set()
+
         # XXX: leaving it like this consumers can still get values
         # up to the last received that still reside in the queue.
         self._state.subs.pop(self.key)
-
         self._closed = True
 
diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py
index 909fe42..3834983 100644
--- a/tractor/trionics/_mngrs.py
+++ b/tractor/trionics/_mngrs.py
@@ -18,12 +18,27 @@
 Async context manager primitives with hard ``trio``-aware semantics
 
 '''
-from typing import AsyncContextManager, AsyncGenerator
-from typing import TypeVar, Sequence
 from contextlib import asynccontextmanager as acm
+from typing import (
+    Any,
+    AsyncContextManager,
+    AsyncGenerator,
+    AsyncIterator,
+    Callable,
+    Hashable,
+    Optional,
+    Sequence,
+    TypeVar,
+)
 
 import trio
+from trio_typing import TaskStatus
+from ..log import get_logger
+from .._state import current_actor
+
+
+log = get_logger(__name__)
 
 
 # A regular invariant generic type
 T = TypeVar("T")
@@ -92,3 +107,118 @@ async def gather_contexts(
     # we don't need a try/finally since cancellation will be triggered
     # by the surrounding nursery on error.
     parent_exit.set()
+
+
+# Per actor task caching helpers.
+# Further potential examples of interest:
+# https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8
+
+class _Cache:
+    '''
+    Globally (actor-process scoped) cached, multi-task access to
+    a kept-alive-while-in-use async resource.
+
+    '''
+    lock = trio.Lock()
+    users: int = 0
+    values: dict[Any, Any] = {}
+    resources: dict[
+        Hashable,
+        tuple[trio.Nursery, trio.Event]
+    ] = {}
+    no_more_users: Optional[trio.Event] = None
+
+    @classmethod
+    async def run_ctx(
+        cls,
+        mng,
+        ctx_key: tuple,
+        task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,
+
+    ) -> None:
+        async with mng as value:
+            _, no_more_users = cls.resources[ctx_key]
+            cls.values[ctx_key] = value
+            task_status.started(value)
+            try:
+                await no_more_users.wait()
+            finally:
+                # discard nursery ref so it won't be re-used (an error)?
+                value = cls.values.pop(ctx_key)
+                cls.resources.pop(ctx_key)
+
+
+@acm
+async def maybe_open_context(
+
+    acm_func: Callable[..., AsyncContextManager[T]],
+
+    # XXX: used as cache key after conversion to tuple
+    # and all embedded values must also be hashable
+    kwargs: dict = {},
+    key: Hashable = None,
+
+) -> AsyncIterator[tuple[bool, T]]:
+    '''
+    Maybe open a context manager if there is not already a _Cached
+    version for the provided ``key`` for *this* actor. Return the
+    _Cached instance on a _Cache hit.
+
+    '''
+    # lock resource acquisition around task racing / ``trio``'s
+    # scheduler protocol
+    await _Cache.lock.acquire()
+
+    ctx_key = (id(acm_func), key or tuple(kwargs.items()))
+    print(ctx_key)
+    value = None
+
+    try:
+        # **critical section** that should prevent other tasks from
+        # checking the _Cache until complete otherwise the scheduler
+        # may switch and by accident we create more than one resource.
+        value = _Cache.values[ctx_key]
+
+    except KeyError:
+        log.info(f'Allocating new resource for {ctx_key}')
+
+        mngr = acm_func(**kwargs)
+        # TODO: avoid pulling from ``tractor`` internals and
+        # instead offer a "root nursery" in piker actors?
+        service_n = current_actor()._service_n
+
+        # TODO: does this need to be a tractor "root nursery"?
+        resources = _Cache.resources
+        assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
+        ln, _ = resources[ctx_key] = (service_n, trio.Event())
+
+        value = await ln.start(
+            _Cache.run_ctx,
+            mngr,
+            ctx_key,
+        )
+        _Cache.users += 1
+        _Cache.lock.release()
+        yield False, value
+
+    else:
+        log.info(f'Reusing _Cached resource for {ctx_key}')
+        _Cache.users += 1
+        _Cache.lock.release()
+        yield True, value
+
+    finally:
+        _Cache.users -= 1
+
+        if value is not None:
+            # if no more consumers, teardown the client
+            if _Cache.users <= 0:
+                log.info(f'De-allocating resource for {ctx_key}')
+
+                # XXX: if we're cancelled, the entry may have never
+                # been entered since the nursery task was killed.
+                # _, no_more_users = _Cache.resources[ctx_key]
+                entry = _Cache.resources.get(ctx_key)
+                if entry:
+                    _, no_more_users = entry
+                    no_more_users.set()
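
Usage sketch (not part of the diff above): a minimal example of driving the new
``maybe_open_context()`` API from several tasks, mirroring the added tests. The
``open_config()`` helper, its ``path`` argument, and the sleep durations are
hypothetical stand-ins for any expensive async resource; only the first entrant
actually runs the manager, later tasks get a cache hit keyed on the same kwargs.

from contextlib import asynccontextmanager as acm

import trio
import tractor


@acm
async def open_config(path: str):
    # hypothetical expensive-to-build resource; stands in for a real
    # client, feed or connection that should only be opened once
    await trio.sleep(0.1)  # simulate slow setup work
    try:
        yield {'path': path, 'debug': True}
    finally:
        # torn down only after the *last* task exits
        print(f'closing config loaded from {path}')


async def reader_task(name: str):
    # all tasks pass identical kwargs so they share one cache key
    async with tractor.trionics.maybe_open_context(
        acm_func=open_config,
        kwargs={'path': 'app.toml'},
    ) as (cache_hit, config):
        if cache_hit:
            print(f'{name}: reusing cached config')
        assert config['path'] == 'app.toml'
        await trio.sleep(0.2)


async def main():
    # maybe_open_context() allocates onto the actor's service nursery,
    # so an actor runtime must already be up.
    async with tractor.open_root_actor():
        async with trio.open_nursery() as n:
            for i in range(4):
                n.start_soon(reader_task, f'task_{i}')


if __name__ == '__main__':
    trio.run(main)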