From 6988e8d3c8400b42537f62b68b8b28bb6627b955 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Oct 2021 12:02:21 -0400 Subject: [PATCH] Add an async actor cluster spawner prototype --- tractor/_clustering.py | 49 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 tractor/_clustering.py diff --git a/tractor/_clustering.py b/tractor/_clustering.py new file mode 100644 index 0000000..6e3f05d --- /dev/null +++ b/tractor/_clustering.py @@ -0,0 +1,49 @@ +''' +Actor cluster helpers. + +''' +from contextlib import asynccontextmanager as acm +from multiprocessing import cpu_count +from typing import AsyncGenerator, Optional + +import trio +import tractor + + +@acm +async def open_actor_cluster( + + modules: list[str], + count: int = cpu_count(), + names: Optional[list[str]] = None, + +) -> AsyncGenerator[..., dict[str, tractor.Portal]]: + + portals: dict[str, tractor.Portal] = {} + uid = tractor.current_actor().uid + + if not names: + suffix = '_'.join(uid) + names = [f'worker_{i}.' + suffix for i in range(count)] + + if not len(names) == count: + raise ValueError( + 'Number of names is {len(names)} but count it {count}') + + async with ( + tractor.open_nursery() as an, + trio.open_nursery() as n, + ): + for index, key in zip(range(count), names): + + async def start(i) -> None: + key = f'worker_{i}.' + '_'.join(uid) + portals[key] = await an.start_actor( + enable_modules=modules, + name=key, + ) + + n.start_soon(start, index) + + assert len(portals) == count + yield portals