Merge pull request #241 from goodboy/trionics

Trionics
pubsub_startup_response_msg
goodboy 2021-10-27 13:09:11 -04:00 committed by GitHub
commit 5dbe8e4b14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 198 additions and 5 deletions

View File

@ -20,7 +20,7 @@ async def sleep(
async def open_ctx( async def open_ctx(
n: tractor._trionics.ActorNursery n: tractor._supervise.ActorNursery
): ):
# spawn both actors # spawn both actors

View File

@ -0,0 +1,6 @@
Introduce a new `sub-package`_ that exposes all our high(er) level trio primitives and goodies, most importantly:
- A new ``open_actor_cluster`` procedure is available for concurrently spawning a number of actors.
- A new ``gather_contexts`` procedure is available for concurrently entering a sequence of async context managers.
.. _sub-package: ../tractor/trionics

View File

@ -35,6 +35,7 @@ setup(
platforms=['linux', 'windows'], platforms=['linux', 'windows'],
packages=[ packages=[
'tractor', 'tractor',
'tractor.trionics',
'tractor.testing', 'tractor.testing',
], ],
install_requires=[ install_requires=[

View File

@ -0,0 +1,37 @@
import itertools
import trio
import tractor
from tractor import open_actor_cluster
from tractor.trionics import gather_contexts
from conftest import tractor_test
MESSAGE = 'tractoring at full speed'
@tractor.context
async def worker(ctx: tractor.Context) -> None:
await ctx.started()
async with ctx.open_stream() as stream:
async for msg in stream:
# do something with msg
print(msg)
assert msg == MESSAGE
@tractor_test
async def test_streaming_to_actor_cluster() -> None:
async with (
open_actor_cluster(modules=[__name__]) as portals,
gather_contexts(
mngrs=[p.open_context(worker) for p in portals.values()],
) as contexts,
gather_contexts(
mngrs=[ctx[0].open_stream() for ctx in contexts],
) as streams,
):
with trio.move_on_after(1):
for stream in itertools.cycle(streams):
await stream.send(MESSAGE)

View File

@ -12,7 +12,7 @@ import pytest
import trio import trio
from trio.lowlevel import current_task from trio.lowlevel import current_task
import tractor import tractor
from tractor._broadcast import broadcast_receiver, Lagged from tractor.trionics import broadcast_receiver, Lagged
@tractor.context @tractor.context
@ -432,7 +432,6 @@ def test_first_recver_is_cancelled():
tx, rx = trio.open_memory_channel(1) tx, rx = trio.open_memory_channel(1)
brx = broadcast_receiver(rx, 1) brx = broadcast_receiver(rx, 1)
cs = trio.CancelScope() cs = trio.CancelScope()
sequence = list(range(3))
async def sub_and_recv(): async def sub_and_recv():
with cs: with cs:

View File

@ -4,6 +4,7 @@ tractor: An actor model micro-framework built on
""" """
from trio import MultiError from trio import MultiError
from ._clustering import open_actor_cluster
from ._ipc import Channel from ._ipc import Channel
from ._streaming import ( from ._streaming import (
Context, Context,
@ -13,7 +14,7 @@ from ._streaming import (
context, context,
) )
from ._discovery import get_arbiter, find_actor, wait_for_actor from ._discovery import get_arbiter, find_actor, wait_for_actor
from ._trionics import open_nursery from ._supervise import open_nursery
from ._state import current_actor, is_root_process from ._state import current_actor, is_root_process
from ._exceptions import ( from ._exceptions import (
RemoteActorError, RemoteActorError,
@ -39,6 +40,7 @@ __all__ = [
'get_arbiter', 'get_arbiter',
'is_root_process', 'is_root_process',
'msg', 'msg',
'open_actor_cluster',
'open_nursery', 'open_nursery',
'open_root_actor', 'open_root_actor',
'Portal', 'Portal',

View File

@ -0,0 +1,53 @@
'''
Actor cluster helpers.
'''
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from multiprocessing import cpu_count
from typing import AsyncGenerator, Optional
import trio
import tractor
@acm
async def open_actor_cluster(
modules: list[str],
count: int = cpu_count(),
names: Optional[list[str]] = None,
start_method: Optional[str] = None,
hard_kill: bool = False,
) -> AsyncGenerator[
dict[str, tractor.Portal],
None,
]:
portals: dict[str, tractor.Portal] = {}
if not names:
names = [f'worker_{i}' for i in range(count)]
if not len(names) == count:
raise ValueError(
'Number of names is {len(names)} but count it {count}')
async with tractor.open_nursery(start_method=start_method) as an:
async with trio.open_nursery() as n:
uid = tractor.current_actor().uid
async def _start(name: str) -> None:
name = f'{name}.{uid}'
portals[name] = await an.start_actor(
enable_modules=modules,
name=name,
)
for name in names:
n.start_soon(_start, name)
assert len(portals) == count
yield portals
await an.cancel(hard_kill=hard_kill)

View File

@ -558,6 +558,9 @@ async def acquire_debug_lock(
Grab root's debug lock on entry, release on exit. Grab root's debug lock on entry, release on exit.
''' '''
if not debug_mode():
return
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
cs = await n.start( cs = await n.start(
wait_for_parent_stdin_hijack, wait_for_parent_stdin_hijack,

View File

@ -19,8 +19,8 @@ import trio
from ._ipc import Channel from ._ipc import Channel
from ._exceptions import unpack_error, ContextCancelled from ._exceptions import unpack_error, ContextCancelled
from ._state import current_actor from ._state import current_actor
from ._broadcast import broadcast_receiver, BroadcastReceiver
from .log import get_logger from .log import get_logger
from .trionics import broadcast_receiver, BroadcastReceiver
log = get_logger(__name__) log = get_logger(__name__)

View File

@ -0,0 +1,14 @@
'''
Sugary patterns for trio + tractor designs.
'''
from ._mngrs import gather_contexts
from ._broadcast import broadcast_receiver, BroadcastReceiver, Lagged
__all__ = [
'gather_contexts',
'broadcast_receiver',
'BroadcastReceiver',
'Lagged',
]

View File

@ -0,0 +1,78 @@
'''
Async context manager primitives with hard ``trio``-aware semantics
'''
from typing import AsyncContextManager, AsyncGenerator
from typing import TypeVar, Sequence
from contextlib import asynccontextmanager as acm
import trio
# A regular invariant generic type
T = TypeVar("T")
async def _enter_and_wait(
mngr: AsyncContextManager[T],
unwrapped: dict[int, T],
all_entered: trio.Event,
parent_exit: trio.Event,
) -> None:
'''
Open the async context manager deliver it's value
to this task's spawner and sleep until cancelled.
'''
async with mngr as value:
unwrapped[id(mngr)] = value
if all(unwrapped.values()):
all_entered.set()
await parent_exit.wait()
@acm
async def gather_contexts(
mngrs: Sequence[AsyncContextManager[T]],
) -> AsyncGenerator[tuple[T, ...], None]:
'''
Concurrently enter a sequence of async context managers, each in
a separate ``trio`` task and deliver the unwrapped values in the
same order once all managers have entered. On exit all contexts are
subsequently and concurrently exited.
This function is somewhat similar to common usage of
``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in
combo with ``asyncio.gather()`` except the managers are concurrently
entered and exited cancellation just works.
'''
unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs)
all_entered = trio.Event()
parent_exit = trio.Event()
async with trio.open_nursery() as n:
for mngr in mngrs:
n.start_soon(
_enter_and_wait,
mngr,
unwrapped,
all_entered,
parent_exit,
)
# deliver control once all managers have started up
await all_entered.wait()
yield tuple(unwrapped.values())
# we don't need a try/finally since cancellation will be triggered
# by the surrounding nursery on error.
parent_exit.set()