Merge pull request #121 from goodboy/infect_asyncio

Infect `asyncio`
win_ci_timeout
goodboy 2021-12-17 11:02:01 -05:00 committed by GitHub
commit bbcdbaaba4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 1185 additions and 31 deletions

View File

@ -25,7 +25,7 @@ jobs:
testing-linux:
name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }}'
timeout-minutes: 9
timeout-minutes: 10
runs-on: ${{ matrix.os }}
strategy:

View File

@ -27,7 +27,9 @@ Features
- A modular transport stack, allowing for custom serialization (eg.
`msgspec`_), communications protocols, and environment specific IPC
primitives
- `structured concurrency`_ from the ground up
- Support for spawning process-level-SC, inter-loop one-to-one-task oriented
``asyncio`` actors via "infected ``asyncio``" mode
- `structured chadcurrency`_ from the ground up
Run a func in a process
@ -313,6 +315,117 @@ real time::
This uses no extra threads, fancy semaphores or futures; all we need
is ``tractor``'s IPC!
"Infected ``asyncio``" mode
---------------------------
Have a bunch of ``asyncio`` code you want to force to be SC at the process level?
Check out our experimental system for `guest-mode`_ controlled
``asyncio`` actors:
.. code:: python
import asyncio
from statistics import mean
import time
import trio
import tractor
async def aio_echo_server(
to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue,
) -> None:
# a first message must be sent **from** this ``asyncio``
# task or the ``trio`` side will never unblock from
# ``tractor.to_asyncio.open_channel_from():``
to_trio.send_nowait('start')
# XXX: this uses an ``from_trio: asyncio.Queue`` currently but we
# should probably offer something better.
while True:
# echo the msg back
to_trio.send_nowait(await from_trio.get())
await asyncio.sleep(0)
@tractor.context
async def trio_to_aio_echo_server(
ctx: tractor.Context,
):
# this will block until the ``asyncio`` task sends a "first"
# message.
async with tractor.to_asyncio.open_channel_from(
aio_echo_server,
) as (first, chan):
assert first == 'start'
await ctx.started(first)
async with ctx.open_stream() as stream:
async for msg in stream:
await chan.send(msg)
out = await chan.receive()
# echo back to parent actor-task
await stream.send(out)
async def main():
async with tractor.open_nursery() as n:
p = await n.start_actor(
'aio_server',
enable_modules=[__name__],
infect_asyncio=True,
)
async with p.open_context(
trio_to_aio_echo_server,
) as (ctx, first):
assert first == 'start'
count = 0
async with ctx.open_stream() as stream:
delays = []
send = time.time()
await stream.send(count)
async for msg in stream:
recv = time.time()
delays.append(recv - send)
assert msg == count
count += 1
send = time.time()
await stream.send(count)
if count >= 1e3:
break
print(f'mean round trip rate (Hz): {1/mean(delays)}')
await p.cancel_actor()
if __name__ == '__main__':
trio.run(main)
Yes, we spawn a python process, run ``asyncio``, start ``trio`` on the
``asyncio`` loop, then send commands to the ``trio`` scheduled tasks to
tell ``asyncio`` tasks what to do XD
We need help refining the `asyncio`-side channel API to be more
`trio`-like. Feel free to sling your opinion in `#273`_!
.. _#273: https://github.com/goodboy/tractor/issues/273
Higher level "cluster" APIs
---------------------------
To be extra terse the ``tractor`` devs have started hacking some "higher
level" APIs for managing actor trees/clusters. These interfaces should
generally be condsidered provisional for now but we encourage you to try
@ -476,6 +589,7 @@ channel`_!
.. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich
.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
.. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s
.. .. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts
.. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s
.. _trio gitter channel: https://gitter.im/python-trio/general
.. _matrix channel: https://matrix.to/#/!tractor:matrix.org
@ -484,11 +598,14 @@ channel`_!
.. _messages: https://en.wikipedia.org/wiki/Message_passing
.. _trio docs: https://trio.readthedocs.io/en/latest/
.. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
.. _structured concurrency: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
.. _structured chadcurrency: https://en.wikipedia.org/wiki/Structured_concurrency
.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
.. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony
.. _async generators: https://www.python.org/dev/peps/pep-0525/
.. _trio-parallel: https://github.com/richardsheridan/trio-parallel
.. _msgspec: https://jcristharif.com/msgspec/
.. _guest-mode: https://trio.readthedocs.io/en/stable/reference-lowlevel.html?highlight=guest%20mode#using-guest-mode-to-run-trio-on-top-of-other-event-loops
.. |gh_actions| image:: https://img.shields.io/endpoint.svg?url=https%3A%2F%2Factions-badge.atrox.dev%2Fgoodboy%2Ftractor%2Fbadge&style=popout-square

View File

@ -0,0 +1,91 @@
'''
An SC compliant infected ``asyncio`` echo server.
'''
import asyncio
from statistics import mean
import time
import trio
import tractor
async def aio_echo_server(
to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue,
) -> None:
# a first message must be sent **from** this ``asyncio``
# task or the ``trio`` side will never unblock from
# ``tractor.to_asyncio.open_channel_from():``
to_trio.send_nowait('start')
# XXX: this uses an ``from_trio: asyncio.Queue`` currently but we
# should probably offer something better.
while True:
# echo the msg back
to_trio.send_nowait(await from_trio.get())
await asyncio.sleep(0)
@tractor.context
async def trio_to_aio_echo_server(
ctx: tractor.Context,
):
# this will block until the ``asyncio`` task sends a "first"
# message.
async with tractor.to_asyncio.open_channel_from(
aio_echo_server,
) as (first, chan):
assert first == 'start'
await ctx.started(first)
async with ctx.open_stream() as stream:
async for msg in stream:
await chan.send(msg)
out = await chan.receive()
# echo back to parent actor-task
await stream.send(out)
async def main():
async with tractor.open_nursery() as n:
p = await n.start_actor(
'aio_server',
enable_modules=[__name__],
infect_asyncio=True,
)
async with p.open_context(
trio_to_aio_echo_server,
) as (ctx, first):
assert first == 'start'
count = 0
async with ctx.open_stream() as stream:
delays = []
send = time.time()
await stream.send(count)
async for msg in stream:
recv = time.time()
delays.append(recv - send)
assert msg == count
count += 1
send = time.time()
await stream.send(count)
if count >= 1e3:
break
print(f'mean round trip rate (Hz): {1/mean(delays)}')
await p.cancel_actor()
if __name__ == '__main__':
trio.run(main)

View File

@ -0,0 +1,28 @@
Add "infected ``asyncio`` mode; a sub-system to spawn and control
``asyncio`` actors using ``trio``'s guest-mode.
This gets us the following very interesting functionality:
- ability to spawn an actor that has a process entry point of
``asyncio.run()`` by passing ``infect_asyncio=True`` to
``Portal.start_actor()`` (and friends).
- the ``asyncio`` actor embeds ``trio`` using guest-mode and starts
a main ``trio`` task which runs the ``tractor.Actor._async_main()``
entry point engages all the normal ``tractor`` runtime IPC/messaging
machinery; for all purposes the actor is now running normally on
a ``trio.run()``.
- the actor can now make one-to-one task spawning requests to the
underlying ``asyncio`` event loop using either of:
* ``to_asyncio.run_task()`` to spawn and run an ``asyncio`` task to
completion and block until a return value is delivered.
* ``async with to_asyncio.open_channel_from():`` which spawns a task
and hands it a pair of "memory channels" to allow for bi-directional
streaming between the now SC-linked ``trio`` and ``asyncio`` tasks.
The output from any call(s) to ``asyncio`` can be handled as normal in
``trio``/``tractor`` task operation with the caveat of the overhead due
to guest-mode use.
For more details see the `original PR
<https://github.com/goodboy/tractor/pull/121>`_ and `issue
<https://github.com/goodboy/tractor/issues/120>`_.

View File

@ -523,7 +523,7 @@ def test_fast_graceful_cancel_when_spawn_task_in_soft_proc_wait_for_daemon(
cancellation, and it's faster, we might as well do it.
'''
kbi_delay = 0.2
kbi_delay = 0.5
async def main():
start = time.time()

View File

@ -1,6 +1,7 @@
"""
'''
Let's make sure them docs work yah?
"""
'''
from contextlib import contextmanager
import itertools
import os

View File

@ -0,0 +1,431 @@
'''
The hipster way to force SC onto the stdlib's "async": 'infection mode'.
'''
from typing import Optional, Iterable
import asyncio
import builtins
import importlib
import pytest
import trio
import tractor
from tractor import to_asyncio
from tractor import RemoteActorError
async def sleep_and_err():
await asyncio.sleep(0.1)
assert 0
async def sleep_forever():
await asyncio.sleep(float('inf'))
async def trio_cancels_single_aio_task():
# spawn an ``asyncio`` task to run a func and return result
with trio.move_on_after(.2):
await tractor.to_asyncio.run_task(sleep_forever)
def test_trio_cancels_aio_on_actor_side(arb_addr):
'''
Spawn an infected actor that is cancelled by the ``trio`` side
task using std cancel scope apis.
'''
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr
) as n:
await n.run_in_actor(
trio_cancels_single_aio_task,
infect_asyncio=True,
)
trio.run(main)
async def asyncio_actor(
target: str,
expect_err: Optional[Exception] = None
) -> None:
assert tractor.current_actor().is_infected_aio()
target = globals()[target]
if '.' in expect_err:
modpath, _, name = expect_err.rpartition('.')
mod = importlib.import_module(modpath)
error_type = getattr(mod, name)
else: # toplevel builtin error type
error_type = builtins.__dict__.get(expect_err)
try:
# spawn an ``asyncio`` task to run a func and return result
await tractor.to_asyncio.run_task(target)
except BaseException as err:
if expect_err:
assert isinstance(err, error_type)
raise
def test_aio_simple_error(arb_addr):
'''
Verify a simple remote asyncio error propagates back through trio
to the parent actor.
'''
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr
) as n:
await n.run_in_actor(
asyncio_actor,
target='sleep_and_err',
expect_err='AssertionError',
infect_asyncio=True,
)
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
err = excinfo.value
assert isinstance(err, RemoteActorError)
assert err.type == AssertionError
def test_tractor_cancels_aio(arb_addr):
'''
Verify we can cancel a spawned asyncio task gracefully.
'''
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
asyncio_actor,
target='sleep_forever',
expect_err='trio.Cancelled',
infect_asyncio=True,
)
# cancel the entire remote runtime
await portal.cancel_actor()
trio.run(main)
def test_trio_cancels_aio(arb_addr):
'''
Much like the above test with ``tractor.Portal.cancel_actor()``
except we just use a standard ``trio`` cancellation api.
'''
async def main():
with trio.move_on_after(1):
# cancel the nursery shortly after boot
async with tractor.open_nursery() as n:
await n.run_in_actor(
asyncio_actor,
target='sleep_forever',
expect_err='trio.Cancelled',
infect_asyncio=True,
)
trio.run(main)
async def aio_cancel():
''''
Cancel urself boi.
'''
await asyncio.sleep(0.5)
task = asyncio.current_task()
# cancel and enter sleep
task.cancel()
await sleep_forever()
def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr):
async def main():
async with tractor.open_nursery() as n:
await n.run_in_actor(
asyncio_actor,
target='aio_cancel',
expect_err='tractor.to_asyncio.AsyncioCancelled',
infect_asyncio=True,
)
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
# ensure boxed error is correct
assert excinfo.value.type == to_asyncio.AsyncioCancelled
# TODO: verify open_channel_from will fail on this..
async def no_to_trio_in_args():
pass
async def push_from_aio_task(
sequence: Iterable,
to_trio: trio.abc.SendChannel,
expect_cancel: False,
fail_early: bool,
) -> None:
try:
# sync caller ctx manager
to_trio.send_nowait(True)
for i in sequence:
print(f'asyncio sending {i}')
to_trio.send_nowait(i)
await asyncio.sleep(0.001)
if i == 50 and fail_early:
raise Exception
print('asyncio streamer complete!')
except asyncio.CancelledError:
if not expect_cancel:
pytest.fail("aio task was cancelled unexpectedly")
raise
else:
if expect_cancel:
pytest.fail("aio task wasn't cancelled as expected!?")
async def stream_from_aio(
exit_early: bool = False,
raise_err: bool = False,
aio_raise_err: bool = False,
) -> None:
seq = range(100)
expect = list(seq)
try:
pulled = []
async with to_asyncio.open_channel_from(
push_from_aio_task,
sequence=seq,
expect_cancel=raise_err or exit_early,
fail_early=aio_raise_err,
) as (first, chan):
assert first is True
async for value in chan:
print(f'trio received {value}')
pulled.append(value)
if value == 50:
if raise_err:
raise Exception
elif exit_early:
break
finally:
if (
not raise_err and
not exit_early and
not aio_raise_err
):
assert pulled == expect
else:
assert pulled == expect[:51]
print('trio guest mode task completed!')
def test_basic_interloop_channel_stream(arb_addr):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
stream_from_aio,
infect_asyncio=True,
)
await portal.result()
trio.run(main)
# TODO: parametrize the above test and avoid the duplication here?
def test_trio_error_cancels_intertask_chan(arb_addr):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
stream_from_aio,
raise_err=True,
infect_asyncio=True,
)
# should trigger remote actor error
await portal.result()
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
# ensure boxed error is correct
assert excinfo.value.type == Exception
def test_trio_closes_early_and_channel_exits(arb_addr):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
stream_from_aio,
exit_early=True,
infect_asyncio=True,
)
# should trigger remote actor error
await portal.result()
# should be a quiet exit on a simple channel exit
trio.run(main)
def test_aio_errors_and_channel_propagates_and_closes(arb_addr):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
stream_from_aio,
aio_raise_err=True,
infect_asyncio=True,
)
# should trigger remote actor error
await portal.result()
with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)
# ensure boxed error is correct
assert excinfo.value.type == Exception
@tractor.context
async def trio_to_aio_echo_server(
ctx: tractor.Context,
):
async def aio_echo_server(
to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue,
) -> None:
to_trio.send_nowait('start')
while True:
msg = await from_trio.get()
# echo the msg back
to_trio.send_nowait(msg)
# if we get the terminate sentinel
# break the echo loop
if msg is None:
print('breaking aio echo loop')
break
async with to_asyncio.open_channel_from(
aio_echo_server,
) as (first, chan):
assert first == 'start'
await ctx.started(first)
async with ctx.open_stream() as stream:
async for msg in stream:
print(f'asyncio echoing {msg}')
await chan.send(msg)
out = await chan.receive()
# echo back to parent actor-task
await stream.send(out)
if out is None:
try:
out = await chan.receive()
except trio.EndOfChannel:
break
else:
raise RuntimeError('aio channel never stopped?')
@pytest.mark.parametrize(
'raise_error_mid_stream',
[False, Exception, KeyboardInterrupt],
ids='raise_error={}'.format,
)
def test_echoserver_detailed_mechanics(
arb_addr,
raise_error_mid_stream,
):
async def main():
async with tractor.open_nursery() as n:
p = await n.start_actor(
'aio_server',
enable_modules=[__name__],
infect_asyncio=True,
)
async with p.open_context(
trio_to_aio_echo_server,
) as (ctx, first):
assert first == 'start'
async with ctx.open_stream() as stream:
for i in range(100):
await stream.send(i)
out = await stream.receive()
assert i == out
if raise_error_mid_stream and i == 50:
raise raise_error_mid_stream
# send terminate msg
await stream.send(None)
out = await stream.receive()
assert out is None
if out is None:
# ensure the stream is stopped
# with trio.fail_after(0.1):
try:
await stream.receive()
except trio.EndOfChannel:
pass
else:
pytest.fail(
"stream wasn't stopped after sentinel?!")
# TODO: the case where this blocks and
# is cancelled by kbi or out of task cancellation
await p.cancel_actor()
if raise_error_mid_stream:
with pytest.raises(raise_error_mid_stream):
trio.run(main)
else:
trio.run(main)

View File

@ -361,6 +361,9 @@ class Actor:
# syncs for setup/teardown sequences
_server_down: Optional[trio.Event] = None
# if started on ``asycio`` running ``trio`` in guest mode
_infected_aio: bool = False
def __init__(
self,
name: str,
@ -472,6 +475,7 @@ class Actor:
self._mods[modpath] = mod
if modpath == '__main__':
self._mods['__mp_main__'] = mod
except ModuleNotFoundError:
# it is expected the corresponding `ModuleNotExposed` error
# will be raised later
@ -1459,6 +1463,9 @@ class Actor:
log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete")
return uid
def is_infected_aio(self) -> bool:
return self._infected_aio
class Arbiter(Actor):
'''

View File

@ -37,12 +37,15 @@ def parse_ipaddr(arg):
return (str(host), int(port))
from ._entry import _trio_main
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--uid", type=parse_uid)
parser.add_argument("--loglevel", type=str)
parser.add_argument("--parent_addr", type=parse_ipaddr)
parser.add_argument("--asyncio", action='store_true')
args = parser.parse_args()
subactor = Actor(
@ -54,5 +57,6 @@ if __name__ == "__main__":
_trio_main(
subactor,
parent_addr=args.parent_addr
)
parent_addr=args.parent_addr,
infect_asyncio=args.asyncio,
)

View File

@ -26,20 +26,26 @@ import trio # type: ignore
from .log import get_console_log, get_logger
from . import _state
from .to_asyncio import run_as_asyncio_guest
log = get_logger(__name__)
def _mp_main(
actor: 'Actor', # type: ignore
accept_addr: Tuple[str, int],
forkserver_info: Tuple[Any, Any, Any, Any, Any],
start_method: str,
parent_addr: Tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None:
"""The routine called *after fork* which invokes a fresh ``trio.run``
"""
'''
The routine called *after fork* which invokes a fresh ``trio.run``
'''
actor._forkserver_info = forkserver_info
from ._spawn import try_set_start_method
spawn_ctx = try_set_start_method(start_method)
@ -62,7 +68,11 @@ def _mp_main(
parent_addr=parent_addr
)
try:
trio.run(trio_main)
if infect_asyncio:
actor._infected_aio = True
run_as_asyncio_guest(trio_main)
else:
trio.run(trio_main)
except KeyboardInterrupt:
pass # handle it the same way trio does?
@ -71,16 +81,17 @@ def _mp_main(
def _trio_main(
actor: 'Actor', # type: ignore
*,
parent_addr: Tuple[str, int] = None,
) -> None:
"""Entry point for a `trio_run_in_process` subactor.
"""
# Disable sigint handling in children;
# we don't need it thanks to our cancellation machinery.
# signal.signal(signal.SIGINT, signal.SIG_IGN)
infect_asyncio: bool = False,
) -> None:
'''
Entry point for a `trio_run_in_process` subactor.
'''
log.info(f"Started new trio process for {actor.uid}")
if actor.loglevel is not None:
@ -100,7 +111,11 @@ def _trio_main(
)
try:
trio.run(trio_main)
if infect_asyncio:
actor._infected_aio = True
run_as_asyncio_guest(trio_main)
else:
trio.run(trio_main)
except KeyboardInterrupt:
log.warning(f"Actor {actor.uid} received KBI")

View File

@ -82,6 +82,15 @@ class StreamOverrun(trio.TooSlowError):
"This stream was overrun by sender"
class AsyncioCancelled(Exception):
'''
Asyncio cancelled translation (non-base) error
for use with the ``to_asyncio`` module
to be raised in the ``trio`` side task
'''
def pack_error(
exc: BaseException,
tb=None,

View File

@ -103,7 +103,6 @@ async def open_root_actor(
_default_arbiter_port,
)
if loglevel is None:
loglevel = log.get_loglevel()
else:

View File

@ -22,10 +22,10 @@ import sys
import multiprocessing as mp
import platform
from typing import (
Any, Dict, Optional, Union, Callable,
Any, Dict, Optional, Callable,
TypeVar,
)
from collections.abc import Awaitable, Coroutine
from collections.abc import Awaitable
import trio
from trio_typing import TaskStatus
@ -244,6 +244,8 @@ async def new_proc(
_runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
@ -260,7 +262,6 @@ async def new_proc(
uid = subactor.uid
if _spawn_method == 'trio':
spawn_cmd = [
sys.executable,
"-m",
@ -283,6 +284,9 @@ async def new_proc(
"--loglevel",
subactor.loglevel
]
# Tell child to run in guest mode on top of ``asyncio`` loop
if infect_asyncio:
spawn_cmd.append("--asyncio")
cancelled_during_spawn: bool = False
proc: Optional[trio.Process] = None
@ -412,6 +416,7 @@ async def new_proc(
bind_addr=bind_addr,
parent_addr=parent_addr,
_runtime_vars=_runtime_vars,
infect_asyncio=infect_asyncio,
task_status=task_status,
)
@ -427,6 +432,7 @@ async def mp_new_proc(
parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
@ -472,6 +478,7 @@ async def mp_new_proc(
fs_info,
start_method,
parent_addr,
infect_asyncio,
),
# daemon=True,
name=name,

View File

@ -45,8 +45,33 @@ _default_bind_addr: Tuple[str, int] = ('127.0.0.1', 0)
class ActorNursery:
"""Spawn scoped subprocess actors.
"""
'''
The fundamental actor supervision construct: spawn and manage
explicit lifetime and capability restricted, bootstrapped,
``trio.run()`` scheduled sub-processes.
Though the concept of a "process nursery" is different in complexity
and slightly different in semantics then a tradtional single
threaded task nursery, much of the interface is the same. New
processes each require a top level "parent" or "root" task which is
itself no different then any task started by a tradtional
``trio.Nursery``. The main difference is that each "actor" (a
process + ``trio.run()``) contains a full, paralell executing
``trio``-task-tree. The following super powers ensue:
- starting tasks in a child actor are completely independent of
tasks started in the current process. They execute in *parallel*
relative to tasks in the current process and are scheduled by their
own actor's ``trio`` run loop.
- tasks scheduled in a remote process still maintain an SC protocol
across memory boundaries using a so called "structured concurrency
dialogue protocol" which ensures task-hierarchy-lifetimes are linked.
- remote tasks (in another actor) can fail and relay failure back to
the caller task (in some other actor) via a seralized
``RemoteActorError`` which means no zombie process or RPC
initiated task can ever go off on its own.
'''
def __init__(
self,
actor: Actor,
@ -81,6 +106,7 @@ class ActorNursery:
loglevel: str = None, # set log level per subactor
nursery: trio.Nursery = None,
debug_mode: Optional[bool] = None,
infect_asyncio: bool = False,
) -> Portal:
'''
Start a (daemon) actor: an process that has no designated
@ -134,19 +160,25 @@ class ActorNursery:
bind_addr,
parent_addr,
_rtv, # run time vars
infect_asyncio=infect_asyncio,
)
)
async def run_in_actor(
self,
fn: typing.Callable,
*,
name: Optional[str] = None,
bind_addr: Tuple[str, int] = _default_bind_addr,
rpc_module_paths: Optional[List[str]] = None,
enable_modules: List[str] = None,
loglevel: str = None, # set log level per subactor
infect_asyncio: bool = False,
**kwargs, # explicit args to ``fn``
) -> Portal:
"""Spawn a new actor, run a lone task, then terminate the actor and
return its result.
@ -170,6 +202,7 @@ class ActorNursery:
loglevel=loglevel,
# use the run_in_actor nursery
nursery=self._ria_nursery,
infect_asyncio=infect_asyncio,
)
# XXX: don't allow stream funcs
@ -408,8 +441,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
@asynccontextmanager
async def open_nursery(
**kwargs,
) -> typing.AsyncGenerator[ActorNursery, None]:
"""Create and yield a new ``ActorNursery`` to be used for spawning
'''
Create and yield a new ``ActorNursery`` to be used for spawning
structured concurrent subactors.
When an actor is spawned a new trio task is started which
@ -421,7 +456,8 @@ async def open_nursery(
close it. It turns out this approach is probably more correct
anyway since it is more clear from the following nested nurseries
which cancellation scopes correspond to each spawned subactor set.
"""
'''
implicit_runtime = False
actor = current_actor(err_on_no_runtime=False)

View File

@ -0,0 +1,405 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Infection apis for ``asyncio`` loops running ``trio`` using guest mode.
'''
import asyncio
from asyncio.exceptions import CancelledError
from contextlib import asynccontextmanager as acm
from dataclasses import dataclass
import inspect
from typing import (
Any,
Callable,
AsyncIterator,
Awaitable,
Optional,
)
import trio
from .log import get_logger
from ._state import current_actor
from ._exceptions import AsyncioCancelled
log = get_logger(__name__)
__all__ = ['run_task', 'run_as_asyncio_guest']
@dataclass
class LinkedTaskChannel(trio.abc.Channel):
'''
A "linked task channel" which allows for two-way synchronized msg
passing between a ``trio``-in-guest-mode task and an ``asyncio``
task scheduled in the host loop.
'''
_to_aio: asyncio.Queue
_from_aio: trio.MemoryReceiveChannel
_to_trio: trio.MemorySendChannel
_trio_cs: trio.CancelScope
_aio_task_complete: trio.Event
# set after ``asyncio.create_task()``
_aio_task: Optional[asyncio.Task] = None
_aio_err: Optional[BaseException] = None
async def aclose(self) -> None:
await self._from_aio.aclose()
async def receive(self) -> Any:
async with translate_aio_errors(self):
return await self._from_aio.receive()
async def wait_ayncio_complete(self) -> None:
await self._aio_task_complete.wait()
# def cancel_asyncio_task(self) -> None:
# self._aio_task.cancel()
async def send(self, item: Any) -> None:
'''
Send a value through to the asyncio task presuming
it defines a ``from_trio`` argument, if it does not
this method will raise an error.
'''
self._to_aio.put_nowait(item)
def _run_asyncio_task(
func: Callable,
*,
qsize: int = 1,
provide_channels: bool = False,
**kwargs,
) -> LinkedTaskChannel:
'''
Run an ``asyncio`` async function or generator in a task, return
or stream the result back to ``trio``.
'''
if not current_actor().is_infected_aio():
raise RuntimeError("`infect_asyncio` mode is not enabled!?")
# ITC (inter task comms), these channel/queue names are mostly from
# ``asyncio``'s perspective.
aio_q = from_trio = asyncio.Queue(qsize) # type: ignore
to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore
args = tuple(inspect.getfullargspec(func).args)
if getattr(func, '_tractor_steam_function', None):
# the assumption is that the target async routine accepts the
# send channel then it intends to yield more then one return
# value otherwise it would just return ;P
assert qsize > 1
if provide_channels:
assert 'to_trio' in args
# allow target func to accept/stream results manually by name
if 'to_trio' in args:
kwargs['to_trio'] = to_trio
if 'from_trio' in args:
kwargs['from_trio'] = from_trio
coro = func(**kwargs)
cancel_scope = trio.CancelScope()
aio_task_complete = trio.Event()
aio_err: Optional[BaseException] = None
chan = LinkedTaskChannel(
aio_q, # asyncio.Queue
from_aio, # recv chan
to_trio, # send chan
cancel_scope,
aio_task_complete,
)
async def wait_on_coro_final_result(
to_trio: trio.MemorySendChannel,
coro: Awaitable,
aio_task_complete: trio.Event,
) -> None:
'''
Await ``coro`` and relay result back to ``trio``.
'''
nonlocal aio_err
nonlocal chan
orig = result = id(coro)
try:
result = await coro
except BaseException as aio_err:
chan._aio_err = aio_err
raise
else:
if (
result != orig and
aio_err is None and
# in the ``open_channel_from()`` case we don't
# relay through the "return value".
not provide_channels
):
to_trio.send_nowait(result)
finally:
# if the task was spawned using ``open_channel_from()``
# then we close the channels on exit.
if provide_channels:
# only close the sender side which will relay
# a ``trio.EndOfChannel`` to the trio (consumer) side.
to_trio.close()
aio_task_complete.set()
# start the asyncio task we submitted from trio
if not inspect.isawaitable(coro):
raise TypeError(f"No support for invoking {coro}")
task = asyncio.create_task(
wait_on_coro_final_result(
to_trio,
coro,
aio_task_complete
)
)
chan._aio_task = task
def cancel_trio(task: asyncio.Task) -> None:
'''
Cancel the calling ``trio`` task on error.
'''
nonlocal chan
aio_err = chan._aio_err
# only to avoid ``asyncio`` complaining about uncaptured
# task exceptions
try:
task.exception()
except BaseException as terr:
assert type(terr) is type(aio_err), 'Asyncio task error mismatch?'
if aio_err is not None:
if type(aio_err) is CancelledError:
log.cancel("infected task was cancelled")
else:
aio_err.with_traceback(aio_err.__traceback__)
log.exception("infected task errorred:")
# NOTE: currently mem chan closure may act as a form
# of error relay (at least in the ``asyncio.CancelledError``
# case) since we have no way to directly trigger a ``trio``
# task error without creating a nursery to throw one.
# We might want to change this in the future though.
from_aio.close()
task.add_done_callback(cancel_trio)
return chan
@acm
async def translate_aio_errors(
chan: LinkedTaskChannel,
) -> AsyncIterator[None]:
'''
Error handling context around ``asyncio`` task spawns which
appropriately translates errors and cancels into ``trio`` land.
'''
aio_err: Optional[BaseException] = None
def maybe_raise_aio_err(
err: Optional[Exception] = None
) -> None:
aio_err = chan._aio_err
if (
aio_err is not None and
type(aio_err) != CancelledError
):
# always raise from any captured asyncio error
if err:
raise aio_err from err
else:
raise aio_err
task = chan._aio_task
assert task
try:
yield
except (
# NOTE: see the note in the ``cancel_trio()`` asyncio task
# termination callback
trio.ClosedResourceError,
):
aio_err = chan._aio_err
if (
task.cancelled() and
type(aio_err) is CancelledError
):
# if an underlying ``asyncio.CancelledError`` triggered this
# channel close, raise our (non-``BaseException``) wrapper
# error: ``AsyncioCancelled`` from that source error.
raise AsyncioCancelled from aio_err
else:
raise
finally:
# always cancel the ``asyncio`` task if we've made it this far
# and it's not done.
if not task.done() and aio_err:
# assert not aio_err, 'WTF how did asyncio do this?!'
task.cancel()
# if any ``asyncio`` error was caught, raise it here inline
# here in the ``trio`` task
maybe_raise_aio_err()
async def run_task(
func: Callable,
*,
qsize: int = 2**10,
**kwargs,
) -> Any:
'''
Run an ``asyncio`` async function or generator in a task, return
or stream the result back to ``trio``.
'''
# simple async func
chan = _run_asyncio_task(
func,
qsize=1,
**kwargs,
)
with chan._from_aio:
# try:
async with translate_aio_errors(chan):
# return single value that is the output from the
# ``asyncio`` function-as-task. Expect the mem chan api to
# do the job of handling cross-framework cancellations
# / errors via closure and translation in the
# ``translate_aio_errors()`` in the above ctx mngr.
return await chan.receive()
@acm
async def open_channel_from(
target: Callable[..., Any],
**kwargs,
) -> AsyncIterator[Any]:
'''
Open an inter-loop linked task channel for streaming between a target
spawned ``asyncio`` task and ``trio``.
'''
chan = _run_asyncio_task(
target,
qsize=2**8,
provide_channels=True,
**kwargs,
)
async with chan._from_aio:
async with translate_aio_errors(chan):
# sync to a "started()"-like first delivered value from the
# ``asyncio`` task.
first = await chan.receive()
# stream values upward
yield first, chan
def run_as_asyncio_guest(
trio_main: Callable,
) -> None:
'''
Entry for an "infected ``asyncio`` actor".
Entrypoint for a Python process which starts the ``asyncio`` event
loop and runs ``trio`` in guest mode resulting in a system where
``trio`` tasks can control ``asyncio`` tasks whilst maintaining
SC semantics.
'''
# Uh, oh. :o
# It looks like your event loop has caught a case of the ``trio``s.
# :()
# Don't worry, we've heard you'll barely notice. You might hallucinate
# a few more propagating errors and feel like your digestion has
# slowed but if anything get's too bad your parents will know about
# it.
# :)
async def aio_main(trio_main):
loop = asyncio.get_running_loop()
trio_done_fut = asyncio.Future()
def trio_done_callback(main_outcome):
print(f"trio_main finished: {main_outcome!r}")
trio_done_fut.set_result(main_outcome)
# start the infection: run trio on the asyncio loop in "guest mode"
log.info(f"Infecting asyncio process with {trio_main}")
trio.lowlevel.start_guest_run(
trio_main,
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback,
)
return (await trio_done_fut).unwrap()
# might as well if it's installed.
try:
import uvloop
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
except ImportError:
pass
return asyncio.run(aio_main(trio_main))

View File

@ -47,8 +47,9 @@ class AsyncReceiver(
Protocol,
Generic[ReceiveType],
):
'''An async receivable duck-type that quacks much like trio's
``trio.abc.ReceieveChannel``.
'''
An async receivable duck-type that quacks much like trio's
``trio.abc.ReceiveChannel``.
'''
@abstractmethod
@ -78,7 +79,8 @@ class AsyncReceiver(
class Lagged(trio.TooSlowError):
'''Subscribed consumer task was too slow and was overrun
'''
Subscribed consumer task was too slow and was overrun
by the fastest consumer-producer pair.
'''
@ -86,7 +88,8 @@ class Lagged(trio.TooSlowError):
@dataclass
class BroadcastState:
'''Common state to all receivers of a broadcast.
'''
Common state to all receivers of a broadcast.
'''
queue: deque
@ -111,7 +114,8 @@ class BroadcastState:
class BroadcastReceiver(ReceiveChannel):
'''A memory receive channel broadcaster which is non-lossy for the
'''
A memory receive channel broadcaster which is non-lossy for the
fastest consumer.
Additional consumer tasks can receive all produced values by registering