forked from goodboy/tractor
1
0
Fork 0

Merge pull request #252 from goodboy/246_facepalm_backup

Trionics improvements from @overclockworked64
graceful_gather
goodboy 2021-10-23 18:10:17 -04:00 committed by GitHub
commit 71b8f9f1ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 72 additions and 37 deletions

View File

@ -35,6 +35,7 @@ setup(
platforms=['linux', 'windows'], platforms=['linux', 'windows'],
packages=[ packages=[
'tractor', 'tractor',
'tractor.trionics',
'tractor.testing', 'tractor.testing',
], ],
install_requires=[ install_requires=[

View File

@ -0,0 +1,37 @@
import itertools
import trio
import tractor
from tractor import open_actor_cluster
from tractor.trionics import async_enter_all
from conftest import tractor_test
MESSAGE = 'tractoring at full speed'
@tractor.context
async def worker(ctx: tractor.Context) -> None:
await ctx.started()
async with ctx.open_stream() as stream:
async for msg in stream:
# do something with msg
print(msg)
assert msg == MESSAGE
@tractor_test
async def test_streaming_to_actor_cluster() -> None:
async with (
open_actor_cluster(modules=[__name__]) as portals,
async_enter_all(
mngrs=[p.open_context(worker) for p in portals.values()],
) as contexts,
async_enter_all(
mngrs=[ctx[0].open_stream() for ctx in contexts],
) as streams,
):
with trio.move_on_after(1):
for stream in itertools.cycle(streams):
await stream.send(MESSAGE)

View File

@ -4,6 +4,7 @@ tractor: An actor model micro-framework built on
""" """
from trio import MultiError from trio import MultiError
from ._clustering import open_actor_cluster
from ._ipc import Channel from ._ipc import Channel
from ._streaming import ( from ._streaming import (
Context, Context,
@ -39,6 +40,7 @@ __all__ = [
'get_arbiter', 'get_arbiter',
'is_root_process', 'is_root_process',
'msg', 'msg',
'open_actor_cluster',
'open_nursery', 'open_nursery',
'open_root_actor', 'open_root_actor',
'Portal', 'Portal',

View File

@ -2,6 +2,8 @@
Actor cluster helpers. Actor cluster helpers.
''' '''
from __future__ import annotations
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from multiprocessing import cpu_count from multiprocessing import cpu_count
from typing import AsyncGenerator, Optional from typing import AsyncGenerator, Optional
@ -12,39 +14,40 @@ import tractor
@acm @acm
async def open_actor_cluster( async def open_actor_cluster(
modules: list[str], modules: list[str],
count: int = cpu_count(), count: int = cpu_count(),
names: Optional[list[str]] = None, names: Optional[list[str]] = None,
start_method: Optional[str] = None,
hard_kill: bool = False,
) -> AsyncGenerator[ ) -> AsyncGenerator[
list[str], list[str],
dict[str, tractor.Portal] dict[str, tractor.Portal]
]: ]:
portals: dict[str, tractor.Portal] = {} portals: dict[str, tractor.Portal] = {}
uid = tractor.current_actor().uid
if not names: if not names:
suffix = '_'.join(uid) names = [f'worker_{i}' for i in range(count)]
names = [f'worker_{i}.' + suffix for i in range(count)]
if not len(names) == count: if not len(names) == count:
raise ValueError( raise ValueError(
'Number of names is {len(names)} but count it {count}') 'Number of names is {len(names)} but count it {count}')
async with tractor.open_nursery() as an: async with tractor.open_nursery(start_method=start_method) as an:
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
for index, key in zip(range(count), names): uid = tractor.current_actor().uid
async def start(i) -> None: async def _start(name: str) -> None:
key = f'worker_{i}.' + '_'.join(uid) name = f'{name}.{uid}'
portals[key] = await an.start_actor( portals[name] = await an.start_actor(
enable_modules=modules, enable_modules=modules,
name=key, name=name,
) )
n.start_soon(start, index) for name in names:
n.start_soon(_start, name)
assert len(portals) == count assert len(portals) == count
yield portals yield portals
await an.cancel(hard_kill=hard_kill)

View File

@ -2,8 +2,8 @@
Async context manager primitives with hard ``trio``-aware semantics Async context manager primitives with hard ``trio``-aware semantics
''' '''
from typing import AsyncContextManager from typing import AsyncContextManager, AsyncGenerator
from typing import TypeVar from typing import TypeVar, Sequence
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
import trio import trio
@ -13,52 +13,44 @@ import trio
T = TypeVar("T") T = TypeVar("T")
async def _enter_and_sleep( async def _enter_and_wait(
mngr: AsyncContextManager[T], mngr: AsyncContextManager[T],
to_yield: dict[int, T], unwrapped: dict[int, T],
all_entered: trio.Event, all_entered: trio.Event,
# task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED, ) -> None:
) -> T:
'''Open the async context manager deliver it's value '''Open the async context manager deliver it's value
to this task's spawner and sleep until cancelled. to this task's spawner and sleep until cancelled.
''' '''
async with mngr as value: async with mngr as value:
to_yield[id(mngr)] = value unwrapped[id(mngr)] = value
if all(to_yield.values()): if all(unwrapped.values()):
all_entered.set() all_entered.set()
# sleep until cancelled
await trio.sleep_forever() await trio.sleep_forever()
@acm @acm
async def async_enter_all( async def async_enter_all(
mngrs: Sequence[AsyncContextManager[T]],
*mngrs: tuple[AsyncContextManager[T]], ) -> AsyncGenerator[tuple[T, ...], None]:
unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs)
) -> tuple[T]:
to_yield = {}.fromkeys(id(mngr) for mngr in mngrs)
all_entered = trio.Event() all_entered = trio.Event()
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
for mngr in mngrs: for mngr in mngrs:
n.start_soon( n.start_soon(
_enter_and_sleep, _enter_and_wait,
mngr, mngr,
to_yield, unwrapped,
all_entered, all_entered,
) )
# deliver control once all managers have started up # deliver control once all managers have started up
await all_entered.wait() await all_entered.wait()
yield tuple(to_yield.values())
# tear down all sleeper tasks thus triggering individual yield tuple(unwrapped.values())
# mngr ``__aexit__()``s.
n.cancel_scope.cancel() n.cancel_scope.cancel()