From 340ddba4aeceff4c443e7090443f0e7cee278d18 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Oct 2021 10:44:27 -0400 Subject: [PATCH 01/28] Rename the nursery module to `_supervise` --- tractor/{_trionics.py => _supervise.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tractor/{_trionics.py => _supervise.py} (100%) diff --git a/tractor/_trionics.py b/tractor/_supervise.py similarity index 100% rename from tractor/_trionics.py rename to tractor/_supervise.py From 680a84128232b159cde7fc6272af82a45233021f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Oct 2021 11:02:51 -0400 Subject: [PATCH 02/28] Start `trionics` sub-pkg with `async_enter_all()` Since it seems we're building out more and more higher level primitives in order to support certain parallel style actor trees and messaging patterns (eg. task broadcast channels), we might as well start a new sub-package for purely `trio` constructions. We hereby dub this the realm of `trionics` (like electronics but for trios instead of electrons). To kick things off, add an `async_enter_all()` concurrent exit-stack-like context manager API which will concurrently spawn a sequence of provided async context managers and deliver their ordered results but with proper support for `trio` cancellation semantics. The stdlib's `AsyncExitStack` is not compatible with nurseries not `trio` tasks (which are cancelled) since as task will be suspended on the stack after push and does not ever hit a checkpoint until the stack is closed. --- tractor/trionics/__init__.py | 10 ++++++ tractor/trionics/_mngrs.py | 64 ++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) create mode 100644 tractor/trionics/__init__.py create mode 100644 tractor/trionics/_mngrs.py diff --git a/tractor/trionics/__init__.py b/tractor/trionics/__init__.py new file mode 100644 index 0000000..620d25a --- /dev/null +++ b/tractor/trionics/__init__.py @@ -0,0 +1,10 @@ +''' +Sugary patterns for trio + tractor designs. + +''' +from ._mngrs import async_enter_all + + +__all__ = [ + 'async_enter_all', +] diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py new file mode 100644 index 0000000..4e9a86a --- /dev/null +++ b/tractor/trionics/_mngrs.py @@ -0,0 +1,64 @@ +''' +Async context manager primitives with hard ``trio``-aware semantics + +''' +from typing import AsyncContextManager +from typing import TypeVar +from contextlib import asynccontextmanager as acm + +import trio + + +# A regular invariant generic type +T = TypeVar("T") + + +async def _enter_and_sleep( + + mngr: AsyncContextManager[T], + to_yield: dict[int, T], + all_entered: trio.Event, + # task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED, + +) -> T: + '''Open the async context manager deliver it's value + to this task's spawner and sleep until cancelled. + + ''' + async with mngr as value: + to_yield[id(mngr)] = value + + if all(to_yield.values()): + all_entered.set() + + # sleep until cancelled + await trio.sleep_forever() + + +@acm +async def async_enter_all( + + *mngrs: list[AsyncContextManager[T]], + +) -> tuple[T]: + + to_yield = {}.fromkeys(id(mngr) for mngr in mngrs) + + all_entered = trio.Event() + + async with trio.open_nursery() as n: + for mngr in mngrs: + n.start_soon( + _enter_and_sleep, + mngr, + to_yield, + all_entered, + ) + + # deliver control once all managers have started up + await all_entered.wait() + yield tuple(to_yield.values()) + + # tear down all sleeper tasks thus triggering individual + # mngr ``__aexit__()``s. + n.cancel_scope.cancel() From 4114eb1d25141ab41521da9b5fe31b2f6a7b5b90 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Oct 2021 11:22:10 -0400 Subject: [PATCH 03/28] Move broadcast channel parts into trionics --- tests/test_task_broadcasting.py | 3 +-- tractor/_streaming.py | 2 +- tractor/trionics/__init__.py | 3 +++ tractor/{ => trionics}/_broadcast.py | 0 4 files changed, 5 insertions(+), 3 deletions(-) rename tractor/{ => trionics}/_broadcast.py (100%) diff --git a/tests/test_task_broadcasting.py b/tests/test_task_broadcasting.py index f32d209..b18a40e 100644 --- a/tests/test_task_broadcasting.py +++ b/tests/test_task_broadcasting.py @@ -12,7 +12,7 @@ import pytest import trio from trio.lowlevel import current_task import tractor -from tractor._broadcast import broadcast_receiver, Lagged +from tractor.trionics import broadcast_receiver, Lagged @tractor.context @@ -432,7 +432,6 @@ def test_first_recver_is_cancelled(): tx, rx = trio.open_memory_channel(1) brx = broadcast_receiver(rx, 1) cs = trio.CancelScope() - sequence = list(range(3)) async def sub_and_recv(): with cs: diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 6ee264c..5c22116 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -19,8 +19,8 @@ import trio from ._ipc import Channel from ._exceptions import unpack_error, ContextCancelled from ._state import current_actor -from ._broadcast import broadcast_receiver, BroadcastReceiver from .log import get_logger +from .trionics import broadcast_receiver, BroadcastReceiver log = get_logger(__name__) diff --git a/tractor/trionics/__init__.py b/tractor/trionics/__init__.py index 620d25a..a5b2c87 100644 --- a/tractor/trionics/__init__.py +++ b/tractor/trionics/__init__.py @@ -3,8 +3,11 @@ Sugary patterns for trio + tractor designs. ''' from ._mngrs import async_enter_all +from ._broadcast import broadcast_receiver, BroadcastReceiver __all__ = [ 'async_enter_all', + 'broadcast_receiver', + 'BroadcastReceiver', ] diff --git a/tractor/_broadcast.py b/tractor/trionics/_broadcast.py similarity index 100% rename from tractor/_broadcast.py rename to tractor/trionics/_broadcast.py From 1e917fdb1dbcc29d3b2340506f853830c7c73d4f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Oct 2021 12:02:21 -0400 Subject: [PATCH 04/28] Add an async actor cluster spawner prototype --- tractor/_clustering.py | 49 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 tractor/_clustering.py diff --git a/tractor/_clustering.py b/tractor/_clustering.py new file mode 100644 index 0000000..6e3f05d --- /dev/null +++ b/tractor/_clustering.py @@ -0,0 +1,49 @@ +''' +Actor cluster helpers. + +''' +from contextlib import asynccontextmanager as acm +from multiprocessing import cpu_count +from typing import AsyncGenerator, Optional + +import trio +import tractor + + +@acm +async def open_actor_cluster( + + modules: list[str], + count: int = cpu_count(), + names: Optional[list[str]] = None, + +) -> AsyncGenerator[..., dict[str, tractor.Portal]]: + + portals: dict[str, tractor.Portal] = {} + uid = tractor.current_actor().uid + + if not names: + suffix = '_'.join(uid) + names = [f'worker_{i}.' + suffix for i in range(count)] + + if not len(names) == count: + raise ValueError( + 'Number of names is {len(names)} but count it {count}') + + async with ( + tractor.open_nursery() as an, + trio.open_nursery() as n, + ): + for index, key in zip(range(count), names): + + async def start(i) -> None: + key = f'worker_{i}.' + '_'.join(uid) + portals[key] = await an.start_actor( + enable_modules=modules, + name=key, + ) + + n.start_soon(start, index) + + assert len(portals) == count + yield portals From 79fb1d0ebc02528c7003c1b6094913d4eafed281 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Oct 2021 12:39:52 -0400 Subject: [PATCH 05/28] Fix top level nursery import --- tractor/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index 394693f..795feb7 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -13,7 +13,7 @@ from ._streaming import ( context, ) from ._discovery import get_arbiter, find_actor, wait_for_actor -from ._trionics import open_nursery +from ._supervise import open_nursery from ._state import current_actor, is_root_process from ._exceptions import ( RemoteActorError, From 97006c904ca10edd849297686bb2521a46f189c1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Oct 2021 15:08:48 -0400 Subject: [PATCH 06/28] Expose `Lagged` for broadcasting --- tractor/trionics/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tractor/trionics/__init__.py b/tractor/trionics/__init__.py index a5b2c87..f7e90c6 100644 --- a/tractor/trionics/__init__.py +++ b/tractor/trionics/__init__.py @@ -3,11 +3,12 @@ Sugary patterns for trio + tractor designs. ''' from ._mngrs import async_enter_all -from ._broadcast import broadcast_receiver, BroadcastReceiver +from ._broadcast import broadcast_receiver, BroadcastReceiver, Lagged __all__ = [ 'async_enter_all', 'broadcast_receiver', 'BroadcastReceiver', + 'Lagged', ] From 8ba10315c1ab4e6b72236a459ae46e484c9b8e40 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Oct 2021 15:09:07 -0400 Subject: [PATCH 07/28] Fix type path to new `_supervise` mod --- examples/debugging/fast_error_in_root_after_spawn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/debugging/fast_error_in_root_after_spawn.py b/examples/debugging/fast_error_in_root_after_spawn.py index 044815b..570cf7e 100644 --- a/examples/debugging/fast_error_in_root_after_spawn.py +++ b/examples/debugging/fast_error_in_root_after_spawn.py @@ -20,7 +20,7 @@ async def sleep( async def open_ctx( - n: tractor._trionics.ActorNursery + n: tractor._supervise.ActorNursery ): # spawn both actors From 9ddd75733c0befbb61997a8cf129d74a786be4e8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Oct 2021 16:01:09 -0400 Subject: [PATCH 08/28] Lul, fix everything for cluster helper --- tractor/_clustering.py | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/tractor/_clustering.py b/tractor/_clustering.py index 6e3f05d..829caff 100644 --- a/tractor/_clustering.py +++ b/tractor/_clustering.py @@ -17,7 +17,10 @@ async def open_actor_cluster( count: int = cpu_count(), names: Optional[list[str]] = None, -) -> AsyncGenerator[..., dict[str, tractor.Portal]]: +) -> AsyncGenerator[ + list[str], + dict[str, tractor.Portal] +]: portals: dict[str, tractor.Portal] = {} uid = tractor.current_actor().uid @@ -30,20 +33,18 @@ async def open_actor_cluster( raise ValueError( 'Number of names is {len(names)} but count it {count}') - async with ( - tractor.open_nursery() as an, - trio.open_nursery() as n, - ): - for index, key in zip(range(count), names): + async with tractor.open_nursery() as an: + async with trio.open_nursery() as n: + for index, key in zip(range(count), names): - async def start(i) -> None: - key = f'worker_{i}.' + '_'.join(uid) - portals[key] = await an.start_actor( - enable_modules=modules, - name=key, - ) + async def start(i) -> None: + key = f'worker_{i}.' + '_'.join(uid) + portals[key] = await an.start_actor( + enable_modules=modules, + name=key, + ) - n.start_soon(start, index) + n.start_soon(start, index) assert len(portals) == count yield portals From c372367cc2848a3d904e4d616167ed6441556199 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 5 Oct 2021 19:50:09 -0400 Subject: [PATCH 09/28] Fix *args-like type annot --- tractor/trionics/_mngrs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 4e9a86a..09d4b14 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -38,7 +38,7 @@ async def _enter_and_sleep( @acm async def async_enter_all( - *mngrs: list[AsyncContextManager[T]], + *mngrs: tuple[AsyncContextManager[T]], ) -> tuple[T]: From 76767a3d7ee990e59a4f394f46f6a78f23137ebe Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Sat, 16 Oct 2021 17:30:44 +0200 Subject: [PATCH 10/28] Add 'trio.trionics' to setup.py --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index d233ecd..4f7431c 100755 --- a/setup.py +++ b/setup.py @@ -35,6 +35,7 @@ setup( platforms=['linux', 'windows'], packages=[ 'tractor', + 'tractor.trionics', 'tractor.testing', ], install_requires=[ From 7d502cef74d51dcef54b038eb12d27c0370c6508 Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Sat, 16 Oct 2021 17:31:26 +0200 Subject: [PATCH 11/28] Add 'open_actor_cluster' to __all__ --- tractor/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tractor/__init__.py b/tractor/__init__.py index 795feb7..602e7bf 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -4,6 +4,7 @@ tractor: An actor model micro-framework built on """ from trio import MultiError +from ._clustering import open_actor_cluster from ._ipc import Channel from ._streaming import ( Context, @@ -39,6 +40,7 @@ __all__ = [ 'get_arbiter', 'is_root_process', 'msg', + 'open_actor_cluster', 'open_nursery', 'open_root_actor', 'Portal', From 21afc69ac7124784ae8585a296ad6eb30d0beb31 Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Sat, 16 Oct 2021 17:33:39 +0200 Subject: [PATCH 12/28] Postpone evaluation of annotations --- tractor/_clustering.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tractor/_clustering.py b/tractor/_clustering.py index 829caff..8142373 100644 --- a/tractor/_clustering.py +++ b/tractor/_clustering.py @@ -2,6 +2,8 @@ Actor cluster helpers. ''' +from __future__ import annotations + from contextlib import asynccontextmanager as acm from multiprocessing import cpu_count from typing import AsyncGenerator, Optional From 2815f1c343783ffaeddc06ca9b9e0e947e88163f Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Sat, 16 Oct 2021 17:34:59 +0200 Subject: [PATCH 13/28] Make 'async_enter_all' take a teardown trigger which '_enter_and_wait' will wait on --- tractor/trionics/_mngrs.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 09d4b14..878bc35 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -13,12 +13,12 @@ import trio T = TypeVar("T") -async def _enter_and_sleep( +async def _enter_and_wait( mngr: AsyncContextManager[T], to_yield: dict[int, T], all_entered: trio.Event, - # task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED, + teardown_trigger: trio.Event, ) -> T: '''Open the async context manager deliver it's value @@ -31,14 +31,14 @@ async def _enter_and_sleep( if all(to_yield.values()): all_entered.set() - # sleep until cancelled - await trio.sleep_forever() + await teardown_trigger.wait() @acm async def async_enter_all( *mngrs: tuple[AsyncContextManager[T]], + teardown_trigger: trio.Event, ) -> tuple[T]: @@ -49,16 +49,13 @@ async def async_enter_all( async with trio.open_nursery() as n: for mngr in mngrs: n.start_soon( - _enter_and_sleep, + _enter_and_wait, mngr, to_yield, all_entered, + teardown_trigger, ) # deliver control once all managers have started up await all_entered.wait() yield tuple(to_yield.values()) - - # tear down all sleeper tasks thus triggering individual - # mngr ``__aexit__()``s. - n.cancel_scope.cancel() From 73cbb2388a7638688cf32dd0548d83cfea0bc89c Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Sat, 16 Oct 2021 17:39:36 +0200 Subject: [PATCH 14/28] Avoid RuntimeError by not using current_actor's uid --- tractor/_clustering.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tractor/_clustering.py b/tractor/_clustering.py index 8142373..37a03bb 100644 --- a/tractor/_clustering.py +++ b/tractor/_clustering.py @@ -25,7 +25,8 @@ async def open_actor_cluster( ]: portals: dict[str, tractor.Portal] = {} - uid = tractor.current_actor().uid + uid = __import__('random').randint(0, 2 ** 16) + # uid = tractor.current_actor().uid if not names: suffix = '_'.join(uid) From 6e6baf250b471ec8c158e77ad07e2b6b91083881 Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Sat, 16 Oct 2021 17:50:36 +0200 Subject: [PATCH 15/28] Make sure the ID is a str --- tractor/_clustering.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/_clustering.py b/tractor/_clustering.py index 37a03bb..8fabc00 100644 --- a/tractor/_clustering.py +++ b/tractor/_clustering.py @@ -25,7 +25,7 @@ async def open_actor_cluster( ]: portals: dict[str, tractor.Portal] = {} - uid = __import__('random').randint(0, 2 ** 16) + uid = str(__import__('random').randint(0, 2 ** 16)) # uid = tractor.current_actor().uid if not names: From 6f9229cd09b6cf73b3e023bd704b71cb34543dc1 Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Sat, 16 Oct 2021 17:50:55 +0200 Subject: [PATCH 16/28] Cancel nursery --- tractor/_clustering.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tractor/_clustering.py b/tractor/_clustering.py index 8fabc00..d7255fd 100644 --- a/tractor/_clustering.py +++ b/tractor/_clustering.py @@ -51,3 +51,5 @@ async def open_actor_cluster( assert len(portals) == count yield portals + + await an.cancel() From 3130a04c6109121729914307c2079cf670ae1104 Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Sun, 17 Oct 2021 07:33:37 +0200 Subject: [PATCH 17/28] Rename a variable and fix type annotations --- tractor/trionics/_mngrs.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 878bc35..bfc715b 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -2,8 +2,8 @@ Async context manager primitives with hard ``trio``-aware semantics ''' -from typing import AsyncContextManager -from typing import TypeVar +from typing import AsyncContextManager, AsyncGenerator +from typing import TypeVar, Sequence from contextlib import asynccontextmanager as acm import trio @@ -14,21 +14,19 @@ T = TypeVar("T") async def _enter_and_wait( - mngr: AsyncContextManager[T], - to_yield: dict[int, T], + unwrapped: dict[int, T], all_entered: trio.Event, teardown_trigger: trio.Event, - -) -> T: +) -> None: '''Open the async context manager deliver it's value to this task's spawner and sleep until cancelled. ''' async with mngr as value: - to_yield[id(mngr)] = value + unwrapped[id(mngr)] = value - if all(to_yield.values()): + if all(unwrapped.values()): all_entered.set() await teardown_trigger.wait() @@ -36,13 +34,14 @@ async def _enter_and_wait( @acm async def async_enter_all( - - *mngrs: tuple[AsyncContextManager[T]], + mngrs: Sequence[AsyncContextManager[T]], teardown_trigger: trio.Event, - -) -> tuple[T]: - - to_yield = {}.fromkeys(id(mngr) for mngr in mngrs) +) -> AsyncGenerator[tuple[T, ...], None]: + """This async context manager expects a 'teardown_trigger' from the + outside world which will be used internally to gracefully teardown + individual context managers. + """ + unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs) all_entered = trio.Event() @@ -51,11 +50,12 @@ async def async_enter_all( n.start_soon( _enter_and_wait, mngr, - to_yield, + unwrapped, all_entered, teardown_trigger, ) # deliver control once all managers have started up await all_entered.wait() - yield tuple(to_yield.values()) + + yield tuple(unwrapped.values()) From c1089dbd95ac8b27214f2cde04d06a85794c6b07 Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Sun, 17 Oct 2021 07:33:54 +0200 Subject: [PATCH 18/28] Add a clustering test --- tests/test_clustering.py | 41 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 tests/test_clustering.py diff --git a/tests/test_clustering.py b/tests/test_clustering.py new file mode 100644 index 0000000..8f28d85 --- /dev/null +++ b/tests/test_clustering.py @@ -0,0 +1,41 @@ +import itertools + +import trio +import tractor +from tractor import open_actor_cluster +from tractor.trionics import async_enter_all + +from conftest import tractor_test + + +MESSAGE = 'tractoring at full speed' + + +@tractor.context +async def worker(ctx: tractor.Context) -> None: + await ctx.started() + async with ctx.open_stream() as stream: + async for msg in stream: + # do something with msg + print(msg) + assert msg == MESSAGE + + +@tractor_test +async def test_streaming_to_actor_cluster() -> None: + teardown_trigger = trio.Event() + async with ( + open_actor_cluster(modules=[__name__]) as portals, + async_enter_all( + mngrs=[p.open_context(worker) for p in portals.values()], + teardown_trigger=teardown_trigger, + ) as contexts, + async_enter_all( + mngrs=[ctx[0].open_stream() for ctx in contexts], + teardown_trigger=teardown_trigger, + ) as streams, + ): + with trio.move_on_after(1): + for stream in itertools.cycle(streams): + await stream.send(MESSAGE) + teardown_trigger.set() \ No newline at end of file From b7a464167424bf86dec9a4d812d88a05e2d70510 Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Sun, 17 Oct 2021 14:40:09 +0200 Subject: [PATCH 19/28] Allow specifying start_method and hard_kill --- tractor/_clustering.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tractor/_clustering.py b/tractor/_clustering.py index d7255fd..2a68482 100644 --- a/tractor/_clustering.py +++ b/tractor/_clustering.py @@ -18,6 +18,8 @@ async def open_actor_cluster( modules: list[str], count: int = cpu_count(), names: Optional[list[str]] = None, + start_method: Optional[str] = None, + hard_kill: bool = False, ) -> AsyncGenerator[ list[str], @@ -36,7 +38,7 @@ async def open_actor_cluster( raise ValueError( 'Number of names is {len(names)} but count it {count}') - async with tractor.open_nursery() as an: + async with tractor.open_nursery(start_method=start_method) as an: async with trio.open_nursery() as n: for index, key in zip(range(count), names): @@ -52,4 +54,4 @@ async def open_actor_cluster( assert len(portals) == count yield portals - await an.cancel() + await an.cancel(hard_kill=hard_kill) From 04895b9d5edc69dbc72223620361eba21e901bbc Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Fri, 22 Oct 2021 02:59:15 +0200 Subject: [PATCH 20/28] Get rid of dumb random uid and use current actor's uid --- tractor/_clustering.py | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/tractor/_clustering.py b/tractor/_clustering.py index 2a68482..410459b 100644 --- a/tractor/_clustering.py +++ b/tractor/_clustering.py @@ -14,25 +14,20 @@ import tractor @acm async def open_actor_cluster( - modules: list[str], count: int = cpu_count(), names: Optional[list[str]] = None, start_method: Optional[str] = None, hard_kill: bool = False, - ) -> AsyncGenerator[ list[str], dict[str, tractor.Portal] ]: portals: dict[str, tractor.Portal] = {} - uid = str(__import__('random').randint(0, 2 ** 16)) - # uid = tractor.current_actor().uid if not names: - suffix = '_'.join(uid) - names = [f'worker_{i}.' + suffix for i in range(count)] + names = [f'worker_{i}' for i in range(count)] if not len(names) == count: raise ValueError( @@ -40,16 +35,17 @@ async def open_actor_cluster( async with tractor.open_nursery(start_method=start_method) as an: async with trio.open_nursery() as n: - for index, key in zip(range(count), names): + uid = tractor.current_actor().uid - async def start(i) -> None: - key = f'worker_{i}.' + '_'.join(uid) - portals[key] = await an.start_actor( - enable_modules=modules, - name=key, - ) + async def _start(name: str) -> None: + name = f'{name}.{uid}' + portals[name] = await an.start_actor( + enable_modules=modules, + name=name, + ) - n.start_soon(start, index) + for name in names: + n.start_soon(_start, name) assert len(portals) == count yield portals From 87e3d32992680e57fe862d5fa115248e0751d08a Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Fri, 22 Oct 2021 03:03:41 +0200 Subject: [PATCH 21/28] Get rid of external teardown trigger because #245 resolves the problem --- tractor/trionics/_mngrs.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index bfc715b..ca75c55 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -17,7 +17,6 @@ async def _enter_and_wait( mngr: AsyncContextManager[T], unwrapped: dict[int, T], all_entered: trio.Event, - teardown_trigger: trio.Event, ) -> None: '''Open the async context manager deliver it's value to this task's spawner and sleep until cancelled. @@ -29,18 +28,13 @@ async def _enter_and_wait( if all(unwrapped.values()): all_entered.set() - await teardown_trigger.wait() + await trio.sleep_forever() @acm async def async_enter_all( mngrs: Sequence[AsyncContextManager[T]], - teardown_trigger: trio.Event, ) -> AsyncGenerator[tuple[T, ...], None]: - """This async context manager expects a 'teardown_trigger' from the - outside world which will be used internally to gracefully teardown - individual context managers. - """ unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs) all_entered = trio.Event() @@ -52,10 +46,11 @@ async def async_enter_all( mngr, unwrapped, all_entered, - teardown_trigger, ) # deliver control once all managers have started up await all_entered.wait() yield tuple(unwrapped.values()) + + n.cancel_scope.cancel() \ No newline at end of file From b91adcf38dca47b1642a8890869c7082f636f55d Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Fri, 22 Oct 2021 03:10:46 +0200 Subject: [PATCH 22/28] Get rid of external teardown trigger --- tests/test_clustering.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/test_clustering.py b/tests/test_clustering.py index 8f28d85..1e85d04 100644 --- a/tests/test_clustering.py +++ b/tests/test_clustering.py @@ -23,19 +23,15 @@ async def worker(ctx: tractor.Context) -> None: @tractor_test async def test_streaming_to_actor_cluster() -> None: - teardown_trigger = trio.Event() async with ( open_actor_cluster(modules=[__name__]) as portals, async_enter_all( mngrs=[p.open_context(worker) for p in portals.values()], - teardown_trigger=teardown_trigger, ) as contexts, async_enter_all( mngrs=[ctx[0].open_stream() for ctx in contexts], - teardown_trigger=teardown_trigger, ) as streams, ): with trio.move_on_after(1): for stream in itertools.cycle(streams): await stream.send(MESSAGE) - teardown_trigger.set() \ No newline at end of file From 50400359b8566b489ebb2350c5a27fa28c3eba7a Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Sun, 24 Oct 2021 00:47:26 +0200 Subject: [PATCH 23/28] Fix type annotations --- tractor/_clustering.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tractor/_clustering.py b/tractor/_clustering.py index 410459b..6d7e7c8 100644 --- a/tractor/_clustering.py +++ b/tractor/_clustering.py @@ -20,8 +20,8 @@ async def open_actor_cluster( start_method: Optional[str] = None, hard_kill: bool = False, ) -> AsyncGenerator[ - list[str], - dict[str, tractor.Portal] + dict[str, tractor.Portal], + None, ]: portals: dict[str, tractor.Portal] = {} From d0f5c7a5e2c9d8bf73aaf94b812e13f4812c9082 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 24 Oct 2021 13:48:36 -0400 Subject: [PATCH 24/28] Change to `gather_contexts()`, use event for graceful exit The api we've made here is actually closer to `asyncio.gather()` but with opening async context managers instead of funcs. Use another event to allow for graceful teardown of children on non-cancellation exits and add a doc string. --- tests/test_clustering.py | 6 +++--- tractor/trionics/__init__.py | 4 ++-- tractor/trionics/_mngrs.py | 30 ++++++++++++++++++++++++++---- 3 files changed, 31 insertions(+), 9 deletions(-) diff --git a/tests/test_clustering.py b/tests/test_clustering.py index 1e85d04..ba8052f 100644 --- a/tests/test_clustering.py +++ b/tests/test_clustering.py @@ -3,7 +3,7 @@ import itertools import trio import tractor from tractor import open_actor_cluster -from tractor.trionics import async_enter_all +from tractor.trionics import gather_contexts from conftest import tractor_test @@ -25,10 +25,10 @@ async def worker(ctx: tractor.Context) -> None: async def test_streaming_to_actor_cluster() -> None: async with ( open_actor_cluster(modules=[__name__]) as portals, - async_enter_all( + gather_contexts( mngrs=[p.open_context(worker) for p in portals.values()], ) as contexts, - async_enter_all( + gather_contexts( mngrs=[ctx[0].open_stream() for ctx in contexts], ) as streams, ): diff --git a/tractor/trionics/__init__.py b/tractor/trionics/__init__.py index f7e90c6..3d1b9bd 100644 --- a/tractor/trionics/__init__.py +++ b/tractor/trionics/__init__.py @@ -2,12 +2,12 @@ Sugary patterns for trio + tractor designs. ''' -from ._mngrs import async_enter_all +from ._mngrs import gather_contexts from ._broadcast import broadcast_receiver, BroadcastReceiver, Lagged __all__ = [ - 'async_enter_all', + 'gather_contexts', 'broadcast_receiver', 'BroadcastReceiver', 'Lagged', diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index ca75c55..d31f1e0 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -14,11 +14,15 @@ T = TypeVar("T") async def _enter_and_wait( + mngr: AsyncContextManager[T], unwrapped: dict[int, T], all_entered: trio.Event, + parent_exit: trio.Event, + ) -> None: - '''Open the async context manager deliver it's value + ''' + Open the async context manager deliver it's value to this task's spawner and sleep until cancelled. ''' @@ -28,16 +32,31 @@ async def _enter_and_wait( if all(unwrapped.values()): all_entered.set() - await trio.sleep_forever() + await parent_exit.wait() @acm -async def async_enter_all( +async def gather_contexts( + mngrs: Sequence[AsyncContextManager[T]], + ) -> AsyncGenerator[tuple[T, ...], None]: + ''' + Concurrently enter a sequence of async context managers, each in + a separate ``trio`` task and deliver the unwrapped values in the + same order once all managers have entered. On exit all contexts are + subsequently and concurrently exited. + + This function is somewhat similar to common usage of + ``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in + combo with ``asyncio.gather()`` except the managers are concurrently + entered and exited cancellation just works. + + ''' unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs) all_entered = trio.Event() + parent_exit = trio.Event() async with trio.open_nursery() as n: for mngr in mngrs: @@ -46,6 +65,7 @@ async def async_enter_all( mngr, unwrapped, all_entered, + parent_exit, ) # deliver control once all managers have started up @@ -53,4 +73,6 @@ async def async_enter_all( yield tuple(unwrapped.values()) - n.cancel_scope.cancel() \ No newline at end of file + # we don't need a try/finally since cancellation will be triggered + # by the surrounding nursery on error. + parent_exit.set() From 083b73ad4a15f86b1bc1966e417890db9e24c83f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 25 Oct 2021 10:22:41 -0400 Subject: [PATCH 25/28] Test: don't grab debug lock if not in mode --- tractor/_debug.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tractor/_debug.py b/tractor/_debug.py index 67485af..7aac4eb 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -558,6 +558,9 @@ async def acquire_debug_lock( Grab root's debug lock on entry, release on exit. ''' + if not debug_mode(): + return + async with trio.open_nursery() as n: cs = await n.start( wait_for_parent_stdin_hijack, From c7f59bd48375286ae8092d7a7c1efae5a5de2ab4 Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Mon, 25 Oct 2021 19:17:42 +0200 Subject: [PATCH 26/28] Add a news fragment --- newsfragments/241.feature | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 newsfragments/241.feature diff --git a/newsfragments/241.feature b/newsfragments/241.feature new file mode 100644 index 0000000..c5b3791 --- /dev/null +++ b/newsfragments/241.feature @@ -0,0 +1,4 @@ +Introduce a new sub-package that exposes all our high(er) level trio primitives and goodies, most importantly: +- A new `open_actor_cluster` procedure is available for concurrently spawning a number of actors. +- A new `gather_contexts` procedure is available for concurrently entering a sequence of async context managers. + From 49dd230b4fd64ebf22b1a7319efd2daab2c3454f Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Mon, 25 Oct 2021 20:01:21 +0200 Subject: [PATCH 27/28] Add a newline --- newsfragments/241.feature | 1 + 1 file changed, 1 insertion(+) diff --git a/newsfragments/241.feature b/newsfragments/241.feature index c5b3791..52e2d51 100644 --- a/newsfragments/241.feature +++ b/newsfragments/241.feature @@ -1,4 +1,5 @@ Introduce a new sub-package that exposes all our high(er) level trio primitives and goodies, most importantly: + - A new `open_actor_cluster` procedure is available for concurrently spawning a number of actors. - A new `gather_contexts` procedure is available for concurrently entering a sequence of async context managers. From 6da76949fd71b8c7a423af420beed8bc94768ae9 Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Wed, 27 Oct 2021 17:03:25 +0200 Subject: [PATCH 28/28] Fix the syntax and point to the new package --- newsfragments/241.feature | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/newsfragments/241.feature b/newsfragments/241.feature index 52e2d51..e2b4297 100644 --- a/newsfragments/241.feature +++ b/newsfragments/241.feature @@ -1,5 +1,6 @@ -Introduce a new sub-package that exposes all our high(er) level trio primitives and goodies, most importantly: +Introduce a new `sub-package`_ that exposes all our high(er) level trio primitives and goodies, most importantly: -- A new `open_actor_cluster` procedure is available for concurrently spawning a number of actors. -- A new `gather_contexts` procedure is available for concurrently entering a sequence of async context managers. +- A new ``open_actor_cluster`` procedure is available for concurrently spawning a number of actors. +- A new ``gather_contexts`` procedure is available for concurrently entering a sequence of async context managers. +.. _sub-package: ../tractor/trionics