Compare commits

...

22 Commits

Author SHA1 Message Date
overclockworked64 8d9ad6bac9
Get rid of external teardown trigger 2021-10-22 03:10:46 +02:00
overclockworked64 3020793415
Get rid of external teardown trigger because #245 resolves the problem 2021-10-22 03:03:41 +02:00
overclockworked64 14a7a129bd
Get rid of dumb random uid and use current actor's uid 2021-10-22 02:59:15 +02:00
overclockworked64 f4af27953e
Allow specifying start_method and hard_kill 2021-10-21 22:14:12 +02:00
overclockworked64 b17bdbfa7b
Add a clustering test 2021-10-21 22:14:11 +02:00
overclockworked64 5f7802dc01
Rename a variable and fix type annotations 2021-10-21 22:14:10 +02:00
overclockworked64 c8e7eb8f0b
Cancel nursery 2021-10-21 22:14:09 +02:00
overclockworked64 010a994f1d
Make sure the ID is a str 2021-10-21 22:14:08 +02:00
overclockworked64 0f613050e1
Avoid RuntimeError by not using current_actor's uid 2021-10-21 22:14:07 +02:00
overclockworked64 dc8ccca8be
Make 'async_enter_all' take a teardown trigger which '_enter_and_wait' will wait on 2021-10-21 22:14:06 +02:00
overclockworked64 21042ef926
Postpone evaluation of annotations 2021-10-21 22:14:06 +02:00
overclockworked64 4974e3efe5
Add 'open_actor_cluster' to __all__ 2021-10-21 22:14:05 +02:00
overclockworked64 f1c24c7ab7
Add 'trio.trionics' to setup.py 2021-10-21 22:14:04 +02:00
Tyler Goodlet e5b3eb117d Fix *args-like type annot 2021-10-17 08:56:35 -04:00
Tyler Goodlet 619b3b344a Lul, fix everything for cluster helper 2021-10-17 08:56:35 -04:00
Tyler Goodlet 5a99482b9d Fix type path to new `_supervise` mod 2021-10-17 08:56:35 -04:00
Tyler Goodlet 4ec177d717 Expose `Lagged` for broadcasting 2021-10-17 08:56:35 -04:00
Tyler Goodlet 9ce4dc584a Fix top level nursery import 2021-10-17 08:56:35 -04:00
Tyler Goodlet 6988e8d3c8 Add an async actor cluster spawner prototype 2021-10-17 08:56:35 -04:00
Tyler Goodlet c2a19c630b Move broadcast channel parts into trionics 2021-10-17 08:56:35 -04:00
Tyler Goodlet 917d94dcbb Start `trionics` sub-pkg with `async_enter_all()`
Since it seems we're building out more and more higher level primitives
in order to support certain parallel style actor trees and messaging
patterns (eg. task broadcast channels), we might as well start a new
sub-package for purely `trio` constructions. We hereby dub this
the realm of `trionics` (like electronics but for trios instead of
electrons).

To kick things off, add an `async_enter_all()` concurrent
exit-stack-like context manager API which will concurrently spawn
a sequence of provided async context managers and deliver their ordered
results but with proper support for `trio` cancellation semantics.
The stdlib's `AsyncExitStack` is not compatible with nurseries not
`trio` tasks (which are cancelled) since as task will be suspended on
the stack after push and does not ever hit a checkpoint until the stack
is closed.
2021-10-17 08:56:35 -04:00
Tyler Goodlet e4e47c3901 Rename the nursery module to `_supervise` 2021-10-17 08:56:35 -04:00
11 changed files with 167 additions and 5 deletions

View File

@ -20,7 +20,7 @@ async def sleep(
async def open_ctx(
n: tractor._trionics.ActorNursery
n: tractor._supervise.ActorNursery
):
# spawn both actors

View File

@ -35,6 +35,7 @@ setup(
platforms=['linux', 'windows'],
packages=[
'tractor',
'tractor.trionics',
'tractor.testing',
],
install_requires=[

View File

@ -0,0 +1,37 @@
import itertools
import trio
import tractor
from tractor import open_actor_cluster
from tractor.trionics import async_enter_all
from conftest import tractor_test
MESSAGE = 'tractoring at full speed'
@tractor.context
async def worker(ctx: tractor.Context) -> None:
await ctx.started()
async with ctx.open_stream() as stream:
async for msg in stream:
# do something with msg
print(msg)
assert msg == MESSAGE
@tractor_test
async def test_streaming_to_actor_cluster() -> None:
async with (
open_actor_cluster(modules=[__name__]) as portals,
async_enter_all(
mngrs=[p.open_context(worker) for p in portals.values()],
) as contexts,
async_enter_all(
mngrs=[ctx[0].open_stream() for ctx in contexts],
) as streams,
):
with trio.move_on_after(1):
for stream in itertools.cycle(streams):
await stream.send(MESSAGE)

View File

@ -12,7 +12,7 @@ import pytest
import trio
from trio.lowlevel import current_task
import tractor
from tractor._broadcast import broadcast_receiver, Lagged
from tractor.trionics import broadcast_receiver, Lagged
@tractor.context
@ -432,7 +432,6 @@ def test_first_recver_is_cancelled():
tx, rx = trio.open_memory_channel(1)
brx = broadcast_receiver(rx, 1)
cs = trio.CancelScope()
sequence = list(range(3))
async def sub_and_recv():
with cs:

View File

@ -4,6 +4,7 @@ tractor: An actor model micro-framework built on
"""
from trio import MultiError
from ._clustering import open_actor_cluster
from ._ipc import Channel
from ._streaming import (
Context,
@ -13,7 +14,7 @@ from ._streaming import (
context,
)
from ._discovery import get_arbiter, find_actor, wait_for_actor
from ._trionics import open_nursery
from ._supervise import open_nursery
from ._state import current_actor, is_root_process
from ._exceptions import (
RemoteActorError,
@ -39,6 +40,7 @@ __all__ = [
'get_arbiter',
'is_root_process',
'msg',
'open_actor_cluster',
'open_nursery',
'open_root_actor',
'Portal',

View File

@ -0,0 +1,53 @@
'''
Actor cluster helpers.
'''
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from multiprocessing import cpu_count
from typing import AsyncGenerator, Optional
import trio
import tractor
@acm
async def open_actor_cluster(
modules: list[str],
count: int = cpu_count(),
names: Optional[list[str]] = None,
start_method: Optional[str] = None,
hard_kill: bool = False,
) -> AsyncGenerator[
list[str],
dict[str, tractor.Portal]
]:
portals: dict[str, tractor.Portal] = {}
if not names:
names = [f'worker_{i}' for i in range(count)]
if not len(names) == count:
raise ValueError(
'Number of names is {len(names)} but count it {count}')
async with tractor.open_nursery(start_method=start_method) as an:
async with trio.open_nursery() as n:
uid = tractor.current_actor().uid
async def _start(name: str) -> None:
name = f'{name}.{uid}'
portals[name] = await an.start_actor(
enable_modules=modules,
name=name,
)
for name in names:
n.start_soon(_start, name)
assert len(portals) == count
yield portals
await an.cancel(hard_kill=hard_kill)

View File

@ -19,8 +19,8 @@ import trio
from ._ipc import Channel
from ._exceptions import unpack_error, ContextCancelled
from ._state import current_actor
from ._broadcast import broadcast_receiver, BroadcastReceiver
from .log import get_logger
from .trionics import broadcast_receiver, BroadcastReceiver
log = get_logger(__name__)

View File

@ -0,0 +1,14 @@
'''
Sugary patterns for trio + tractor designs.
'''
from ._mngrs import async_enter_all
from ._broadcast import broadcast_receiver, BroadcastReceiver, Lagged
__all__ = [
'async_enter_all',
'broadcast_receiver',
'BroadcastReceiver',
'Lagged',
]

View File

@ -0,0 +1,56 @@
'''
Async context manager primitives with hard ``trio``-aware semantics
'''
from typing import AsyncContextManager, AsyncGenerator
from typing import TypeVar, Sequence
from contextlib import asynccontextmanager as acm
import trio
# A regular invariant generic type
T = TypeVar("T")
async def _enter_and_wait(
mngr: AsyncContextManager[T],
unwrapped: dict[int, T],
all_entered: trio.Event,
) -> None:
'''Open the async context manager deliver it's value
to this task's spawner and sleep until cancelled.
'''
async with mngr as value:
unwrapped[id(mngr)] = value
if all(unwrapped.values()):
all_entered.set()
await trio.sleep_forever()
@acm
async def async_enter_all(
mngrs: Sequence[AsyncContextManager[T]],
) -> AsyncGenerator[tuple[T, ...], None]:
unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs)
all_entered = trio.Event()
async with trio.open_nursery() as n:
for mngr in mngrs:
n.start_soon(
_enter_and_wait,
mngr,
unwrapped,
all_entered,
)
# deliver control once all managers have started up
await all_entered.wait()
yield tuple(unwrapped.values())
n.cancel_scope.cancel()