Compare commits

..

No commits in common. "8d9ad6bac9071f04c791f814c9b93c60b3b39ce8" and "21042ef9266d969f68c3bc08f12c2f8abdb1b8dc" have entirely different histories.

3 changed files with 37 additions and 67 deletions

View File

@ -1,37 +0,0 @@
import itertools
import trio
import tractor
from tractor import open_actor_cluster
from tractor.trionics import async_enter_all
from conftest import tractor_test
MESSAGE = 'tractoring at full speed'
@tractor.context
async def worker(ctx: tractor.Context) -> None:
await ctx.started()
async with ctx.open_stream() as stream:
async for msg in stream:
# do something with msg
print(msg)
assert msg == MESSAGE
@tractor_test
async def test_streaming_to_actor_cluster() -> None:
async with (
open_actor_cluster(modules=[__name__]) as portals,
async_enter_all(
mngrs=[p.open_context(worker) for p in portals.values()],
) as contexts,
async_enter_all(
mngrs=[ctx[0].open_stream() for ctx in contexts],
) as streams,
):
with trio.move_on_after(1):
for stream in itertools.cycle(streams):
await stream.send(MESSAGE)

View File

@ -14,40 +14,39 @@ import tractor
@acm @acm
async def open_actor_cluster( async def open_actor_cluster(
modules: list[str], modules: list[str],
count: int = cpu_count(), count: int = cpu_count(),
names: Optional[list[str]] = None, names: Optional[list[str]] = None,
start_method: Optional[str] = None,
hard_kill: bool = False,
) -> AsyncGenerator[ ) -> AsyncGenerator[
list[str], list[str],
dict[str, tractor.Portal] dict[str, tractor.Portal]
]: ]:
portals: dict[str, tractor.Portal] = {} portals: dict[str, tractor.Portal] = {}
uid = tractor.current_actor().uid
if not names: if not names:
names = [f'worker_{i}' for i in range(count)] suffix = '_'.join(uid)
names = [f'worker_{i}.' + suffix for i in range(count)]
if not len(names) == count: if not len(names) == count:
raise ValueError( raise ValueError(
'Number of names is {len(names)} but count it {count}') 'Number of names is {len(names)} but count it {count}')
async with tractor.open_nursery(start_method=start_method) as an: async with tractor.open_nursery() as an:
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
uid = tractor.current_actor().uid for index, key in zip(range(count), names):
async def _start(name: str) -> None: async def start(i) -> None:
name = f'{name}.{uid}' key = f'worker_{i}.' + '_'.join(uid)
portals[name] = await an.start_actor( portals[key] = await an.start_actor(
enable_modules=modules, enable_modules=modules,
name=name, name=key,
) )
for name in names: n.start_soon(start, index)
n.start_soon(_start, name)
assert len(portals) == count assert len(portals) == count
yield portals yield portals
await an.cancel(hard_kill=hard_kill)

View File

@ -2,8 +2,8 @@
Async context manager primitives with hard ``trio``-aware semantics Async context manager primitives with hard ``trio``-aware semantics
''' '''
from typing import AsyncContextManager, AsyncGenerator from typing import AsyncContextManager
from typing import TypeVar, Sequence from typing import TypeVar
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
import trio import trio
@ -13,44 +13,52 @@ import trio
T = TypeVar("T") T = TypeVar("T")
async def _enter_and_wait( async def _enter_and_sleep(
mngr: AsyncContextManager[T], mngr: AsyncContextManager[T],
unwrapped: dict[int, T], to_yield: dict[int, T],
all_entered: trio.Event, all_entered: trio.Event,
) -> None: # task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,
) -> T:
'''Open the async context manager deliver it's value '''Open the async context manager deliver it's value
to this task's spawner and sleep until cancelled. to this task's spawner and sleep until cancelled.
''' '''
async with mngr as value: async with mngr as value:
unwrapped[id(mngr)] = value to_yield[id(mngr)] = value
if all(unwrapped.values()): if all(to_yield.values()):
all_entered.set() all_entered.set()
# sleep until cancelled
await trio.sleep_forever() await trio.sleep_forever()
@acm @acm
async def async_enter_all( async def async_enter_all(
mngrs: Sequence[AsyncContextManager[T]],
) -> AsyncGenerator[tuple[T, ...], None]: *mngrs: tuple[AsyncContextManager[T]],
unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs)
) -> tuple[T]:
to_yield = {}.fromkeys(id(mngr) for mngr in mngrs)
all_entered = trio.Event() all_entered = trio.Event()
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
for mngr in mngrs: for mngr in mngrs:
n.start_soon( n.start_soon(
_enter_and_wait, _enter_and_sleep,
mngr, mngr,
unwrapped, to_yield,
all_entered, all_entered,
) )
# deliver control once all managers have started up # deliver control once all managers have started up
await all_entered.wait() await all_entered.wait()
yield tuple(to_yield.values())
yield tuple(unwrapped.values()) # tear down all sleeper tasks thus triggering individual
# mngr ``__aexit__()``s.
n.cancel_scope.cancel() n.cancel_scope.cancel()