From 76767a3d7ee990e59a4f394f46f6a78f23137ebe Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Sat, 16 Oct 2021 17:30:44 +0200 Subject: [PATCH 01/13] Add 'trio.trionics' to setup.py --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index d233ecd..4f7431c 100755 --- a/setup.py +++ b/setup.py @@ -35,6 +35,7 @@ setup( platforms=['linux', 'windows'], packages=[ 'tractor', + 'tractor.trionics', 'tractor.testing', ], install_requires=[ From 7d502cef74d51dcef54b038eb12d27c0370c6508 Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Sat, 16 Oct 2021 17:31:26 +0200 Subject: [PATCH 02/13] Add 'open_actor_cluster' to __all__ --- tractor/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tractor/__init__.py b/tractor/__init__.py index 795feb7..602e7bf 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -4,6 +4,7 @@ tractor: An actor model micro-framework built on """ from trio import MultiError +from ._clustering import open_actor_cluster from ._ipc import Channel from ._streaming import ( Context, @@ -39,6 +40,7 @@ __all__ = [ 'get_arbiter', 'is_root_process', 'msg', + 'open_actor_cluster', 'open_nursery', 'open_root_actor', 'Portal', From 21afc69ac7124784ae8585a296ad6eb30d0beb31 Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Sat, 16 Oct 2021 17:33:39 +0200 Subject: [PATCH 03/13] Postpone evaluation of annotations --- tractor/_clustering.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tractor/_clustering.py b/tractor/_clustering.py index 829caff..8142373 100644 --- a/tractor/_clustering.py +++ b/tractor/_clustering.py @@ -2,6 +2,8 @@ Actor cluster helpers. ''' +from __future__ import annotations + from contextlib import asynccontextmanager as acm from multiprocessing import cpu_count from typing import AsyncGenerator, Optional From 2815f1c343783ffaeddc06ca9b9e0e947e88163f Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Sat, 16 Oct 2021 17:34:59 +0200 Subject: [PATCH 04/13] Make 'async_enter_all' take a teardown trigger which '_enter_and_wait' will wait on --- tractor/trionics/_mngrs.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 09d4b14..878bc35 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -13,12 +13,12 @@ import trio T = TypeVar("T") -async def _enter_and_sleep( +async def _enter_and_wait( mngr: AsyncContextManager[T], to_yield: dict[int, T], all_entered: trio.Event, - # task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED, + teardown_trigger: trio.Event, ) -> T: '''Open the async context manager deliver it's value @@ -31,14 +31,14 @@ async def _enter_and_sleep( if all(to_yield.values()): all_entered.set() - # sleep until cancelled - await trio.sleep_forever() + await teardown_trigger.wait() @acm async def async_enter_all( *mngrs: tuple[AsyncContextManager[T]], + teardown_trigger: trio.Event, ) -> tuple[T]: @@ -49,16 +49,13 @@ async def async_enter_all( async with trio.open_nursery() as n: for mngr in mngrs: n.start_soon( - _enter_and_sleep, + _enter_and_wait, mngr, to_yield, all_entered, + teardown_trigger, ) # deliver control once all managers have started up await all_entered.wait() yield tuple(to_yield.values()) - - # tear down all sleeper tasks thus triggering individual - # mngr ``__aexit__()``s. - n.cancel_scope.cancel() From 73cbb2388a7638688cf32dd0548d83cfea0bc89c Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Sat, 16 Oct 2021 17:39:36 +0200 Subject: [PATCH 05/13] Avoid RuntimeError by not using current_actor's uid --- tractor/_clustering.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tractor/_clustering.py b/tractor/_clustering.py index 8142373..37a03bb 100644 --- a/tractor/_clustering.py +++ b/tractor/_clustering.py @@ -25,7 +25,8 @@ async def open_actor_cluster( ]: portals: dict[str, tractor.Portal] = {} - uid = tractor.current_actor().uid + uid = __import__('random').randint(0, 2 ** 16) + # uid = tractor.current_actor().uid if not names: suffix = '_'.join(uid) From 6e6baf250b471ec8c158e77ad07e2b6b91083881 Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Sat, 16 Oct 2021 17:50:36 +0200 Subject: [PATCH 06/13] Make sure the ID is a str --- tractor/_clustering.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/_clustering.py b/tractor/_clustering.py index 37a03bb..8fabc00 100644 --- a/tractor/_clustering.py +++ b/tractor/_clustering.py @@ -25,7 +25,7 @@ async def open_actor_cluster( ]: portals: dict[str, tractor.Portal] = {} - uid = __import__('random').randint(0, 2 ** 16) + uid = str(__import__('random').randint(0, 2 ** 16)) # uid = tractor.current_actor().uid if not names: From 6f9229cd09b6cf73b3e023bd704b71cb34543dc1 Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Sat, 16 Oct 2021 17:50:55 +0200 Subject: [PATCH 07/13] Cancel nursery --- tractor/_clustering.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tractor/_clustering.py b/tractor/_clustering.py index 8fabc00..d7255fd 100644 --- a/tractor/_clustering.py +++ b/tractor/_clustering.py @@ -51,3 +51,5 @@ async def open_actor_cluster( assert len(portals) == count yield portals + + await an.cancel() From 3130a04c6109121729914307c2079cf670ae1104 Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Sun, 17 Oct 2021 07:33:37 +0200 Subject: [PATCH 08/13] Rename a variable and fix type annotations --- tractor/trionics/_mngrs.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 878bc35..bfc715b 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -2,8 +2,8 @@ Async context manager primitives with hard ``trio``-aware semantics ''' -from typing import AsyncContextManager -from typing import TypeVar +from typing import AsyncContextManager, AsyncGenerator +from typing import TypeVar, Sequence from contextlib import asynccontextmanager as acm import trio @@ -14,21 +14,19 @@ T = TypeVar("T") async def _enter_and_wait( - mngr: AsyncContextManager[T], - to_yield: dict[int, T], + unwrapped: dict[int, T], all_entered: trio.Event, teardown_trigger: trio.Event, - -) -> T: +) -> None: '''Open the async context manager deliver it's value to this task's spawner and sleep until cancelled. ''' async with mngr as value: - to_yield[id(mngr)] = value + unwrapped[id(mngr)] = value - if all(to_yield.values()): + if all(unwrapped.values()): all_entered.set() await teardown_trigger.wait() @@ -36,13 +34,14 @@ async def _enter_and_wait( @acm async def async_enter_all( - - *mngrs: tuple[AsyncContextManager[T]], + mngrs: Sequence[AsyncContextManager[T]], teardown_trigger: trio.Event, - -) -> tuple[T]: - - to_yield = {}.fromkeys(id(mngr) for mngr in mngrs) +) -> AsyncGenerator[tuple[T, ...], None]: + """This async context manager expects a 'teardown_trigger' from the + outside world which will be used internally to gracefully teardown + individual context managers. + """ + unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs) all_entered = trio.Event() @@ -51,11 +50,12 @@ async def async_enter_all( n.start_soon( _enter_and_wait, mngr, - to_yield, + unwrapped, all_entered, teardown_trigger, ) # deliver control once all managers have started up await all_entered.wait() - yield tuple(to_yield.values()) + + yield tuple(unwrapped.values()) From c1089dbd95ac8b27214f2cde04d06a85794c6b07 Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Sun, 17 Oct 2021 07:33:54 +0200 Subject: [PATCH 09/13] Add a clustering test --- tests/test_clustering.py | 41 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 tests/test_clustering.py diff --git a/tests/test_clustering.py b/tests/test_clustering.py new file mode 100644 index 0000000..8f28d85 --- /dev/null +++ b/tests/test_clustering.py @@ -0,0 +1,41 @@ +import itertools + +import trio +import tractor +from tractor import open_actor_cluster +from tractor.trionics import async_enter_all + +from conftest import tractor_test + + +MESSAGE = 'tractoring at full speed' + + +@tractor.context +async def worker(ctx: tractor.Context) -> None: + await ctx.started() + async with ctx.open_stream() as stream: + async for msg in stream: + # do something with msg + print(msg) + assert msg == MESSAGE + + +@tractor_test +async def test_streaming_to_actor_cluster() -> None: + teardown_trigger = trio.Event() + async with ( + open_actor_cluster(modules=[__name__]) as portals, + async_enter_all( + mngrs=[p.open_context(worker) for p in portals.values()], + teardown_trigger=teardown_trigger, + ) as contexts, + async_enter_all( + mngrs=[ctx[0].open_stream() for ctx in contexts], + teardown_trigger=teardown_trigger, + ) as streams, + ): + with trio.move_on_after(1): + for stream in itertools.cycle(streams): + await stream.send(MESSAGE) + teardown_trigger.set() \ No newline at end of file From b7a464167424bf86dec9a4d812d88a05e2d70510 Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Sun, 17 Oct 2021 14:40:09 +0200 Subject: [PATCH 10/13] Allow specifying start_method and hard_kill --- tractor/_clustering.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tractor/_clustering.py b/tractor/_clustering.py index d7255fd..2a68482 100644 --- a/tractor/_clustering.py +++ b/tractor/_clustering.py @@ -18,6 +18,8 @@ async def open_actor_cluster( modules: list[str], count: int = cpu_count(), names: Optional[list[str]] = None, + start_method: Optional[str] = None, + hard_kill: bool = False, ) -> AsyncGenerator[ list[str], @@ -36,7 +38,7 @@ async def open_actor_cluster( raise ValueError( 'Number of names is {len(names)} but count it {count}') - async with tractor.open_nursery() as an: + async with tractor.open_nursery(start_method=start_method) as an: async with trio.open_nursery() as n: for index, key in zip(range(count), names): @@ -52,4 +54,4 @@ async def open_actor_cluster( assert len(portals) == count yield portals - await an.cancel() + await an.cancel(hard_kill=hard_kill) From 04895b9d5edc69dbc72223620361eba21e901bbc Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Fri, 22 Oct 2021 02:59:15 +0200 Subject: [PATCH 11/13] Get rid of dumb random uid and use current actor's uid --- tractor/_clustering.py | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/tractor/_clustering.py b/tractor/_clustering.py index 2a68482..410459b 100644 --- a/tractor/_clustering.py +++ b/tractor/_clustering.py @@ -14,25 +14,20 @@ import tractor @acm async def open_actor_cluster( - modules: list[str], count: int = cpu_count(), names: Optional[list[str]] = None, start_method: Optional[str] = None, hard_kill: bool = False, - ) -> AsyncGenerator[ list[str], dict[str, tractor.Portal] ]: portals: dict[str, tractor.Portal] = {} - uid = str(__import__('random').randint(0, 2 ** 16)) - # uid = tractor.current_actor().uid if not names: - suffix = '_'.join(uid) - names = [f'worker_{i}.' + suffix for i in range(count)] + names = [f'worker_{i}' for i in range(count)] if not len(names) == count: raise ValueError( @@ -40,16 +35,17 @@ async def open_actor_cluster( async with tractor.open_nursery(start_method=start_method) as an: async with trio.open_nursery() as n: - for index, key in zip(range(count), names): + uid = tractor.current_actor().uid - async def start(i) -> None: - key = f'worker_{i}.' + '_'.join(uid) - portals[key] = await an.start_actor( - enable_modules=modules, - name=key, - ) + async def _start(name: str) -> None: + name = f'{name}.{uid}' + portals[name] = await an.start_actor( + enable_modules=modules, + name=name, + ) - n.start_soon(start, index) + for name in names: + n.start_soon(_start, name) assert len(portals) == count yield portals From 87e3d32992680e57fe862d5fa115248e0751d08a Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Fri, 22 Oct 2021 03:03:41 +0200 Subject: [PATCH 12/13] Get rid of external teardown trigger because #245 resolves the problem --- tractor/trionics/_mngrs.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index bfc715b..ca75c55 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -17,7 +17,6 @@ async def _enter_and_wait( mngr: AsyncContextManager[T], unwrapped: dict[int, T], all_entered: trio.Event, - teardown_trigger: trio.Event, ) -> None: '''Open the async context manager deliver it's value to this task's spawner and sleep until cancelled. @@ -29,18 +28,13 @@ async def _enter_and_wait( if all(unwrapped.values()): all_entered.set() - await teardown_trigger.wait() + await trio.sleep_forever() @acm async def async_enter_all( mngrs: Sequence[AsyncContextManager[T]], - teardown_trigger: trio.Event, ) -> AsyncGenerator[tuple[T, ...], None]: - """This async context manager expects a 'teardown_trigger' from the - outside world which will be used internally to gracefully teardown - individual context managers. - """ unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs) all_entered = trio.Event() @@ -52,10 +46,11 @@ async def async_enter_all( mngr, unwrapped, all_entered, - teardown_trigger, ) # deliver control once all managers have started up await all_entered.wait() yield tuple(unwrapped.values()) + + n.cancel_scope.cancel() \ No newline at end of file From b91adcf38dca47b1642a8890869c7082f636f55d Mon Sep 17 00:00:00 2001 From: overclockworked64 Date: Fri, 22 Oct 2021 03:10:46 +0200 Subject: [PATCH 13/13] Get rid of external teardown trigger --- tests/test_clustering.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/test_clustering.py b/tests/test_clustering.py index 8f28d85..1e85d04 100644 --- a/tests/test_clustering.py +++ b/tests/test_clustering.py @@ -23,19 +23,15 @@ async def worker(ctx: tractor.Context) -> None: @tractor_test async def test_streaming_to_actor_cluster() -> None: - teardown_trigger = trio.Event() async with ( open_actor_cluster(modules=[__name__]) as portals, async_enter_all( mngrs=[p.open_context(worker) for p in portals.values()], - teardown_trigger=teardown_trigger, ) as contexts, async_enter_all( mngrs=[ctx[0].open_stream() for ctx in contexts], - teardown_trigger=teardown_trigger, ) as streams, ): with trio.move_on_after(1): for stream in itertools.cycle(streams): await stream.send(MESSAGE) - teardown_trigger.set() \ No newline at end of file