diff --git a/tractor/_clustering.py b/tractor/_clustering.py index dbb50304..9f019c6c 100644 --- a/tractor/_clustering.py +++ b/tractor/_clustering.py @@ -25,17 +25,21 @@ from contextlib import ( from multiprocessing import cpu_count from typing import ( AsyncGenerator, + TYPE_CHECKING, ) import trio import tractor +if TYPE_CHECKING: + from tractor.msg import Aid + @acm async def open_actor_cluster( modules: list[str], count: int = cpu_count(), - names: list[str] | None = None, + names: list[str]|None = None, hard_kill: bool = False, # passed through verbatim to ``open_root_actor()`` @@ -45,15 +49,26 @@ async def open_actor_cluster( dict[str, tractor.Portal], None, ]: + ''' + Open an "actor cluster", much like a "process worker pool" but where + each primitive is a full `tractor.Actor` allocated as a batch and + mapped via a `dict[str, Portal]` table returned to the caller. + ''' portals: dict[str, tractor.Portal] = {} if not names: - names = [f'worker_{i}' for i in range(count)] + names: list[str] = [ + f'worker_{i}' + for i in range(count) + ] - if not len(names) == count: + if len(names) != count: raise ValueError( - 'Number of names is {len(names)} but count it {count}') + f'Number of subactor names != count ??\n' + f'len(name) = {len(names)!r}\n' + f'count = {count!r}\n' + ) async with ( # tractor.trionics.collapse_eg(), @@ -66,18 +81,18 @@ async def open_actor_cluster( trio.open_nursery() as tn, tractor.trionics.maybe_raise_from_masking_exc() ): - uid = tractor.current_actor().uid - + aid: Aid = tractor.current_actor().aid async def _start(name: str) -> None: - name = f'{uid[0]}.{name}' portals[name] = await an.start_actor( enable_modules=modules, - name=name, + name=f'{aid.name}.{name}', ) for name in names: tn.start_soon(_start, name) - assert len(portals) == count + assert len(portals) == count, 'Portal-count mismatch?\n' yield portals - await an.cancel(hard_kill=hard_kill) + await an.cancel( + hard_kill=hard_kill + )