Compare commits

...

10 Commits

Author             SHA1        Message  Date
overclockworked64  8d9ad6bac9  Get rid of external teardown trigger  2021-10-22 03:10:46 +02:00
overclockworked64  3020793415  Get rid of external teardown trigger because #245 resolves the problem  2021-10-22 03:03:41 +02:00
overclockworked64  14a7a129bd  Get rid of dumb random uid and use current actor's uid  2021-10-22 02:59:15 +02:00
overclockworked64  f4af27953e  Allow specifying start_method and hard_kill  2021-10-21 22:14:12 +02:00
overclockworked64  b17bdbfa7b  Add a clustering test  2021-10-21 22:14:11 +02:00
overclockworked64  5f7802dc01  Rename a variable and fix type annotations  2021-10-21 22:14:10 +02:00
overclockworked64  c8e7eb8f0b  Cancel nursery  2021-10-21 22:14:09 +02:00
overclockworked64  010a994f1d  Make sure the ID is a str  2021-10-21 22:14:08 +02:00
overclockworked64  0f613050e1  Avoid RuntimeError by not using current_actor's uid  2021-10-21 22:14:07 +02:00
overclockworked64  dc8ccca8be  Make 'async_enter_all' take a teardown trigger which '_enter_and_wait' will wait on  2021-10-21 22:14:06 +02:00
3 changed files with 67 additions and 37 deletions


@@ -0,0 +1,37 @@
import itertools

import trio
import tractor
from tractor import open_actor_cluster
from tractor.trionics import async_enter_all

from conftest import tractor_test


MESSAGE = 'tractoring at full speed'


@tractor.context
async def worker(ctx: tractor.Context) -> None:
    await ctx.started()
    async with ctx.open_stream() as stream:
        async for msg in stream:
            # do something with msg
            print(msg)
            assert msg == MESSAGE


@tractor_test
async def test_streaming_to_actor_cluster() -> None:
    async with (
        open_actor_cluster(modules=[__name__]) as portals,
        async_enter_all(
            mngrs=[p.open_context(worker) for p in portals.values()],
        ) as contexts,
        async_enter_all(
            mngrs=[ctx[0].open_stream() for ctx in contexts],
        ) as streams,
    ):
        with trio.move_on_after(1):
            for stream in itertools.cycle(streams):
                await stream.send(MESSAGE)


@@ -14,39 +14,40 @@ import tractor
 @acm
 async def open_actor_cluster(
     modules: list[str],
     count: int = cpu_count(),
     names: Optional[list[str]] = None,
+    start_method: Optional[str] = None,
+    hard_kill: bool = False,
 ) -> AsyncGenerator[
     list[str],
     dict[str, tractor.Portal]
 ]:
     portals: dict[str, tractor.Portal] = {}
-    uid = tractor.current_actor().uid
     if not names:
-        suffix = '_'.join(uid)
-        names = [f'worker_{i}.' + suffix for i in range(count)]
+        names = [f'worker_{i}' for i in range(count)]
     if not len(names) == count:
         raise ValueError(
             'Number of names is {len(names)} but count it {count}')
-    async with tractor.open_nursery() as an:
+    async with tractor.open_nursery(start_method=start_method) as an:
         async with trio.open_nursery() as n:
-            for index, key in zip(range(count), names):
-                async def start(i) -> None:
-                    key = f'worker_{i}.' + '_'.join(uid)
-                    portals[key] = await an.start_actor(
-                        enable_modules=modules,
-                        name=key,
-                    )
-                n.start_soon(start, index)
+            uid = tractor.current_actor().uid
+            async def _start(name: str) -> None:
+                name = f'{name}.{uid}'
+                portals[name] = await an.start_actor(
+                    enable_modules=modules,
+                    name=name,
+                )
+            for name in names:
+                n.start_soon(_start, name)
             assert len(portals) == count
             yield portals
+            await an.cancel(hard_kill=hard_kill)
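
For orientation, here is a minimal call-site sketch of the two parameters added above (start_method and hard_kill). This is not code from the PR: the module name 'mymod' and the worker count are made up, and it assumes tractor's usual implicit root-actor startup under trio.run (the test above gets its actor setup from the tractor_test decorator instead).

import trio
from tractor import open_actor_cluster


async def main() -> None:
    async with open_actor_cluster(
        modules=['mymod'],    # hypothetical module exposing the worker RPC funcs
        count=4,
        start_method='trio',  # forwarded to tractor.open_nursery(), per the diff
        hard_kill=False,      # forwarded to an.cancel() on teardown, per the diff
    ) as portals:
        # portals: dict mapping generated worker names to tractor.Portal objects
        assert len(portals) == 4


if __name__ == '__main__':
    trio.run(main)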


@@ -2,8 +2,8 @@
 Async context manager primitives with hard ``trio``-aware semantics
 '''
-from typing import AsyncContextManager
-from typing import TypeVar
+from typing import AsyncContextManager, AsyncGenerator
+from typing import TypeVar, Sequence
 from contextlib import asynccontextmanager as acm
 import trio
@@ -13,52 +13,44 @@ import trio
 T = TypeVar("T")
-async def _enter_and_sleep(
+async def _enter_and_wait(
     mngr: AsyncContextManager[T],
-    to_yield: dict[int, T],
+    unwrapped: dict[int, T],
     all_entered: trio.Event,
-    # task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,
-) -> T:
+) -> None:
     '''Open the async context manager deliver it's value
     to this task's spawner and sleep until cancelled.
     '''
     async with mngr as value:
-        to_yield[id(mngr)] = value
-        if all(to_yield.values()):
+        unwrapped[id(mngr)] = value
+        if all(unwrapped.values()):
             all_entered.set()
-        # sleep until cancelled
         await trio.sleep_forever()
 @acm
 async def async_enter_all(
-    *mngrs: tuple[AsyncContextManager[T]],
-) -> tuple[T]:
-    to_yield = {}.fromkeys(id(mngr) for mngr in mngrs)
+    mngrs: Sequence[AsyncContextManager[T]],
+) -> AsyncGenerator[tuple[T, ...], None]:
+    unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs)
     all_entered = trio.Event()
     async with trio.open_nursery() as n:
         for mngr in mngrs:
             n.start_soon(
-                _enter_and_sleep,
+                _enter_and_wait,
                 mngr,
-                to_yield,
+                unwrapped,
                 all_entered,
             )
         # deliver control once all managers have started up
         await all_entered.wait()
-        yield tuple(to_yield.values())
-        # tear down all sleeper tasks thus triggering individual
-        # mngr ``__aexit__()``s.
+        yield tuple(unwrapped.values())
         n.cancel_scope.cancel()
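
The signature change above means call sites now pass a single sequence through the mngrs keyword instead of unpacked varargs. A standalone sketch of that calling convention, independent of the PR (the make_resource manager is invented for illustration and yields truthy values so the all_entered check in _enter_and_wait can fire):

from contextlib import asynccontextmanager

import trio
from tractor.trionics import async_enter_all


@asynccontextmanager
async def make_resource(name: str):
    # stand-in for a real async resource
    yield f'resource-{name}'


async def demo() -> None:
    # old style: async_enter_all(make_resource('a'), make_resource('b'))
    # new style: one Sequence passed via the 'mngrs' keyword
    async with async_enter_all(
        mngrs=[make_resource(n) for n in 'abc'],
    ) as values:
        # values come back in the same order the managers were passed
        assert values == ('resource-a', 'resource-b', 'resource-c')


if __name__ == '__main__':
    trio.run(demo)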