From 5f41dbf34f263b7d1a1be1de7731d0180118436d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 27 Oct 2021 14:01:39 -0400 Subject: [PATCH 01/15] Add `maybe_open_context()` an actor wide task-resource cache --- tractor/trionics/__init__.py | 12 +++- tractor/trionics/_mngrs.py | 125 ++++++++++++++++++++++++++++++++++- 2 files changed, 133 insertions(+), 4 deletions(-) diff --git a/tractor/trionics/__init__.py b/tractor/trionics/__init__.py index 2090a6e..22b5bcd 100644 --- a/tractor/trionics/__init__.py +++ b/tractor/trionics/__init__.py @@ -18,8 +18,15 @@ Sugary patterns for trio + tractor designs. ''' -from ._mngrs import gather_contexts -from ._broadcast import broadcast_receiver, BroadcastReceiver, Lagged +from ._mngrs import ( + gather_contexts, + maybe_open_context, +) +from ._broadcast import ( + broadcast_receiver, + BroadcastReceiver, + Lagged, +) __all__ = [ @@ -27,4 +34,5 @@ __all__ = [ 'broadcast_receiver', 'BroadcastReceiver', 'Lagged', + 'maybe_open_context', ] diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 909fe42..115311e 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -18,12 +18,25 @@ Async context manager primitives with hard ``trio``-aware semantics ''' -from typing import AsyncContextManager, AsyncGenerator -from typing import TypeVar, Sequence from contextlib import asynccontextmanager as acm +from typing import ( + Any, + AsyncContextManager, + AsyncGenerator, + Hashable, + Optional, + Sequence, + TypeVar, +) import trio +from trio_typing import TaskStatus +from ..log import get_logger +from .._state import current_actor + + +log = get_logger(__name__) # A regular invariant generic type T = TypeVar("T") @@ -92,3 +105,111 @@ async def gather_contexts( # we don't need a try/finally since cancellation will be triggered # by the surrounding nursery on error. parent_exit.set() + + +# Per actor task caching helpers. +# Further potential examples of interest: +# https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8 + +class cache: + ''' + Globally (processs wide) cached, task access to a + kept-alive-while-in-use async resource. + + ''' + lock = trio.Lock() + users: int = 0 + values: dict[Any, Any] = {} + resources: dict[ + int, + Optional[tuple[trio.Nursery, trio.Event]] + ] = {} + no_more_users: Optional[trio.Event] = None + + @classmethod + async def run_ctx( + cls, + mng, + key, + task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED, + + ) -> None: + async with mng as value: + + _, no_more_users = cls.resources[id(mng)] + cls.values[key] = value + task_status.started(value) + try: + await no_more_users.wait() + finally: + value = cls.values.pop(key) + # discard nursery ref so it won't be re-used (an error) + cls.resources.pop(id(mng)) + + +@acm +async def maybe_open_context( + + key: Hashable, + mngr: AsyncContextManager[T], + +) -> (bool, T): + ''' + Maybe open a context manager if there is not already a cached + version for the provided ``key``. Return the cached instance on + a cache hit. + + ''' + await cache.lock.acquire() + + ctx_key = id(mngr) + + value = None + try: + # lock feed acquisition around task racing / ``trio``'s + # scheduler protocol + value = cache.values[key] + log.info(f'Reusing cached resource for {key}') + cache.users += 1 + cache.lock.release() + yield True, value + + except KeyError: + log.info(f'Allocating new resource for {key}') + + # **critical section** that should prevent other tasks from + # checking the cache until complete otherwise the scheduler + # may switch and by accident we create more then one feed. + + # TODO: avoid pulling from ``tractor`` internals and + # instead offer a "root nursery" in piker actors? + service_n = current_actor()._service_n + + # TODO: does this need to be a tractor "root nursery"? + ln = cache.resources.get(ctx_key) + assert not ln + + ln, _ = cache.resources[ctx_key] = (service_n, trio.Event()) + + value = await ln.start(cache.run_ctx, mngr, key) + cache.users += 1 + cache.lock.release() + + yield False, value + + finally: + cache.users -= 1 + + if cache.lock.locked(): + cache.lock.release() + + if value is not None: + # if no more consumers, teardown the client + if cache.users <= 0: + log.info(f'De-allocating resource for {key}') + + # terminate mngr nursery + entry = cache.resources.get(ctx_key) + if entry: + _, no_more_users = entry + no_more_users.set() From ac22b4a875ded5f87d69eb76be19645153e2e0a7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 28 Nov 2021 12:48:26 -0500 Subject: [PATCH 02/15] Fix type annots in resource cacher internals --- tractor/trionics/_mngrs.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 115311e..e2d9acc 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -23,6 +23,7 @@ from typing import ( Any, AsyncContextManager, AsyncGenerator, + AsyncIterator, Hashable, Optional, Sequence, @@ -122,7 +123,7 @@ class cache: values: dict[Any, Any] = {} resources: dict[ int, - Optional[tuple[trio.Nursery, trio.Event]] + tuple[trio.Nursery, trio.Event] ] = {} no_more_users: Optional[trio.Event] = None @@ -153,7 +154,7 @@ async def maybe_open_context( key: Hashable, mngr: AsyncContextManager[T], -) -> (bool, T): +) -> AsyncIterator[tuple[bool, T]]: ''' Maybe open a context manager if there is not already a cached version for the provided ``key``. Return the cached instance on @@ -186,9 +187,7 @@ async def maybe_open_context( service_n = current_actor()._service_n # TODO: does this need to be a tractor "root nursery"? - ln = cache.resources.get(ctx_key) - assert not ln - + assert not cache.resources.get(ctx_key), f'Resource exists? {ctx_key}' ln, _ = cache.resources[ctx_key] = (service_n, trio.Event()) value = await ln.start(cache.run_ctx, mngr, key) From 4a0252baf2d367cef8473a0a178b1eed5820a905 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 13 Dec 2021 18:03:29 -0500 Subject: [PATCH 03/15] Add task-cached stream test --- tests/test_resource_cache.py | 104 +++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 tests/test_resource_cache.py diff --git a/tests/test_resource_cache.py b/tests/test_resource_cache.py new file mode 100644 index 0000000..48ca24e --- /dev/null +++ b/tests/test_resource_cache.py @@ -0,0 +1,104 @@ +''' +Async context manager cache api testing: ``trionics.maybe_open_context():`` + +''' +from contextlib import asynccontextmanager as acm +from typing import Awaitable + +import trio +import tractor + + +@tractor.context +async def streamer( + ctx: tractor.Context, + seq: list[int] = list(range(1000)), +) -> None: + + await ctx.started() + async with ctx.open_stream() as stream: + for val in seq: + await stream.send(val) + await trio.sleep(0.001) + + +@acm +async def open_stream() -> Awaitable[tractor.MsgStream]: + + async with tractor.open_nursery() as tn: + portal = await tn.start_actor('streamer', enable_modules=[__name__]) + async with ( + portal.open_context(streamer) as (ctx, first), + ctx.open_stream() as stream, + ): + yield stream + breakpoint() + + print('CANCELLING STREAMER') + await portal.cancel_actor() + + +@acm +async def maybe_open_stream(taskname: str): + async with tractor.trionics.maybe_open_context( + # NOTE: all secondary tasks should cache hit on the same key + key='stream', + mngr=open_stream(), + ) as (cache_hit, stream): + + if cache_hit: + print(f'{taskname} loaded from cache') + + # add a new broadcast subscription for the quote stream + # if this feed is already allocated by the first + # task that entereed + async with stream.subscribe() as bstream: + yield bstream + else: + # yield the actual stream + yield stream + + +def test_open_local_sub_to_stream(): + ''' + Verify a single inter-actor stream can can be fanned-out shared to + N local tasks using ``trionics.maybe_open_context():``. + + ''' + async def main(): + + full = list(range(1000)) + + async def get_sub_and_pull(taskname: str): + async with ( + maybe_open_stream(taskname) as stream, + ): + if '0' in taskname: + assert isinstance(stream, tractor.MsgStream) + else: + assert isinstance( + stream, + tractor.trionics.BroadcastReceiver + ) + + first = await stream.receive() + print(f'{taskname} started with value {first}') + seq = [] + async for msg in stream: + seq.append(msg) + print(f'{taskname} received {msg}') + + assert set(seq).issubset(set(full)) + print(f'{taskname} finished') + + # TODO: turns out this isn't multi-task entrant XD + # We probably need an indepotent entry semantic? + async with tractor.open_root_actor(): + async with ( + trio.open_nursery() as nurse, + ): + for i in range(10): + nurse.start_soon(get_sub_and_pull, f'task_{i}') + await trio.sleep(0.001) + + trio.run(main) From b210278e2f6dec65a1446a84d551a4ede85ee829 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 15 Dec 2021 08:16:31 -0500 Subject: [PATCH 04/15] Naming change `cache` -> `_Cache` --- tractor/trionics/_mngrs.py | 46 +++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index e2d9acc..ab42b4e 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -112,10 +112,10 @@ async def gather_contexts( # Further potential examples of interest: # https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8 -class cache: +class _Cache: ''' - Globally (processs wide) cached, task access to a - kept-alive-while-in-use async resource. + Globally (actor-processs scoped) cached, task access to + a kept-alive-while-in-use async resource. ''' lock = trio.Lock() @@ -156,12 +156,12 @@ async def maybe_open_context( ) -> AsyncIterator[tuple[bool, T]]: ''' - Maybe open a context manager if there is not already a cached - version for the provided ``key``. Return the cached instance on - a cache hit. + Maybe open a context manager if there is not already a _Cached + version for the provided ``key``. Return the _Cached instance on + a _Cache hit. ''' - await cache.lock.acquire() + await _Cache.lock.acquire() ctx_key = id(mngr) @@ -169,17 +169,17 @@ async def maybe_open_context( try: # lock feed acquisition around task racing / ``trio``'s # scheduler protocol - value = cache.values[key] - log.info(f'Reusing cached resource for {key}') - cache.users += 1 - cache.lock.release() + value = _Cache.values[key] + log.info(f'Reusing _Cached resource for {key}') + _Cache.users += 1 + _Cache.lock.release() yield True, value except KeyError: log.info(f'Allocating new resource for {key}') # **critical section** that should prevent other tasks from - # checking the cache until complete otherwise the scheduler + # checking the _Cache until complete otherwise the scheduler # may switch and by accident we create more then one feed. # TODO: avoid pulling from ``tractor`` internals and @@ -187,28 +187,28 @@ async def maybe_open_context( service_n = current_actor()._service_n # TODO: does this need to be a tractor "root nursery"? - assert not cache.resources.get(ctx_key), f'Resource exists? {ctx_key}' - ln, _ = cache.resources[ctx_key] = (service_n, trio.Event()) + assert not _Cache.resources.get(ctx_key), f'Resource exists? {ctx_key}' + ln, _ = _Cache.resources[ctx_key] = (service_n, trio.Event()) - value = await ln.start(cache.run_ctx, mngr, key) - cache.users += 1 - cache.lock.release() + value = await ln.start(_Cache.run_ctx, mngr, key) + _Cache.users += 1 + _Cache.lock.release() yield False, value finally: - cache.users -= 1 - - if cache.lock.locked(): - cache.lock.release() + _Cache.users -= 1 if value is not None: # if no more consumers, teardown the client - if cache.users <= 0: + if _Cache.users <= 0: log.info(f'De-allocating resource for {key}') + if _Cache.lock.locked(): + _Cache.lock.release() + # terminate mngr nursery - entry = cache.resources.get(ctx_key) + entry = _Cache.resources.get(ctx_key) if entry: _, no_more_users = entry no_more_users.set() From 3826bc99723bc21d2e2f6681beeaa9de6c4efdb8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 14 Dec 2021 10:55:27 -0500 Subject: [PATCH 05/15] Don't catch key errors from the yielded to scope --- tractor/trionics/_mngrs.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index ab42b4e..af60c70 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -161,19 +161,15 @@ async def maybe_open_context( a _Cache hit. ''' + # lock resource acquisition around task racing / ``trio``'s + # scheduler protocol await _Cache.lock.acquire() ctx_key = id(mngr) - value = None + try: - # lock feed acquisition around task racing / ``trio``'s - # scheduler protocol value = _Cache.values[key] - log.info(f'Reusing _Cached resource for {key}') - _Cache.users += 1 - _Cache.lock.release() - yield True, value except KeyError: log.info(f'Allocating new resource for {key}') @@ -196,6 +192,12 @@ async def maybe_open_context( yield False, value + else: + log.info(f'Reusing _Cached resource for {key}') + _Cache.users += 1 + _Cache.lock.release() + yield True, value + finally: _Cache.users -= 1 From 52627a632615edd861d3708d50d93433fef3f598 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 15 Dec 2021 13:42:47 -0500 Subject: [PATCH 06/15] Rework interface: pass func and kwargs After more extensive testing I realized that keying on the context manager *instance id* isn't going to work since each entering task is going to create a unique key XD Instead pass the manager function as `acm_func` and optionally allow keying the resource on the passed `kwargs` (if hashable) or the `key:str`. Further, pass the key to the enterer task and avoid a separate keying scheme for the manager versus the value it delivers. Don't bother with checking and releasing the lock in `finally:` block, it should be an error if it's still locked. --- tractor/trionics/_mngrs.py | 65 ++++++++++++++++++++------------------ 1 file changed, 34 insertions(+), 31 deletions(-) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index af60c70..757af61 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -24,6 +24,7 @@ from typing import ( AsyncContextManager, AsyncGenerator, AsyncIterator, + Callable, Hashable, Optional, Sequence, @@ -131,69 +132,76 @@ class _Cache: async def run_ctx( cls, mng, - key, + ctx_key: tuple, task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED, ) -> None: async with mng as value: - - _, no_more_users = cls.resources[id(mng)] - cls.values[key] = value + _, no_more_users = cls.resources[ctx_key] + cls.values[ctx_key] = value task_status.started(value) try: await no_more_users.wait() finally: - value = cls.values.pop(key) - # discard nursery ref so it won't be re-used (an error) - cls.resources.pop(id(mng)) + # discard nursery ref so it won't be re-used (an error)? + value = cls.values.pop(ctx_key) + cls.resources.pop(ctx_key) @acm async def maybe_open_context( - key: Hashable, - mngr: AsyncContextManager[T], + acm_func: Callable[..., AsyncContextManager[T]], + + # XXX: used as cache key after conversion to tuple + # and all embedded values must also be hashable + kwargs: Optional[dict] = {}, + key: Hashable = None, ) -> AsyncIterator[tuple[bool, T]]: ''' Maybe open a context manager if there is not already a _Cached - version for the provided ``key``. Return the _Cached instance on - a _Cache hit. + version for the provided ``key`` for *this* actor. Return the + _Cached instance on a _Cache hit. ''' # lock resource acquisition around task racing / ``trio``'s # scheduler protocol await _Cache.lock.acquire() - ctx_key = id(mngr) + ctx_key = (id(acm_func), key or tuple(kwargs.items())) value = None try: - value = _Cache.values[key] - - except KeyError: - log.info(f'Allocating new resource for {key}') - # **critical section** that should prevent other tasks from # checking the _Cache until complete otherwise the scheduler - # may switch and by accident we create more then one feed. + # may switch and by accident we create more then one resource. + value = _Cache.values[ctx_key] + except KeyError: + log.info(f'Allocating new resource for {ctx_key}') + + mngr = acm_func(**kwargs) # TODO: avoid pulling from ``tractor`` internals and # instead offer a "root nursery" in piker actors? service_n = current_actor()._service_n # TODO: does this need to be a tractor "root nursery"? - assert not _Cache.resources.get(ctx_key), f'Resource exists? {ctx_key}' - ln, _ = _Cache.resources[ctx_key] = (service_n, trio.Event()) + resources = _Cache.resources + assert not resources.get(ctx_key), f'Resource exists? {ctx_key}' + ln, _ = resources[ctx_key] = (service_n, trio.Event()) - value = await ln.start(_Cache.run_ctx, mngr, key) + value = await ln.start( + _Cache.run_ctx, + mngr, + ctx_key, + ) _Cache.users += 1 _Cache.lock.release() - yield False, value else: - log.info(f'Reusing _Cached resource for {key}') + log.info(f'Reusing _Cached resource for {ctx_key}') _Cache.users += 1 _Cache.lock.release() yield True, value @@ -204,13 +212,8 @@ async def maybe_open_context( if value is not None: # if no more consumers, teardown the client if _Cache.users <= 0: - log.info(f'De-allocating resource for {key}') - - if _Cache.lock.locked(): - _Cache.lock.release() + log.info(f'De-allocating resource for {ctx_key}') # terminate mngr nursery - entry = _Cache.resources.get(ctx_key) - if entry: - _, no_more_users = entry - no_more_users.set() + _, no_more_users = _Cache.resources[ctx_key] + no_more_users.set() From f617da6ff11adbdd70978e1b2e41dbb00dca23c0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 15 Dec 2021 14:22:26 -0500 Subject: [PATCH 07/15] Add timeout around test and prints for guidance --- tests/test_resource_cache.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/tests/test_resource_cache.py b/tests/test_resource_cache.py index 48ca24e..f7bf1eb 100644 --- a/tests/test_resource_cache.py +++ b/tests/test_resource_cache.py @@ -21,6 +21,8 @@ async def streamer( await stream.send(val) await trio.sleep(0.001) + print('producer finished') + @acm async def open_stream() -> Awaitable[tractor.MsgStream]: @@ -32,18 +34,16 @@ async def open_stream() -> Awaitable[tractor.MsgStream]: ctx.open_stream() as stream, ): yield stream - breakpoint() - print('CANCELLING STREAMER') await portal.cancel_actor() + print('CANCELLED STREAMER') @acm async def maybe_open_stream(taskname: str): async with tractor.trionics.maybe_open_context( # NOTE: all secondary tasks should cache hit on the same key - key='stream', - mngr=open_stream(), + acm_func=open_stream, ) as (cache_hit, stream): if cache_hit: @@ -86,19 +86,21 @@ def test_open_local_sub_to_stream(): seq = [] async for msg in stream: seq.append(msg) - print(f'{taskname} received {msg}') assert set(seq).issubset(set(full)) - print(f'{taskname} finished') + print(f'{taskname} finished') # TODO: turns out this isn't multi-task entrant XD # We probably need an indepotent entry semantic? - async with tractor.open_root_actor(): - async with ( - trio.open_nursery() as nurse, - ): - for i in range(10): - nurse.start_soon(get_sub_and_pull, f'task_{i}') - await trio.sleep(0.001) + with trio.fail_after(3): + async with tractor.open_root_actor(): + async with ( + trio.open_nursery() as nurse, + ): + for i in range(10): + nurse.start_soon(get_sub_and_pull, f'task_{i}') + await trio.sleep(0.001) + + print('all consumer tasks finished') trio.run(main) From 213447008b90375e415125d472f7050d83cd6665 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 15 Dec 2021 14:27:01 -0500 Subject: [PATCH 08/15] Add draft code for waiting on all nurseries in root --- tractor/_root.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tractor/_root.py b/tractor/_root.py index fc727d4..3468a25 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -215,6 +215,14 @@ async def open_root_actor( raise finally: + # NOTE: not sure if we'll ever need this but it's + # possibly better for even more determinism? + # logger.cancel(f'Waiting on {len(nurseries)} nurseries in root..') + # nurseries = actor._actoruid2nursery.values() + # async with trio.open_nursery() as tempn: + # for an in nurseries: + # tempn.start_soon(an.exited.wait) + logger.cancel("Shutting down root actor") await actor.cancel() finally: From 11e64426f680b9239cadf96dbf553664e0cb344a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 15 Dec 2021 16:20:57 -0500 Subject: [PATCH 09/15] Wake all sleeping consumers on bcaster closure --- tractor/trionics/_broadcast.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tractor/trionics/_broadcast.py b/tractor/trionics/_broadcast.py index 35711b2..77ab6d0 100644 --- a/tractor/trionics/_broadcast.py +++ b/tractor/trionics/_broadcast.py @@ -331,10 +331,16 @@ class BroadcastReceiver(ReceiveChannel): if self._closed: return + # if there are sleeping consumers wake + # them on closure. + rr = self._state.recv_ready + if rr: + _, event = rr + event.set() + # XXX: leaving it like this consumers can still get values # up to the last received that still reside in the queue. self._state.subs.pop(self.key) - self._closed = True From 26394dd8df93928f961b609c8a19a258610b6557 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 15 Dec 2021 17:21:41 -0500 Subject: [PATCH 10/15] Type annot fixes --- tractor/trionics/_mngrs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 757af61..c9f596d 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -123,7 +123,7 @@ class _Cache: users: int = 0 values: dict[Any, Any] = {} resources: dict[ - int, + Hashable, tuple[trio.Nursery, trio.Event] ] = {} no_more_users: Optional[trio.Event] = None @@ -155,7 +155,7 @@ async def maybe_open_context( # XXX: used as cache key after conversion to tuple # and all embedded values must also be hashable - kwargs: Optional[dict] = {}, + kwargs: dict = {}, key: Hashable = None, ) -> AsyncIterator[tuple[bool, T]]: From 9b1d8bf7b07c8c29d099ba34b164b1873f7eb8c5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 15 Dec 2021 17:50:47 -0500 Subject: [PATCH 11/15] Of course, increase the timeout for windows.. --- tests/test_resource_cache.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/test_resource_cache.py b/tests/test_resource_cache.py index f7bf1eb..ae8ff7e 100644 --- a/tests/test_resource_cache.py +++ b/tests/test_resource_cache.py @@ -3,6 +3,7 @@ Async context manager cache api testing: ``trionics.maybe_open_context():`` ''' from contextlib import asynccontextmanager as acm +import platform from typing import Awaitable import trio @@ -65,6 +66,8 @@ def test_open_local_sub_to_stream(): N local tasks using ``trionics.maybe_open_context():``. ''' + timeout = 3 if platform.system() != "Windows" else 10 + async def main(): full = list(range(1000)) @@ -92,7 +95,7 @@ def test_open_local_sub_to_stream(): # TODO: turns out this isn't multi-task entrant XD # We probably need an indepotent entry semantic? - with trio.fail_after(3): + with trio.fail_after(timeout): async with tractor.open_root_actor(): async with ( trio.open_nursery() as nurse, From 67dc0d014c21c07e199a509bccb9b028e01ddee7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 16 Dec 2021 10:13:30 -0500 Subject: [PATCH 12/15] Add basic `maybe_open_context()` caching test --- tests/test_resource_cache.py | 55 ++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/tests/test_resource_cache.py b/tests/test_resource_cache.py index ae8ff7e..4122c24 100644 --- a/tests/test_resource_cache.py +++ b/tests/test_resource_cache.py @@ -10,6 +10,61 @@ import trio import tractor +_resource: int = 0 + + +@acm +async def maybe_increment_counter(task_name: str): + global _resource + + _resource += 1 + await trio.lowlevel.checkpoint() + yield _resource + await trio.lowlevel.checkpoint() + _resource -= 1 + + +def test_resource_only_entered_once(): + + async def main(): + global _resource + cache_active: bool = False + + async def enter_cached_mngr(name: str): + nonlocal cache_active + + async with tractor.trionics.maybe_open_context( + maybe_increment_counter, + kwargs={'task_name': name}, + key='same_key' + + ) as (cache_hit, resource): + if cache_hit: + try: + cache_active = True + assert resource == 1 + await trio.sleep_forever() + finally: + cache_active = False + else: + assert resource == 1 + await trio.sleep_forever() + + with trio.move_on_after(0.5): + # TODO: turns out this isn't multi-task entrant XD + # We probably need an indepotent entry semantic? + async with ( + tractor.open_root_actor(), + trio.open_nursery() as n, + ): + + for i in range(10): + n.start_soon(enter_cached_mngr, f'task_{i}') + await trio.sleep(0.001) + + trio.run(main) + + @tractor.context async def streamer( ctx: tractor.Context, From 21a9c474968840d8df567876980f067cc3a594a2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 16 Dec 2021 10:40:47 -0500 Subject: [PATCH 13/15] Parameterize over cache keying methods: kwargs and "key" --- tests/test_resource_cache.py | 34 ++++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/tests/test_resource_cache.py b/tests/test_resource_cache.py index 4122c24..0e7ad74 100644 --- a/tests/test_resource_cache.py +++ b/tests/test_resource_cache.py @@ -6,6 +6,7 @@ from contextlib import asynccontextmanager as acm import platform from typing import Awaitable +import pytest import trio import tractor @@ -24,19 +25,38 @@ async def maybe_increment_counter(task_name: str): _resource -= 1 -def test_resource_only_entered_once(): +@pytest.mark.parametrize( + 'key_on', + ['key_value', 'kwargs'], + ids="key_on={}".format, +) +def test_resource_only_entered_once(key_on): + global _resource + _resource = 0 + + kwargs = {} + key = None + if key_on == 'key_value': + key = 'some_common_key' async def main(): - global _resource cache_active: bool = False async def enter_cached_mngr(name: str): nonlocal cache_active + if key_on == 'kwargs': + # make a common kwargs input to key on it + kwargs = {'task_name': 'same_task_name'} + assert key is None + else: + # different task names per task will be used + kwargs = {'task_name': name} + async with tractor.trionics.maybe_open_context( maybe_increment_counter, - kwargs={'task_name': name}, - key='same_key' + kwargs=kwargs, + key=key, ) as (cache_hit, resource): if cache_hit: @@ -51,8 +71,6 @@ def test_resource_only_entered_once(): await trio.sleep_forever() with trio.move_on_after(0.5): - # TODO: turns out this isn't multi-task entrant XD - # We probably need an indepotent entry semantic? async with ( tractor.open_root_actor(), trio.open_nursery() as n, @@ -148,9 +166,9 @@ def test_open_local_sub_to_stream(): assert set(seq).issubset(set(full)) print(f'{taskname} finished') - # TODO: turns out this isn't multi-task entrant XD - # We probably need an indepotent entry semantic? with trio.fail_after(timeout): + # TODO: turns out this isn't multi-task entrant XD + # We probably need an indepotent entry semantic? async with tractor.open_root_actor(): async with ( trio.open_nursery() as nurse, From da5e36bf0c383ddd92ba2b9fcdf8fc78ff81031a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 16 Dec 2021 11:00:57 -0500 Subject: [PATCH 14/15] Revert back to avoiding key errors on cancellation --- tractor/trionics/_mngrs.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index c9f596d..3834983 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -170,6 +170,7 @@ async def maybe_open_context( await _Cache.lock.acquire() ctx_key = (id(acm_func), key or tuple(kwargs.items())) + print(ctx_key) value = None try: @@ -214,6 +215,10 @@ async def maybe_open_context( if _Cache.users <= 0: log.info(f'De-allocating resource for {ctx_key}') - # terminate mngr nursery - _, no_more_users = _Cache.resources[ctx_key] - no_more_users.set() + # XXX: if we're cancelled we the entry may have never + # been entered since the nursery task was killed. + # _, no_more_users = _Cache.resources[ctx_key] + entry = _Cache.resources.get(ctx_key) + if entry: + _, no_more_users = entry + no_more_users.set() From 953d15b67dbacd9915518db76b25427ad9020fd5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 16 Dec 2021 11:46:11 -0500 Subject: [PATCH 15/15] Add nooz --- newsfragments/257.feature.rst | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 newsfragments/257.feature.rst diff --git a/newsfragments/257.feature.rst b/newsfragments/257.feature.rst new file mode 100644 index 0000000..1183632 --- /dev/null +++ b/newsfragments/257.feature.rst @@ -0,0 +1,9 @@ +Add ``trionics.maybe_open_context()`` an actor-scoped async multi-task +context manager resource caching API. + +Adds an SC-safe cacheing async context manager api that only enters on +the *first* task entry and only exits on the *last* task exit while in +between delivering the same cached value per input key. Keys can be +either an explicit ``key`` named arg provided by the user or a +hashable ``kwargs`` dict (will be converted to a ``list[tuple]``) which +is passed to the underlying manager function as input.