diff --git a/examples/debugging/fast_error_in_root_after_spawn.py b/examples/debugging/fast_error_in_root_after_spawn.py index 044815b..570cf7e 100644 --- a/examples/debugging/fast_error_in_root_after_spawn.py +++ b/examples/debugging/fast_error_in_root_after_spawn.py @@ -20,7 +20,7 @@ async def sleep( async def open_ctx( - n: tractor._trionics.ActorNursery + n: tractor._supervise.ActorNursery ): # spawn both actors diff --git a/newsfragments/241.feature b/newsfragments/241.feature new file mode 100644 index 0000000..e2b4297 --- /dev/null +++ b/newsfragments/241.feature @@ -0,0 +1,6 @@ +Introduce a new `sub-package`_ that exposes all our high(er) level trio primitives and goodies, most importantly: + +- A new ``open_actor_cluster`` procedure is available for concurrently spawning a number of actors. +- A new ``gather_contexts`` procedure is available for concurrently entering a sequence of async context managers. + +.. _sub-package: ../tractor/trionics diff --git a/setup.py b/setup.py index d233ecd..4f7431c 100755 --- a/setup.py +++ b/setup.py @@ -35,6 +35,7 @@ setup( platforms=['linux', 'windows'], packages=[ 'tractor', + 'tractor.trionics', 'tractor.testing', ], install_requires=[ diff --git a/tests/test_clustering.py b/tests/test_clustering.py new file mode 100644 index 0000000..ba8052f --- /dev/null +++ b/tests/test_clustering.py @@ -0,0 +1,37 @@ +import itertools + +import trio +import tractor +from tractor import open_actor_cluster +from tractor.trionics import gather_contexts + +from conftest import tractor_test + + +MESSAGE = 'tractoring at full speed' + + +@tractor.context +async def worker(ctx: tractor.Context) -> None: + await ctx.started() + async with ctx.open_stream() as stream: + async for msg in stream: + # do something with msg + print(msg) + assert msg == MESSAGE + + +@tractor_test +async def test_streaming_to_actor_cluster() -> None: + async with ( + open_actor_cluster(modules=[__name__]) as portals, + gather_contexts( + mngrs=[p.open_context(worker) for p in portals.values()], + ) as contexts, + gather_contexts( + mngrs=[ctx[0].open_stream() for ctx in contexts], + ) as streams, + ): + with trio.move_on_after(1): + for stream in itertools.cycle(streams): + await stream.send(MESSAGE) diff --git a/tests/test_task_broadcasting.py b/tests/test_task_broadcasting.py index f32d209..b18a40e 100644 --- a/tests/test_task_broadcasting.py +++ b/tests/test_task_broadcasting.py @@ -12,7 +12,7 @@ import pytest import trio from trio.lowlevel import current_task import tractor -from tractor._broadcast import broadcast_receiver, Lagged +from tractor.trionics import broadcast_receiver, Lagged @tractor.context @@ -432,7 +432,6 @@ def test_first_recver_is_cancelled(): tx, rx = trio.open_memory_channel(1) brx = broadcast_receiver(rx, 1) cs = trio.CancelScope() - sequence = list(range(3)) async def sub_and_recv(): with cs: diff --git a/tractor/__init__.py b/tractor/__init__.py index 394693f..602e7bf 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -4,6 +4,7 @@ tractor: An actor model micro-framework built on """ from trio import MultiError +from ._clustering import open_actor_cluster from ._ipc import Channel from ._streaming import ( Context, @@ -13,7 +14,7 @@ from ._streaming import ( context, ) from ._discovery import get_arbiter, find_actor, wait_for_actor -from ._trionics import open_nursery +from ._supervise import open_nursery from ._state import current_actor, is_root_process from ._exceptions import ( RemoteActorError, @@ -39,6 +40,7 @@ __all__ = [ 'get_arbiter', 'is_root_process', 'msg', + 'open_actor_cluster', 'open_nursery', 'open_root_actor', 'Portal', diff --git a/tractor/_clustering.py b/tractor/_clustering.py new file mode 100644 index 0000000..6d7e7c8 --- /dev/null +++ b/tractor/_clustering.py @@ -0,0 +1,53 @@ +''' +Actor cluster helpers. + +''' +from __future__ import annotations + +from contextlib import asynccontextmanager as acm +from multiprocessing import cpu_count +from typing import AsyncGenerator, Optional + +import trio +import tractor + + +@acm +async def open_actor_cluster( + modules: list[str], + count: int = cpu_count(), + names: Optional[list[str]] = None, + start_method: Optional[str] = None, + hard_kill: bool = False, +) -> AsyncGenerator[ + dict[str, tractor.Portal], + None, +]: + + portals: dict[str, tractor.Portal] = {} + + if not names: + names = [f'worker_{i}' for i in range(count)] + + if not len(names) == count: + raise ValueError( + 'Number of names is {len(names)} but count it {count}') + + async with tractor.open_nursery(start_method=start_method) as an: + async with trio.open_nursery() as n: + uid = tractor.current_actor().uid + + async def _start(name: str) -> None: + name = f'{name}.{uid}' + portals[name] = await an.start_actor( + enable_modules=modules, + name=name, + ) + + for name in names: + n.start_soon(_start, name) + + assert len(portals) == count + yield portals + + await an.cancel(hard_kill=hard_kill) diff --git a/tractor/_debug.py b/tractor/_debug.py index 67485af..7aac4eb 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -558,6 +558,9 @@ async def acquire_debug_lock( Grab root's debug lock on entry, release on exit. ''' + if not debug_mode(): + return + async with trio.open_nursery() as n: cs = await n.start( wait_for_parent_stdin_hijack, diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 6ee264c..5c22116 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -19,8 +19,8 @@ import trio from ._ipc import Channel from ._exceptions import unpack_error, ContextCancelled from ._state import current_actor -from ._broadcast import broadcast_receiver, BroadcastReceiver from .log import get_logger +from .trionics import broadcast_receiver, BroadcastReceiver log = get_logger(__name__) diff --git a/tractor/_trionics.py b/tractor/_supervise.py similarity index 100% rename from tractor/_trionics.py rename to tractor/_supervise.py diff --git a/tractor/trionics/__init__.py b/tractor/trionics/__init__.py new file mode 100644 index 0000000..3d1b9bd --- /dev/null +++ b/tractor/trionics/__init__.py @@ -0,0 +1,14 @@ +''' +Sugary patterns for trio + tractor designs. + +''' +from ._mngrs import gather_contexts +from ._broadcast import broadcast_receiver, BroadcastReceiver, Lagged + + +__all__ = [ + 'gather_contexts', + 'broadcast_receiver', + 'BroadcastReceiver', + 'Lagged', +] diff --git a/tractor/_broadcast.py b/tractor/trionics/_broadcast.py similarity index 100% rename from tractor/_broadcast.py rename to tractor/trionics/_broadcast.py diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py new file mode 100644 index 0000000..d31f1e0 --- /dev/null +++ b/tractor/trionics/_mngrs.py @@ -0,0 +1,78 @@ +''' +Async context manager primitives with hard ``trio``-aware semantics + +''' +from typing import AsyncContextManager, AsyncGenerator +from typing import TypeVar, Sequence +from contextlib import asynccontextmanager as acm + +import trio + + +# A regular invariant generic type +T = TypeVar("T") + + +async def _enter_and_wait( + + mngr: AsyncContextManager[T], + unwrapped: dict[int, T], + all_entered: trio.Event, + parent_exit: trio.Event, + +) -> None: + ''' + Open the async context manager deliver it's value + to this task's spawner and sleep until cancelled. + + ''' + async with mngr as value: + unwrapped[id(mngr)] = value + + if all(unwrapped.values()): + all_entered.set() + + await parent_exit.wait() + + +@acm +async def gather_contexts( + + mngrs: Sequence[AsyncContextManager[T]], + +) -> AsyncGenerator[tuple[T, ...], None]: + ''' + Concurrently enter a sequence of async context managers, each in + a separate ``trio`` task and deliver the unwrapped values in the + same order once all managers have entered. On exit all contexts are + subsequently and concurrently exited. + + This function is somewhat similar to common usage of + ``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in + combo with ``asyncio.gather()`` except the managers are concurrently + entered and exited cancellation just works. + + ''' + unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs) + + all_entered = trio.Event() + parent_exit = trio.Event() + + async with trio.open_nursery() as n: + for mngr in mngrs: + n.start_soon( + _enter_and_wait, + mngr, + unwrapped, + all_entered, + parent_exit, + ) + + # deliver control once all managers have started up + await all_entered.wait() + + yield tuple(unwrapped.values()) + + # we don't need a try/finally since cancellation will be triggered + # by the surrounding nursery on error. + parent_exit.set()