diff --git a/setup.py b/setup.py index d233ecd..4f7431c 100755 --- a/setup.py +++ b/setup.py @@ -35,6 +35,7 @@ setup( platforms=['linux', 'windows'], packages=[ 'tractor', + 'tractor.trionics', 'tractor.testing', ], install_requires=[ diff --git a/tests/test_clustering.py b/tests/test_clustering.py new file mode 100644 index 0000000..1e85d04 --- /dev/null +++ b/tests/test_clustering.py @@ -0,0 +1,37 @@ +import itertools + +import trio +import tractor +from tractor import open_actor_cluster +from tractor.trionics import async_enter_all + +from conftest import tractor_test + + +MESSAGE = 'tractoring at full speed' + + +@tractor.context +async def worker(ctx: tractor.Context) -> None: + await ctx.started() + async with ctx.open_stream() as stream: + async for msg in stream: + # do something with msg + print(msg) + assert msg == MESSAGE + + +@tractor_test +async def test_streaming_to_actor_cluster() -> None: + async with ( + open_actor_cluster(modules=[__name__]) as portals, + async_enter_all( + mngrs=[p.open_context(worker) for p in portals.values()], + ) as contexts, + async_enter_all( + mngrs=[ctx[0].open_stream() for ctx in contexts], + ) as streams, + ): + with trio.move_on_after(1): + for stream in itertools.cycle(streams): + await stream.send(MESSAGE) diff --git a/tractor/__init__.py b/tractor/__init__.py index 795feb7..602e7bf 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -4,6 +4,7 @@ tractor: An actor model micro-framework built on """ from trio import MultiError +from ._clustering import open_actor_cluster from ._ipc import Channel from ._streaming import ( Context, @@ -39,6 +40,7 @@ __all__ = [ 'get_arbiter', 'is_root_process', 'msg', + 'open_actor_cluster', 'open_nursery', 'open_root_actor', 'Portal', diff --git a/tractor/_clustering.py b/tractor/_clustering.py index 829caff..410459b 100644 --- a/tractor/_clustering.py +++ b/tractor/_clustering.py @@ -2,6 +2,8 @@ Actor cluster helpers. ''' +from __future__ import annotations + from contextlib import asynccontextmanager as acm from multiprocessing import cpu_count from typing import AsyncGenerator, Optional @@ -12,39 +14,40 @@ import tractor @acm async def open_actor_cluster( - modules: list[str], count: int = cpu_count(), names: Optional[list[str]] = None, - + start_method: Optional[str] = None, + hard_kill: bool = False, ) -> AsyncGenerator[ list[str], dict[str, tractor.Portal] ]: portals: dict[str, tractor.Portal] = {} - uid = tractor.current_actor().uid if not names: - suffix = '_'.join(uid) - names = [f'worker_{i}.' + suffix for i in range(count)] + names = [f'worker_{i}' for i in range(count)] if not len(names) == count: raise ValueError( 'Number of names is {len(names)} but count it {count}') - async with tractor.open_nursery() as an: + async with tractor.open_nursery(start_method=start_method) as an: async with trio.open_nursery() as n: - for index, key in zip(range(count), names): + uid = tractor.current_actor().uid - async def start(i) -> None: - key = f'worker_{i}.' + '_'.join(uid) - portals[key] = await an.start_actor( - enable_modules=modules, - name=key, - ) + async def _start(name: str) -> None: + name = f'{name}.{uid}' + portals[name] = await an.start_actor( + enable_modules=modules, + name=name, + ) - n.start_soon(start, index) + for name in names: + n.start_soon(_start, name) assert len(portals) == count yield portals + + await an.cancel(hard_kill=hard_kill) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 09d4b14..ca75c55 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -2,8 +2,8 @@ Async context manager primitives with hard ``trio``-aware semantics ''' -from typing import AsyncContextManager -from typing import TypeVar +from typing import AsyncContextManager, AsyncGenerator +from typing import TypeVar, Sequence from contextlib import asynccontextmanager as acm import trio @@ -13,52 +13,44 @@ import trio T = TypeVar("T") -async def _enter_and_sleep( - +async def _enter_and_wait( mngr: AsyncContextManager[T], - to_yield: dict[int, T], + unwrapped: dict[int, T], all_entered: trio.Event, - # task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED, - -) -> T: +) -> None: '''Open the async context manager deliver it's value to this task's spawner and sleep until cancelled. ''' async with mngr as value: - to_yield[id(mngr)] = value + unwrapped[id(mngr)] = value - if all(to_yield.values()): + if all(unwrapped.values()): all_entered.set() - # sleep until cancelled await trio.sleep_forever() @acm async def async_enter_all( - - *mngrs: tuple[AsyncContextManager[T]], - -) -> tuple[T]: - - to_yield = {}.fromkeys(id(mngr) for mngr in mngrs) + mngrs: Sequence[AsyncContextManager[T]], +) -> AsyncGenerator[tuple[T, ...], None]: + unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs) all_entered = trio.Event() async with trio.open_nursery() as n: for mngr in mngrs: n.start_soon( - _enter_and_sleep, + _enter_and_wait, mngr, - to_yield, + unwrapped, all_entered, ) # deliver control once all managers have started up await all_entered.wait() - yield tuple(to_yield.values()) - # tear down all sleeper tasks thus triggering individual - # mngr ``__aexit__()``s. - n.cancel_scope.cancel() + yield tuple(unwrapped.values()) + + n.cancel_scope.cancel() \ No newline at end of file