diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d1fe811..b03ba9f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -25,7 +25,7 @@ jobs: testing-linux: name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }}' - timeout-minutes: 9 + timeout-minutes: 10 runs-on: ${{ matrix.os }} strategy: diff --git a/docs/README.rst b/docs/README.rst index 842cbcd..180cd81 100644 --- a/docs/README.rst +++ b/docs/README.rst @@ -27,7 +27,9 @@ Features - A modular transport stack, allowing for custom serialization (eg. `msgspec`_), communications protocols, and environment specific IPC primitives -- `structured concurrency`_ from the ground up +- Support for spawning process-level-SC, inter-loop one-to-one-task oriented + ``asyncio`` actors via "infected ``asyncio``" mode +- `structured chadcurrency`_ from the ground up Run a func in a process @@ -313,6 +315,117 @@ real time:: This uses no extra threads, fancy semaphores or futures; all we need is ``tractor``'s IPC! +"Infected ``asyncio``" mode +--------------------------- +Have a bunch of ``asyncio`` code you want to force to be SC at the process level? + +Check out our experimental system for `guest-mode`_ controlled +``asyncio`` actors: + +.. code:: python + + import asyncio + from statistics import mean + import time + + import trio + import tractor + + + async def aio_echo_server( + to_trio: trio.MemorySendChannel, + from_trio: asyncio.Queue, + ) -> None: + + # a first message must be sent **from** this ``asyncio`` + # task or the ``trio`` side will never unblock from + # ``tractor.to_asyncio.open_channel_from():`` + to_trio.send_nowait('start') + + # XXX: this uses an ``from_trio: asyncio.Queue`` currently but we + # should probably offer something better. + while True: + # echo the msg back + to_trio.send_nowait(await from_trio.get()) + await asyncio.sleep(0) + + + @tractor.context + async def trio_to_aio_echo_server( + ctx: tractor.Context, + ): + # this will block until the ``asyncio`` task sends a "first" + # message. + async with tractor.to_asyncio.open_channel_from( + aio_echo_server, + ) as (first, chan): + + assert first == 'start' + await ctx.started(first) + + async with ctx.open_stream() as stream: + + async for msg in stream: + await chan.send(msg) + + out = await chan.receive() + # echo back to parent actor-task + await stream.send(out) + + + async def main(): + + async with tractor.open_nursery() as n: + p = await n.start_actor( + 'aio_server', + enable_modules=[__name__], + infect_asyncio=True, + ) + async with p.open_context( + trio_to_aio_echo_server, + ) as (ctx, first): + + assert first == 'start' + + count = 0 + async with ctx.open_stream() as stream: + + delays = [] + send = time.time() + + await stream.send(count) + async for msg in stream: + recv = time.time() + delays.append(recv - send) + assert msg == count + count += 1 + send = time.time() + await stream.send(count) + + if count >= 1e3: + break + + print(f'mean round trip rate (Hz): {1/mean(delays)}') + await p.cancel_actor() + + + if __name__ == '__main__': + trio.run(main) + + +Yes, we spawn a python process, run ``asyncio``, start ``trio`` on the +``asyncio`` loop, then send commands to the ``trio`` scheduled tasks to +tell ``asyncio`` tasks what to do XD + +We need help refining the `asyncio`-side channel API to be more +`trio`-like. Feel free to sling your opinion in `#273`_! + + +.. 
_#273: https://github.com/goodboy/tractor/issues/273 + + +Higher level "cluster" APIs +--------------------------- To be extra terse the ``tractor`` devs have started hacking some "higher level" APIs for managing actor trees/clusters. These interfaces should generally be condsidered provisional for now but we encourage you to try @@ -476,6 +589,7 @@ channel`_! .. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich .. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228 .. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s +.. .. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts .. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s .. _trio gitter channel: https://gitter.im/python-trio/general .. _matrix channel: https://matrix.to/#/!tractor:matrix.org @@ -484,11 +598,14 @@ channel`_! .. _messages: https://en.wikipedia.org/wiki/Message_passing .. _trio docs: https://trio.readthedocs.io/en/latest/ .. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/ -.. _structured concurrency: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/ +.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency +.. _structured chadcurrency: https://en.wikipedia.org/wiki/Structured_concurrency +.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency .. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony .. _async generators: https://www.python.org/dev/peps/pep-0525/ .. _trio-parallel: https://github.com/richardsheridan/trio-parallel .. _msgspec: https://jcristharif.com/msgspec/ +.. _guest-mode: https://trio.readthedocs.io/en/stable/reference-lowlevel.html?highlight=guest%20mode#using-guest-mode-to-run-trio-on-top-of-other-event-loops .. |gh_actions| image:: https://img.shields.io/endpoint.svg?url=https%3A%2F%2Factions-badge.atrox.dev%2Fgoodboy%2Ftractor%2Fbadge&style=popout-square diff --git a/examples/infected_asyncio_echo_server.py b/examples/infected_asyncio_echo_server.py new file mode 100644 index 0000000..ee7c45b --- /dev/null +++ b/examples/infected_asyncio_echo_server.py @@ -0,0 +1,91 @@ +''' +An SC compliant infected ``asyncio`` echo server. + +''' +import asyncio +from statistics import mean +import time + +import trio +import tractor + + +async def aio_echo_server( + to_trio: trio.MemorySendChannel, + from_trio: asyncio.Queue, +) -> None: + + # a first message must be sent **from** this ``asyncio`` + # task or the ``trio`` side will never unblock from + # ``tractor.to_asyncio.open_channel_from():`` + to_trio.send_nowait('start') + + # XXX: this uses an ``from_trio: asyncio.Queue`` currently but we + # should probably offer something better. + while True: + # echo the msg back + to_trio.send_nowait(await from_trio.get()) + await asyncio.sleep(0) + + +@tractor.context +async def trio_to_aio_echo_server( + ctx: tractor.Context, +): + # this will block until the ``asyncio`` task sends a "first" + # message. 
+ async with tractor.to_asyncio.open_channel_from( + aio_echo_server, + ) as (first, chan): + + assert first == 'start' + await ctx.started(first) + + async with ctx.open_stream() as stream: + + async for msg in stream: + await chan.send(msg) + + out = await chan.receive() + # echo back to parent actor-task + await stream.send(out) + + +async def main(): + + async with tractor.open_nursery() as n: + p = await n.start_actor( + 'aio_server', + enable_modules=[__name__], + infect_asyncio=True, + ) + async with p.open_context( + trio_to_aio_echo_server, + ) as (ctx, first): + + assert first == 'start' + + count = 0 + async with ctx.open_stream() as stream: + + delays = [] + send = time.time() + + await stream.send(count) + async for msg in stream: + recv = time.time() + delays.append(recv - send) + assert msg == count + count += 1 + send = time.time() + await stream.send(count) + + if count >= 1e3: + break + + print(f'mean round trip rate (Hz): {1/mean(delays)}') + await p.cancel_actor() + + +if __name__ == '__main__': + trio.run(main) diff --git a/newsfragments/121.feature.rst b/newsfragments/121.feature.rst new file mode 100644 index 0000000..2ba8fc0 --- /dev/null +++ b/newsfragments/121.feature.rst @@ -0,0 +1,28 @@ +Add "infected ``asyncio`` mode; a sub-system to spawn and control +``asyncio`` actors using ``trio``'s guest-mode. + +This gets us the following very interesting functionality: + +- ability to spawn an actor that has a process entry point of + ``asyncio.run()`` by passing ``infect_asyncio=True`` to + ``Portal.start_actor()`` (and friends). +- the ``asyncio`` actor embeds ``trio`` using guest-mode and starts + a main ``trio`` task which runs the ``tractor.Actor._async_main()`` + entry point engages all the normal ``tractor`` runtime IPC/messaging + machinery; for all purposes the actor is now running normally on + a ``trio.run()``. +- the actor can now make one-to-one task spawning requests to the + underlying ``asyncio`` event loop using either of: + * ``to_asyncio.run_task()`` to spawn and run an ``asyncio`` task to + completion and block until a return value is delivered. + * ``async with to_asyncio.open_channel_from():`` which spawns a task + and hands it a pair of "memory channels" to allow for bi-directional + streaming between the now SC-linked ``trio`` and ``asyncio`` tasks. + +The output from any call(s) to ``asyncio`` can be handled as normal in +``trio``/``tractor`` task operation with the caveat of the overhead due +to guest-mode use. + +For more details see the `original PR +`_ and `issue +`_. diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index c346806..4144859 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -523,7 +523,7 @@ def test_fast_graceful_cancel_when_spawn_task_in_soft_proc_wait_for_daemon( cancellation, and it's faster, we might as well do it. ''' - kbi_delay = 0.2 + kbi_delay = 0.5 async def main(): start = time.time() diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py index 5f47419..af17ff1 100644 --- a/tests/test_docs_examples.py +++ b/tests/test_docs_examples.py @@ -1,6 +1,7 @@ -""" +''' Let's make sure them docs work yah? -""" + +''' from contextlib import contextmanager import itertools import os diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py new file mode 100644 index 0000000..9881be9 --- /dev/null +++ b/tests/test_infected_asyncio.py @@ -0,0 +1,431 @@ +''' +The hipster way to force SC onto the stdlib's "async": 'infection mode'. 
+ +''' +from typing import Optional, Iterable +import asyncio +import builtins +import importlib + +import pytest +import trio +import tractor +from tractor import to_asyncio +from tractor import RemoteActorError + + +async def sleep_and_err(): + await asyncio.sleep(0.1) + assert 0 + + +async def sleep_forever(): + await asyncio.sleep(float('inf')) + + +async def trio_cancels_single_aio_task(): + + # spawn an ``asyncio`` task to run a func and return result + with trio.move_on_after(.2): + await tractor.to_asyncio.run_task(sleep_forever) + + +def test_trio_cancels_aio_on_actor_side(arb_addr): + ''' + Spawn an infected actor that is cancelled by the ``trio`` side + task using std cancel scope apis. + + ''' + async def main(): + async with tractor.open_nursery( + arbiter_addr=arb_addr + ) as n: + await n.run_in_actor( + trio_cancels_single_aio_task, + infect_asyncio=True, + ) + + trio.run(main) + + +async def asyncio_actor( + + target: str, + expect_err: Optional[Exception] = None + +) -> None: + + assert tractor.current_actor().is_infected_aio() + target = globals()[target] + + if '.' in expect_err: + modpath, _, name = expect_err.rpartition('.') + mod = importlib.import_module(modpath) + error_type = getattr(mod, name) + + else: # toplevel builtin error type + error_type = builtins.__dict__.get(expect_err) + + try: + # spawn an ``asyncio`` task to run a func and return result + await tractor.to_asyncio.run_task(target) + + except BaseException as err: + if expect_err: + assert isinstance(err, error_type) + + raise + + +def test_aio_simple_error(arb_addr): + ''' + Verify a simple remote asyncio error propagates back through trio + to the parent actor. + + + ''' + async def main(): + async with tractor.open_nursery( + arbiter_addr=arb_addr + ) as n: + await n.run_in_actor( + asyncio_actor, + target='sleep_and_err', + expect_err='AssertionError', + infect_asyncio=True, + ) + + with pytest.raises(RemoteActorError) as excinfo: + trio.run(main) + + err = excinfo.value + assert isinstance(err, RemoteActorError) + assert err.type == AssertionError + + +def test_tractor_cancels_aio(arb_addr): + ''' + Verify we can cancel a spawned asyncio task gracefully. + + ''' + async def main(): + async with tractor.open_nursery() as n: + portal = await n.run_in_actor( + asyncio_actor, + target='sleep_forever', + expect_err='trio.Cancelled', + infect_asyncio=True, + ) + # cancel the entire remote runtime + await portal.cancel_actor() + + trio.run(main) + + +def test_trio_cancels_aio(arb_addr): + ''' + Much like the above test with ``tractor.Portal.cancel_actor()`` + except we just use a standard ``trio`` cancellation api. + + ''' + async def main(): + + with trio.move_on_after(1): + # cancel the nursery shortly after boot + + async with tractor.open_nursery() as n: + await n.run_in_actor( + asyncio_actor, + target='sleep_forever', + expect_err='trio.Cancelled', + infect_asyncio=True, + ) + + trio.run(main) + + +async def aio_cancel(): + '''' + Cancel urself boi. 
+ + ''' + await asyncio.sleep(0.5) + task = asyncio.current_task() + + # cancel and enter sleep + task.cancel() + await sleep_forever() + + +def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr): + + async def main(): + async with tractor.open_nursery() as n: + await n.run_in_actor( + asyncio_actor, + target='aio_cancel', + expect_err='tractor.to_asyncio.AsyncioCancelled', + infect_asyncio=True, + ) + + with pytest.raises(RemoteActorError) as excinfo: + trio.run(main) + + # ensure boxed error is correct + assert excinfo.value.type == to_asyncio.AsyncioCancelled + + +# TODO: verify open_channel_from will fail on this.. +async def no_to_trio_in_args(): + pass + + +async def push_from_aio_task( + + sequence: Iterable, + to_trio: trio.abc.SendChannel, + expect_cancel: False, + fail_early: bool, + +) -> None: + + try: + # sync caller ctx manager + to_trio.send_nowait(True) + + for i in sequence: + print(f'asyncio sending {i}') + to_trio.send_nowait(i) + await asyncio.sleep(0.001) + + if i == 50 and fail_early: + raise Exception + + print('asyncio streamer complete!') + + except asyncio.CancelledError: + if not expect_cancel: + pytest.fail("aio task was cancelled unexpectedly") + raise + else: + if expect_cancel: + pytest.fail("aio task wasn't cancelled as expected!?") + + +async def stream_from_aio( + + exit_early: bool = False, + raise_err: bool = False, + aio_raise_err: bool = False, + +) -> None: + seq = range(100) + expect = list(seq) + + try: + pulled = [] + + async with to_asyncio.open_channel_from( + push_from_aio_task, + sequence=seq, + expect_cancel=raise_err or exit_early, + fail_early=aio_raise_err, + ) as (first, chan): + + assert first is True + + async for value in chan: + print(f'trio received {value}') + pulled.append(value) + + if value == 50: + if raise_err: + raise Exception + elif exit_early: + break + finally: + + if ( + not raise_err and + not exit_early and + not aio_raise_err + ): + assert pulled == expect + else: + assert pulled == expect[:51] + + print('trio guest mode task completed!') + + +def test_basic_interloop_channel_stream(arb_addr): + async def main(): + async with tractor.open_nursery() as n: + portal = await n.run_in_actor( + stream_from_aio, + infect_asyncio=True, + ) + await portal.result() + + trio.run(main) + + +# TODO: parametrize the above test and avoid the duplication here? 
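The ``# TODO`` note just above asks whether the three near-duplicate shutdown tests that
follow could be collapsed; a possible (hypothetical, untested) consolidation using
``pytest.mark.parametrize`` might look like the sketch below. It reuses ``stream_from_aio()``
and the ``arb_addr`` fixture from this test module, and the test/parameter names are
illustrative only:

.. code:: python

    import pytest
    import trio
    import tractor
    from tractor import RemoteActorError


    @pytest.mark.parametrize(
        'kwargs, expect_remote_err',
        [
            ({'raise_err': True}, Exception),
            ({'exit_early': True}, None),
            ({'aio_raise_err': True}, Exception),
        ],
        ids=['trio_err', 'trio_early_exit', 'aio_err'],
    )
    def test_interloop_chan_shutdown_modes(
        arb_addr,
        kwargs,
        expect_remote_err,
    ):
        async def main():
            async with tractor.open_nursery() as n:
                portal = await n.run_in_actor(
                    stream_from_aio,
                    infect_asyncio=True,
                    **kwargs,
                )
                # collect the subactor's result (or its boxed error)
                await portal.result()

        if expect_remote_err:
            with pytest.raises(RemoteActorError) as excinfo:
                trio.run(main)

            # the boxed error type should round-trip back to the parent
            assert excinfo.value.type == expect_remote_err
        else:
            # an early ``trio``-side exit should be a quiet, graceful shutdown
            trio.run(main)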
+def test_trio_error_cancels_intertask_chan(arb_addr): + async def main(): + async with tractor.open_nursery() as n: + portal = await n.run_in_actor( + stream_from_aio, + raise_err=True, + infect_asyncio=True, + ) + # should trigger remote actor error + await portal.result() + + with pytest.raises(RemoteActorError) as excinfo: + trio.run(main) + + # ensure boxed error is correct + assert excinfo.value.type == Exception + + +def test_trio_closes_early_and_channel_exits(arb_addr): + async def main(): + async with tractor.open_nursery() as n: + portal = await n.run_in_actor( + stream_from_aio, + exit_early=True, + infect_asyncio=True, + ) + # should trigger remote actor error + await portal.result() + + # should be a quiet exit on a simple channel exit + trio.run(main) + + +def test_aio_errors_and_channel_propagates_and_closes(arb_addr): + async def main(): + async with tractor.open_nursery() as n: + portal = await n.run_in_actor( + stream_from_aio, + aio_raise_err=True, + infect_asyncio=True, + ) + # should trigger remote actor error + await portal.result() + + with pytest.raises(RemoteActorError) as excinfo: + trio.run(main) + + # ensure boxed error is correct + assert excinfo.value.type == Exception + + +@tractor.context +async def trio_to_aio_echo_server( + ctx: tractor.Context, +): + + async def aio_echo_server( + to_trio: trio.MemorySendChannel, + from_trio: asyncio.Queue, + ) -> None: + + to_trio.send_nowait('start') + + while True: + msg = await from_trio.get() + + # echo the msg back + to_trio.send_nowait(msg) + + # if we get the terminate sentinel + # break the echo loop + if msg is None: + print('breaking aio echo loop') + break + + async with to_asyncio.open_channel_from( + aio_echo_server, + ) as (first, chan): + + assert first == 'start' + await ctx.started(first) + + async with ctx.open_stream() as stream: + + async for msg in stream: + print(f'asyncio echoing {msg}') + await chan.send(msg) + + out = await chan.receive() + # echo back to parent actor-task + await stream.send(out) + + if out is None: + try: + out = await chan.receive() + except trio.EndOfChannel: + break + else: + raise RuntimeError('aio channel never stopped?') + + +@pytest.mark.parametrize( + 'raise_error_mid_stream', + [False, Exception, KeyboardInterrupt], + ids='raise_error={}'.format, +) +def test_echoserver_detailed_mechanics( + arb_addr, + raise_error_mid_stream, +): + + async def main(): + async with tractor.open_nursery() as n: + p = await n.start_actor( + 'aio_server', + enable_modules=[__name__], + infect_asyncio=True, + ) + async with p.open_context( + trio_to_aio_echo_server, + ) as (ctx, first): + + assert first == 'start' + + async with ctx.open_stream() as stream: + for i in range(100): + await stream.send(i) + out = await stream.receive() + assert i == out + + if raise_error_mid_stream and i == 50: + raise raise_error_mid_stream + + # send terminate msg + await stream.send(None) + out = await stream.receive() + assert out is None + + if out is None: + # ensure the stream is stopped + # with trio.fail_after(0.1): + try: + await stream.receive() + except trio.EndOfChannel: + pass + else: + pytest.fail( + "stream wasn't stopped after sentinel?!") + + # TODO: the case where this blocks and + # is cancelled by kbi or out of task cancellation + await p.cancel_actor() + + if raise_error_mid_stream: + with pytest.raises(raise_error_mid_stream): + trio.run(main) + + else: + trio.run(main) diff --git a/tractor/_actor.py b/tractor/_actor.py index e737004..8e5d548 100644 --- a/tractor/_actor.py +++ 
b/tractor/_actor.py @@ -361,6 +361,9 @@ class Actor: # syncs for setup/teardown sequences _server_down: Optional[trio.Event] = None + # if started on ``asycio`` running ``trio`` in guest mode + _infected_aio: bool = False + def __init__( self, name: str, @@ -472,6 +475,7 @@ class Actor: self._mods[modpath] = mod if modpath == '__main__': self._mods['__mp_main__'] = mod + except ModuleNotFoundError: # it is expected the corresponding `ModuleNotExposed` error # will be raised later @@ -1459,6 +1463,9 @@ class Actor: log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete") return uid + def is_infected_aio(self) -> bool: + return self._infected_aio + class Arbiter(Actor): ''' diff --git a/tractor/_child.py b/tractor/_child.py index f384ac4..7790731 100644 --- a/tractor/_child.py +++ b/tractor/_child.py @@ -37,12 +37,15 @@ def parse_ipaddr(arg): return (str(host), int(port)) +from ._entry import _trio_main + if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--uid", type=parse_uid) parser.add_argument("--loglevel", type=str) parser.add_argument("--parent_addr", type=parse_ipaddr) + parser.add_argument("--asyncio", action='store_true') args = parser.parse_args() subactor = Actor( @@ -54,5 +57,6 @@ if __name__ == "__main__": _trio_main( subactor, - parent_addr=args.parent_addr - ) \ No newline at end of file + parent_addr=args.parent_addr, + infect_asyncio=args.asyncio, + ) diff --git a/tractor/_entry.py b/tractor/_entry.py index 5eda669..c860f2b 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -26,20 +26,26 @@ import trio # type: ignore from .log import get_console_log, get_logger from . import _state +from .to_asyncio import run_as_asyncio_guest log = get_logger(__name__) def _mp_main( + actor: 'Actor', # type: ignore accept_addr: Tuple[str, int], forkserver_info: Tuple[Any, Any, Any, Any, Any], start_method: str, parent_addr: Tuple[str, int] = None, + infect_asyncio: bool = False, + ) -> None: - """The routine called *after fork* which invokes a fresh ``trio.run`` - """ + ''' + The routine called *after fork* which invokes a fresh ``trio.run`` + + ''' actor._forkserver_info = forkserver_info from ._spawn import try_set_start_method spawn_ctx = try_set_start_method(start_method) @@ -62,7 +68,11 @@ def _mp_main( parent_addr=parent_addr ) try: - trio.run(trio_main) + if infect_asyncio: + actor._infected_aio = True + run_as_asyncio_guest(trio_main) + else: + trio.run(trio_main) except KeyboardInterrupt: pass # handle it the same way trio does? @@ -71,16 +81,17 @@ def _mp_main( def _trio_main( + actor: 'Actor', # type: ignore *, parent_addr: Tuple[str, int] = None, -) -> None: - """Entry point for a `trio_run_in_process` subactor. - """ - # Disable sigint handling in children; - # we don't need it thanks to our cancellation machinery. - # signal.signal(signal.SIGINT, signal.SIG_IGN) + infect_asyncio: bool = False, +) -> None: + ''' + Entry point for a `trio_run_in_process` subactor. 
+ + ''' log.info(f"Started new trio process for {actor.uid}") if actor.loglevel is not None: @@ -100,7 +111,11 @@ def _trio_main( ) try: - trio.run(trio_main) + if infect_asyncio: + actor._infected_aio = True + run_as_asyncio_guest(trio_main) + else: + trio.run(trio_main) except KeyboardInterrupt: log.warning(f"Actor {actor.uid} received KBI") diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 95d7533..f3beb5a 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -82,6 +82,15 @@ class StreamOverrun(trio.TooSlowError): "This stream was overrun by sender" +class AsyncioCancelled(Exception): + ''' + Asyncio cancelled translation (non-base) error + for use with the ``to_asyncio`` module + to be raised in the ``trio`` side task + + ''' + + def pack_error( exc: BaseException, tb=None, diff --git a/tractor/_root.py b/tractor/_root.py index 3468a25..797e736 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -103,7 +103,6 @@ async def open_root_actor( _default_arbiter_port, ) - if loglevel is None: loglevel = log.get_loglevel() else: diff --git a/tractor/_spawn.py b/tractor/_spawn.py index d16ddbd..ead91df 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -22,10 +22,10 @@ import sys import multiprocessing as mp import platform from typing import ( - Any, Dict, Optional, Union, Callable, + Any, Dict, Optional, Callable, TypeVar, ) -from collections.abc import Awaitable, Coroutine +from collections.abc import Awaitable import trio from trio_typing import TaskStatus @@ -244,6 +244,8 @@ async def new_proc( _runtime_vars: Dict[str, Any], # serialized and sent to _child *, + + infect_asyncio: bool = False, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> None: @@ -260,7 +262,6 @@ async def new_proc( uid = subactor.uid if _spawn_method == 'trio': - spawn_cmd = [ sys.executable, "-m", @@ -283,6 +284,9 @@ async def new_proc( "--loglevel", subactor.loglevel ] + # Tell child to run in guest mode on top of ``asyncio`` loop + if infect_asyncio: + spawn_cmd.append("--asyncio") cancelled_during_spawn: bool = False proc: Optional[trio.Process] = None @@ -412,6 +416,7 @@ async def new_proc( bind_addr=bind_addr, parent_addr=parent_addr, _runtime_vars=_runtime_vars, + infect_asyncio=infect_asyncio, task_status=task_status, ) @@ -427,6 +432,7 @@ async def mp_new_proc( parent_addr: Tuple[str, int], _runtime_vars: Dict[str, Any], # serialized and sent to _child *, + infect_asyncio: bool = False, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> None: @@ -472,6 +478,7 @@ async def mp_new_proc( fs_info, start_method, parent_addr, + infect_asyncio, ), # daemon=True, name=name, diff --git a/tractor/_supervise.py b/tractor/_supervise.py index 36e05b5..f2d907d 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -45,8 +45,33 @@ _default_bind_addr: Tuple[str, int] = ('127.0.0.1', 0) class ActorNursery: - """Spawn scoped subprocess actors. - """ + ''' + The fundamental actor supervision construct: spawn and manage + explicit lifetime and capability restricted, bootstrapped, + ``trio.run()`` scheduled sub-processes. + + Though the concept of a "process nursery" is different in complexity + and slightly different in semantics then a tradtional single + threaded task nursery, much of the interface is the same. New + processes each require a top level "parent" or "root" task which is + itself no different then any task started by a tradtional + ``trio.Nursery``. 
The main difference is that each "actor" (a + process + ``trio.run()``) contains a full, paralell executing + ``trio``-task-tree. The following super powers ensue: + + - starting tasks in a child actor are completely independent of + tasks started in the current process. They execute in *parallel* + relative to tasks in the current process and are scheduled by their + own actor's ``trio`` run loop. + - tasks scheduled in a remote process still maintain an SC protocol + across memory boundaries using a so called "structured concurrency + dialogue protocol" which ensures task-hierarchy-lifetimes are linked. + - remote tasks (in another actor) can fail and relay failure back to + the caller task (in some other actor) via a seralized + ``RemoteActorError`` which means no zombie process or RPC + initiated task can ever go off on its own. + + ''' def __init__( self, actor: Actor, @@ -81,6 +106,7 @@ class ActorNursery: loglevel: str = None, # set log level per subactor nursery: trio.Nursery = None, debug_mode: Optional[bool] = None, + infect_asyncio: bool = False, ) -> Portal: ''' Start a (daemon) actor: an process that has no designated @@ -134,19 +160,25 @@ class ActorNursery: bind_addr, parent_addr, _rtv, # run time vars + infect_asyncio=infect_asyncio, ) ) async def run_in_actor( self, + fn: typing.Callable, *, + name: Optional[str] = None, bind_addr: Tuple[str, int] = _default_bind_addr, rpc_module_paths: Optional[List[str]] = None, enable_modules: List[str] = None, loglevel: str = None, # set log level per subactor + infect_asyncio: bool = False, + **kwargs, # explicit args to ``fn`` + ) -> Portal: """Spawn a new actor, run a lone task, then terminate the actor and return its result. @@ -170,6 +202,7 @@ class ActorNursery: loglevel=loglevel, # use the run_in_actor nursery nursery=self._ria_nursery, + infect_asyncio=infect_asyncio, ) # XXX: don't allow stream funcs @@ -408,8 +441,10 @@ async def _open_and_supervise_one_cancels_all_nursery( @asynccontextmanager async def open_nursery( **kwargs, + ) -> typing.AsyncGenerator[ActorNursery, None]: - """Create and yield a new ``ActorNursery`` to be used for spawning + ''' + Create and yield a new ``ActorNursery`` to be used for spawning structured concurrent subactors. When an actor is spawned a new trio task is started which @@ -421,7 +456,8 @@ async def open_nursery( close it. It turns out this approach is probably more correct anyway since it is more clear from the following nested nurseries which cancellation scopes correspond to each spawned subactor set. - """ + + ''' implicit_runtime = False actor = current_actor(err_on_no_runtime=False) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py new file mode 100644 index 0000000..9b18a87 --- /dev/null +++ b/tractor/to_asyncio.py @@ -0,0 +1,405 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . 
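For orientation before the new ``tractor/to_asyncio.py`` module below, here is a minimal
usage sketch (hypothetical, not part of this patch) tying the ``infect_asyncio=True`` flag
threaded through ``ActorNursery.run_in_actor()`` above together with ``to_asyncio.run_task()``:
the parent ``trio`` task tree spawns an infected subactor, which in turn schedules a plain
``asyncio`` coroutine and relays its return value back. All names other than the
``tractor``/``to_asyncio`` APIs are illustrative.

.. code:: python

    import asyncio

    import trio
    import tractor
    from tractor import to_asyncio


    async def aio_sleep_and_echo(msg: str) -> str:
        # plain ``asyncio`` code scheduled on the infected actor's host loop
        await asyncio.sleep(0.1)
        return msg


    async def trio_entry() -> str:
        # runs as a ``trio`` (guest mode) task inside the subactor; spawns
        # the ``asyncio`` task and waits for its single return value
        return await to_asyncio.run_task(aio_sleep_and_echo, msg='echo')


    async def main():
        async with tractor.open_nursery() as n:
            portal = await n.run_in_actor(
                trio_entry,
                # subactor entry point is ``asyncio.run()`` with ``trio``
                # started on top in guest mode
                infect_asyncio=True,
            )
            assert await portal.result() == 'echo'


    if __name__ == '__main__':
        trio.run(main)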
+ +''' +Infection apis for ``asyncio`` loops running ``trio`` using guest mode. + +''' +import asyncio +from asyncio.exceptions import CancelledError +from contextlib import asynccontextmanager as acm +from dataclasses import dataclass +import inspect +from typing import ( + Any, + Callable, + AsyncIterator, + Awaitable, + Optional, +) + +import trio + +from .log import get_logger +from ._state import current_actor +from ._exceptions import AsyncioCancelled + +log = get_logger(__name__) + + +__all__ = ['run_task', 'run_as_asyncio_guest'] + + +@dataclass +class LinkedTaskChannel(trio.abc.Channel): + ''' + A "linked task channel" which allows for two-way synchronized msg + passing between a ``trio``-in-guest-mode task and an ``asyncio`` + task scheduled in the host loop. + + ''' + _to_aio: asyncio.Queue + _from_aio: trio.MemoryReceiveChannel + _to_trio: trio.MemorySendChannel + + _trio_cs: trio.CancelScope + _aio_task_complete: trio.Event + + # set after ``asyncio.create_task()`` + _aio_task: Optional[asyncio.Task] = None + _aio_err: Optional[BaseException] = None + + async def aclose(self) -> None: + await self._from_aio.aclose() + + async def receive(self) -> Any: + async with translate_aio_errors(self): + return await self._from_aio.receive() + + async def wait_ayncio_complete(self) -> None: + await self._aio_task_complete.wait() + + # def cancel_asyncio_task(self) -> None: + # self._aio_task.cancel() + + async def send(self, item: Any) -> None: + ''' + Send a value through to the asyncio task presuming + it defines a ``from_trio`` argument, if it does not + this method will raise an error. + + ''' + self._to_aio.put_nowait(item) + + +def _run_asyncio_task( + + func: Callable, + *, + qsize: int = 1, + provide_channels: bool = False, + **kwargs, + +) -> LinkedTaskChannel: + ''' + Run an ``asyncio`` async function or generator in a task, return + or stream the result back to ``trio``. + + ''' + if not current_actor().is_infected_aio(): + raise RuntimeError("`infect_asyncio` mode is not enabled!?") + + # ITC (inter task comms), these channel/queue names are mostly from + # ``asyncio``'s perspective. + aio_q = from_trio = asyncio.Queue(qsize) # type: ignore + to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore + + args = tuple(inspect.getfullargspec(func).args) + + if getattr(func, '_tractor_steam_function', None): + # the assumption is that the target async routine accepts the + # send channel then it intends to yield more then one return + # value otherwise it would just return ;P + assert qsize > 1 + + if provide_channels: + assert 'to_trio' in args + + # allow target func to accept/stream results manually by name + if 'to_trio' in args: + kwargs['to_trio'] = to_trio + + if 'from_trio' in args: + kwargs['from_trio'] = from_trio + + coro = func(**kwargs) + + cancel_scope = trio.CancelScope() + aio_task_complete = trio.Event() + aio_err: Optional[BaseException] = None + + chan = LinkedTaskChannel( + aio_q, # asyncio.Queue + from_aio, # recv chan + to_trio, # send chan + + cancel_scope, + aio_task_complete, + ) + + async def wait_on_coro_final_result( + + to_trio: trio.MemorySendChannel, + coro: Awaitable, + aio_task_complete: trio.Event, + + ) -> None: + ''' + Await ``coro`` and relay result back to ``trio``. 
+ + ''' + nonlocal aio_err + nonlocal chan + + orig = result = id(coro) + try: + result = await coro + except BaseException as aio_err: + chan._aio_err = aio_err + raise + + else: + if ( + result != orig and + aio_err is None and + + # in the ``open_channel_from()`` case we don't + # relay through the "return value". + not provide_channels + ): + to_trio.send_nowait(result) + + finally: + # if the task was spawned using ``open_channel_from()`` + # then we close the channels on exit. + if provide_channels: + # only close the sender side which will relay + # a ``trio.EndOfChannel`` to the trio (consumer) side. + to_trio.close() + + aio_task_complete.set() + + # start the asyncio task we submitted from trio + if not inspect.isawaitable(coro): + raise TypeError(f"No support for invoking {coro}") + + task = asyncio.create_task( + wait_on_coro_final_result( + to_trio, + coro, + aio_task_complete + ) + ) + chan._aio_task = task + + def cancel_trio(task: asyncio.Task) -> None: + ''' + Cancel the calling ``trio`` task on error. + + ''' + nonlocal chan + aio_err = chan._aio_err + + # only to avoid ``asyncio`` complaining about uncaptured + # task exceptions + try: + task.exception() + except BaseException as terr: + assert type(terr) is type(aio_err), 'Asyncio task error mismatch?' + + if aio_err is not None: + if type(aio_err) is CancelledError: + log.cancel("infected task was cancelled") + else: + aio_err.with_traceback(aio_err.__traceback__) + log.exception("infected task errorred:") + + # NOTE: currently mem chan closure may act as a form + # of error relay (at least in the ``asyncio.CancelledError`` + # case) since we have no way to directly trigger a ``trio`` + # task error without creating a nursery to throw one. + # We might want to change this in the future though. + from_aio.close() + + task.add_done_callback(cancel_trio) + + return chan + + +@acm +async def translate_aio_errors( + + chan: LinkedTaskChannel, + +) -> AsyncIterator[None]: + ''' + Error handling context around ``asyncio`` task spawns which + appropriately translates errors and cancels into ``trio`` land. + + ''' + aio_err: Optional[BaseException] = None + + def maybe_raise_aio_err( + err: Optional[Exception] = None + ) -> None: + aio_err = chan._aio_err + if ( + aio_err is not None and + type(aio_err) != CancelledError + ): + # always raise from any captured asyncio error + if err: + raise aio_err from err + else: + raise aio_err + + task = chan._aio_task + assert task + try: + yield + except ( + # NOTE: see the note in the ``cancel_trio()`` asyncio task + # termination callback + trio.ClosedResourceError, + ): + aio_err = chan._aio_err + if ( + task.cancelled() and + type(aio_err) is CancelledError + ): + # if an underlying ``asyncio.CancelledError`` triggered this + # channel close, raise our (non-``BaseException``) wrapper + # error: ``AsyncioCancelled`` from that source error. + raise AsyncioCancelled from aio_err + + else: + raise + finally: + # always cancel the ``asyncio`` task if we've made it this far + # and it's not done. + if not task.done() and aio_err: + # assert not aio_err, 'WTF how did asyncio do this?!' + task.cancel() + + # if any ``asyncio`` error was caught, raise it here inline + # here in the ``trio`` task + maybe_raise_aio_err() + + +async def run_task( + func: Callable, + *, + + qsize: int = 2**10, + **kwargs, + +) -> Any: + ''' + Run an ``asyncio`` async function or generator in a task, return + or stream the result back to ``trio``. 
+ + ''' + # simple async func + chan = _run_asyncio_task( + func, + qsize=1, + **kwargs, + ) + with chan._from_aio: + # try: + async with translate_aio_errors(chan): + # return single value that is the output from the + # ``asyncio`` function-as-task. Expect the mem chan api to + # do the job of handling cross-framework cancellations + # / errors via closure and translation in the + # ``translate_aio_errors()`` in the above ctx mngr. + return await chan.receive() + + +@acm +async def open_channel_from( + + target: Callable[..., Any], + **kwargs, + +) -> AsyncIterator[Any]: + ''' + Open an inter-loop linked task channel for streaming between a target + spawned ``asyncio`` task and ``trio``. + + ''' + chan = _run_asyncio_task( + target, + qsize=2**8, + provide_channels=True, + **kwargs, + ) + async with chan._from_aio: + async with translate_aio_errors(chan): + # sync to a "started()"-like first delivered value from the + # ``asyncio`` task. + first = await chan.receive() + + # stream values upward + yield first, chan + + +def run_as_asyncio_guest( + + trio_main: Callable, + +) -> None: + ''' + Entry for an "infected ``asyncio`` actor". + + Entrypoint for a Python process which starts the ``asyncio`` event + loop and runs ``trio`` in guest mode resulting in a system where + ``trio`` tasks can control ``asyncio`` tasks whilst maintaining + SC semantics. + + ''' + # Uh, oh. :o + + # It looks like your event loop has caught a case of the ``trio``s. + + # :() + + # Don't worry, we've heard you'll barely notice. You might hallucinate + # a few more propagating errors and feel like your digestion has + # slowed but if anything get's too bad your parents will know about + # it. + + # :) + + async def aio_main(trio_main): + + loop = asyncio.get_running_loop() + trio_done_fut = asyncio.Future() + + def trio_done_callback(main_outcome): + + print(f"trio_main finished: {main_outcome!r}") + trio_done_fut.set_result(main_outcome) + + # start the infection: run trio on the asyncio loop in "guest mode" + log.info(f"Infecting asyncio process with {trio_main}") + + trio.lowlevel.start_guest_run( + trio_main, + run_sync_soon_threadsafe=loop.call_soon_threadsafe, + done_callback=trio_done_callback, + ) + return (await trio_done_fut).unwrap() + + # might as well if it's installed. + try: + import uvloop + loop = uvloop.new_event_loop() + asyncio.set_event_loop(loop) + except ImportError: + pass + + return asyncio.run(aio_main(trio_main)) diff --git a/tractor/trionics/_broadcast.py b/tractor/trionics/_broadcast.py index 77ab6d0..6c04895 100644 --- a/tractor/trionics/_broadcast.py +++ b/tractor/trionics/_broadcast.py @@ -47,8 +47,9 @@ class AsyncReceiver( Protocol, Generic[ReceiveType], ): - '''An async receivable duck-type that quacks much like trio's - ``trio.abc.ReceieveChannel``. + ''' + An async receivable duck-type that quacks much like trio's + ``trio.abc.ReceiveChannel``. ''' @abstractmethod @@ -78,7 +79,8 @@ class AsyncReceiver( class Lagged(trio.TooSlowError): - '''Subscribed consumer task was too slow and was overrun + ''' + Subscribed consumer task was too slow and was overrun by the fastest consumer-producer pair. ''' @@ -86,7 +88,8 @@ class Lagged(trio.TooSlowError): @dataclass class BroadcastState: - '''Common state to all receivers of a broadcast. + ''' + Common state to all receivers of a broadcast. 
''' queue: deque @@ -111,7 +114,8 @@ class BroadcastState: class BroadcastReceiver(ReceiveChannel): - '''A memory receive channel broadcaster which is non-lossy for the + ''' + A memory receive channel broadcaster which is non-lossy for the fastest consumer. Additional consumer tasks can receive all produced values by registering