From acd63d0c895469c7de7813e0364a905c2ec51f77 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 28 Jun 2020 22:44:16 -0400 Subject: [PATCH 01/53] First draft "infected `asyncio` mode" This should mostly maintain top level SC principles for any task spawned using `tractor.to_asyncio.run()`. When the `asyncio` task completes make sure to cancel the pertaining `trio` cancel scope and raise any error that may have resulted. This interface uses `trio`'s "guest-mode" to run `asyncio` loop using a special entrypoint which is handed to Python during process spawn. --- tractor/_actor.py | 6 ++ tractor/_entry.py | 7 +- tractor/to_asyncio.py | 150 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 162 insertions(+), 1 deletion(-) create mode 100644 tractor/to_asyncio.py diff --git a/tractor/_actor.py b/tractor/_actor.py index e737004..47806be 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -361,6 +361,9 @@ class Actor: # syncs for setup/teardown sequences _server_down: Optional[trio.Event] = None + # if started on ``asycio`` running ``trio`` in guest mode + _infected_aio: bool = False + def __init__( self, name: str, @@ -1459,6 +1462,9 @@ class Actor: log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete") return uid + def is_infected_aio(self) -> bool: + return self._infected_aio + class Arbiter(Actor): ''' diff --git a/tractor/_entry.py b/tractor/_entry.py index 5eda669..812f3ab 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -26,6 +26,7 @@ import trio # type: ignore from .log import get_console_log, get_logger from . import _state +from .to_asyncio import run_as_asyncio_guest log = get_logger(__name__) @@ -62,7 +63,11 @@ def _mp_main( parent_addr=parent_addr ) try: - trio.run(trio_main) + if infect_asyncio: + actor._infected_aio = True + run_as_asyncio_guest(trio_main) + else: + trio.run(trio_main) except KeyboardInterrupt: pass # handle it the same way trio does? diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py new file mode 100644 index 0000000..a45904a --- /dev/null +++ b/tractor/to_asyncio.py @@ -0,0 +1,150 @@ +""" +Infection apis for ``asyncio`` loops running ``trio`` using guest mode. +""" +import asyncio +import inspect +from typing import ( + Any, + Callable, + AsyncGenerator, + Awaitable, + Union, +) + +import trio + +from .log import get_logger +from ._state import current_actor + +log = get_logger(__name__) + + +__all__ = ['run_task'] + + +async def _invoke( + from_trio: trio.abc.ReceiveChannel, + to_trio: asyncio.Queue, + coro: Awaitable, +) -> Union[AsyncGenerator, Awaitable]: + """Await or stream awaiable object based on type into + ``trio`` memory channel. + """ + async def stream_from_gen(c): + async for item in c: + to_trio.send_nowait(item) + + async def just_return(c): + to_trio.send_nowait(await c) + + if inspect.isasyncgen(coro): + return await stream_from_gen(coro) + elif inspect.iscoroutine(coro): + return await coro + + +async def run_task( + func: Callable, + qsize: int = 2**10, + **kwargs, +) -> Any: + """Run an ``asyncio`` async function or generator in a task, return + or stream the result back to ``trio``. + """ + assert current_actor()._infected_aio + + # ITC (inter task comms) + from_trio = asyncio.Queue(qsize) + to_trio, from_aio = trio.open_memory_channel(qsize) + + # allow target func to accept/stream results manually + kwargs['to_trio'] = to_trio + kwargs['from_trio'] = to_trio + + coro = func(**kwargs) + + cancel_scope = trio.CancelScope() + + # start the asyncio task we submitted from trio + # TODO: try out ``anyio`` asyncio based tg here + task = asyncio.create_task(_invoke(from_trio, to_trio, coro)) + err = None + + # XXX: I'm not sure this actually does anything... + def cancel_trio(task): + """Cancel the calling ``trio`` task on error. + """ + nonlocal err + err = task.exception() + cancel_scope.cancel() + + task.add_done_callback(cancel_trio) + + # determine return type async func vs. gen + if inspect.isasyncgen(coro): + # simple async func + async def result(): + with cancel_scope: + return await from_aio.get() + if cancel_scope.cancelled_caught and err: + raise err + + elif inspect.iscoroutine(coro): + # asycn gen + async def result(): + with cancel_scope: + async with from_aio: + async for item in from_aio: + yield item + if cancel_scope.cancelled_caught and err: + raise err + + return result() + + +def run_as_asyncio_guest( + trio_main: Awaitable, +) -> None: + """Entry for an "infected ``asyncio`` actor". + + Uh, oh. :o + + It looks like your event loop has caught a case of the ``trio``s. + + :() + + Don't worry, we've heard you'll barely notice. You might hallucinate + a few more propagating errors and feel like your digestion has + slowed but if anything get's too bad your parents will know about + it. + + :) + """ + async def aio_main(trio_main): + loop = asyncio.get_running_loop() + + trio_done_fut = asyncio.Future() + + def trio_done_callback(main_outcome): + log.info(f"trio_main finished: {main_outcome!r}") + trio_done_fut.set_result(main_outcome) + + # start the infection: run trio on the asyncio loop in "guest mode" + log.info(f"Infecting asyncio process with {trio_main}") + trio.lowlevel.start_guest_run( + trio_main, + run_sync_soon_threadsafe=loop.call_soon_threadsafe, + done_callback=trio_done_callback, + ) + + (await trio_done_fut).unwrap() + + # might as well if it's installed. + try: + import uvloop + loop = uvloop.new_event_loop() + asyncio.set_event_loop(loop) + except ImportError: + pass + + asyncio.run(aio_main(trio_main)) From 1825b21d2c4e29d9fc09a0a681324d7b3e3f0df9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 3 Jul 2020 17:33:46 -0400 Subject: [PATCH 02/53] Wow, fix all the broken async func invoking code.. Clearly this wasn't developed against a task that spawned just an async func in `asyncio`.. Fix all that and remove a bunch of unnecessary func layers. Add provisional support for the target receiving the `to_trio` and `from_trio` channels and for the @tractor.stream marker. --- tractor/to_asyncio.py | 67 +++++++++++++++++++++++-------------------- 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index a45904a..fb8f4cd 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -4,7 +4,6 @@ Infection apis for ``asyncio`` loops running ``trio`` using guest mode. import asyncio import inspect from typing import ( - Any, Callable, AsyncGenerator, Awaitable, @@ -19,47 +18,55 @@ from ._state import current_actor log = get_logger(__name__) -__all__ = ['run_task'] +__all__ = ['run_task', 'run_as_asyncio_guest'] async def _invoke( from_trio: trio.abc.ReceiveChannel, to_trio: asyncio.Queue, coro: Awaitable, -) -> Union[AsyncGenerator, Awaitable]: - """Await or stream awaiable object based on type into +) -> None: + """Await or stream awaiable object based on ``coro`` type into ``trio`` memory channel. + + ``from_trio`` might eventually be used here for bidirectional streaming. """ - async def stream_from_gen(c): - async for item in c: - to_trio.send_nowait(item) - - async def just_return(c): - to_trio.send_nowait(await c) - if inspect.isasyncgen(coro): - return await stream_from_gen(coro) + async for item in coro: + to_trio.send_nowait(item) elif inspect.iscoroutine(coro): - return await coro + to_trio.send_nowait(await coro) async def run_task( func: Callable, + *, qsize: int = 2**10, + _treat_as_stream: bool = False, **kwargs, -) -> Any: +) -> Union[AsyncGenerator, Awaitable]: """Run an ``asyncio`` async function or generator in a task, return or stream the result back to ``trio``. """ - assert current_actor()._infected_aio + assert current_actor().is_infected_aio() # ITC (inter task comms) from_trio = asyncio.Queue(qsize) to_trio, from_aio = trio.open_memory_channel(qsize) - # allow target func to accept/stream results manually - kwargs['to_trio'] = to_trio - kwargs['from_trio'] = to_trio + args = tuple(inspect.getfullargspec(func).args) + + if getattr(func, '_tractor_steam_function', None): + # the assumption is that the target async routine accepts the + # send channel then it intends to yield more then one return + # value otherwise it would just return ;P + _treat_as_stream = True + + # allow target func to accept/stream results manually by name + if 'to_trio' in args: + kwargs['to_trio'] = to_trio + if 'from_trio' in args: + kwargs['from_trio'] = to_trio coro = func(**kwargs) @@ -70,7 +77,6 @@ async def run_task( task = asyncio.create_task(_invoke(from_trio, to_trio, coro)) err = None - # XXX: I'm not sure this actually does anything... def cancel_trio(task): """Cancel the calling ``trio`` task on error. """ @@ -80,17 +86,8 @@ async def run_task( task.add_done_callback(cancel_trio) - # determine return type async func vs. gen - if inspect.isasyncgen(coro): - # simple async func - async def result(): - with cancel_scope: - return await from_aio.get() - if cancel_scope.cancelled_caught and err: - raise err - - elif inspect.iscoroutine(coro): - # asycn gen + # asycn gen + if inspect.isasyncgen(coro) or _treat_as_stream: async def result(): with cancel_scope: async with from_aio: @@ -99,7 +96,15 @@ async def run_task( if cancel_scope.cancelled_caught and err: raise err - return result() + return result() + + # simple async func + elif inspect.iscoroutine(coro): + with cancel_scope: + result = await from_aio.receive() + return result + if cancel_scope.cancelled_caught and err: + raise err def run_as_asyncio_guest( From 055788cf1642390685aa7c9cd3bf15c656d1ef95 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 21 Jul 2020 10:32:37 -0400 Subject: [PATCH 03/53] Attempt to make mypy happy.. --- tractor/to_asyncio.py | 58 +++++++++++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 22 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index fb8f4cd..0f22300 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -4,6 +4,7 @@ Infection apis for ``asyncio`` loops running ``trio`` using guest mode. import asyncio import inspect from typing import ( + Any, Callable, AsyncGenerator, Awaitable, @@ -21,21 +22,26 @@ log = get_logger(__name__) __all__ = ['run_task', 'run_as_asyncio_guest'] -async def _invoke( - from_trio: trio.abc.ReceiveChannel, - to_trio: asyncio.Queue, +async def run_coro( + to_trio: trio.MemorySendChannel, coro: Awaitable, ) -> None: - """Await or stream awaiable object based on ``coro`` type into - ``trio`` memory channel. - - ``from_trio`` might eventually be used here for bidirectional streaming. + """Await ``coro`` and relay result back to ``trio``. """ - if inspect.isasyncgen(coro): - async for item in coro: - to_trio.send_nowait(item) - elif inspect.iscoroutine(coro): - to_trio.send_nowait(await coro) + to_trio.send_nowait(await coro) + + +async def consume_asyncgen( + to_trio: trio.MemorySendChannel, + coro: AsyncGenerator, +) -> None: + """Stream async generator results back to ``trio``. + + ``from_trio`` might eventually be used here for + bidirectional streaming. + """ + async for item in coro: + to_trio.send_nowait(item) async def run_task( @@ -44,15 +50,15 @@ async def run_task( qsize: int = 2**10, _treat_as_stream: bool = False, **kwargs, -) -> Union[AsyncGenerator, Awaitable]: +) -> Union[AsyncGenerator, Any]: """Run an ``asyncio`` async function or generator in a task, return or stream the result back to ``trio``. """ assert current_actor().is_infected_aio() # ITC (inter task comms) - from_trio = asyncio.Queue(qsize) - to_trio, from_aio = trio.open_memory_channel(qsize) + from_trio = asyncio.Queue(qsize) # type: ignore + to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore args = tuple(inspect.getfullargspec(func).args) @@ -66,7 +72,7 @@ async def run_task( if 'to_trio' in args: kwargs['to_trio'] = to_trio if 'from_trio' in args: - kwargs['from_trio'] = to_trio + kwargs['from_trio'] = from_trio coro = func(**kwargs) @@ -74,7 +80,13 @@ async def run_task( # start the asyncio task we submitted from trio # TODO: try out ``anyio`` asyncio based tg here - task = asyncio.create_task(_invoke(from_trio, to_trio, coro)) + if inspect.isawaitable(coro): + task = asyncio.create_task(run_coro(to_trio, coro)) + elif inspect.isasyncgen(coro): + task = asyncio.create_task(consume_asyncgen(to_trio, coro)) + else: + raise TypeError(f"No support for {coro}") + err = None def cancel_trio(task): @@ -88,27 +100,29 @@ async def run_task( # asycn gen if inspect.isasyncgen(coro) or _treat_as_stream: - async def result(): + + async def stream_results(): with cancel_scope: + # stream values upward async with from_aio: async for item in from_aio: yield item if cancel_scope.cancelled_caught and err: raise err - return result() + return stream_results() # simple async func elif inspect.iscoroutine(coro): with cancel_scope: - result = await from_aio.receive() - return result + # return single value + return await from_aio.receive() if cancel_scope.cancelled_caught and err: raise err def run_as_asyncio_guest( - trio_main: Awaitable, + trio_main: Callable, ) -> None: """Entry for an "infected ``asyncio`` actor". From 1406ddc5ee5c16af1c30be17e03798bd8228fa26 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 26 Jul 2020 00:35:41 -0400 Subject: [PATCH 04/53] Add `infect_asyncio: bool` flag to nursery methods --- tractor/_supervise.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tractor/_supervise.py b/tractor/_supervise.py index 36e05b5..5725338 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -81,6 +81,7 @@ class ActorNursery: loglevel: str = None, # set log level per subactor nursery: trio.Nursery = None, debug_mode: Optional[bool] = None, + infect_asyncio: bool = False, ) -> Portal: ''' Start a (daemon) actor: an process that has no designated @@ -134,6 +135,7 @@ class ActorNursery: bind_addr, parent_addr, _rtv, # run time vars + infect_asyncio=infect_asyncio, ) ) @@ -146,6 +148,7 @@ class ActorNursery: rpc_module_paths: Optional[List[str]] = None, enable_modules: List[str] = None, loglevel: str = None, # set log level per subactor + infect_asyncio: bool = False, **kwargs, # explicit args to ``fn`` ) -> Portal: """Spawn a new actor, run a lone task, then terminate the actor and @@ -170,6 +173,7 @@ class ActorNursery: loglevel=loglevel, # use the run_in_actor nursery nursery=self._ria_nursery, + infect_asyncio=infect_asyncio, ) # XXX: don't allow stream funcs From 8070b16bd0c694592a55ce96ea29d81d0aec080b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 27 Jul 2020 11:03:17 -0400 Subject: [PATCH 05/53] Support asyncio actors with the trio spawner backend --- tractor/_child.py | 8 ++++++-- tractor/_entry.py | 10 +++++++++- tractor/_spawn.py | 5 ++++- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/tractor/_child.py b/tractor/_child.py index f384ac4..7790731 100644 --- a/tractor/_child.py +++ b/tractor/_child.py @@ -37,12 +37,15 @@ def parse_ipaddr(arg): return (str(host), int(port)) +from ._entry import _trio_main + if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--uid", type=parse_uid) parser.add_argument("--loglevel", type=str) parser.add_argument("--parent_addr", type=parse_ipaddr) + parser.add_argument("--asyncio", action='store_true') args = parser.parse_args() subactor = Actor( @@ -54,5 +57,6 @@ if __name__ == "__main__": _trio_main( subactor, - parent_addr=args.parent_addr - ) \ No newline at end of file + parent_addr=args.parent_addr, + infect_asyncio=args.asyncio, + ) diff --git a/tractor/_entry.py b/tractor/_entry.py index 812f3ab..0e31f32 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -38,6 +38,7 @@ def _mp_main( forkserver_info: Tuple[Any, Any, Any, Any, Any], start_method: str, parent_addr: Tuple[str, int] = None, + infect_asyncio: bool = False, ) -> None: """The routine called *after fork* which invokes a fresh ``trio.run`` """ @@ -79,6 +80,7 @@ def _trio_main( actor: 'Actor', # type: ignore *, parent_addr: Tuple[str, int] = None, + infect_asyncio: bool = False, ) -> None: """Entry point for a `trio_run_in_process` subactor. """ @@ -88,6 +90,8 @@ def _trio_main( log.info(f"Started new trio process for {actor.uid}") + log.info(f"Started new trio process for {actor.uid}") + if actor.loglevel is not None: log.info( f"Setting loglevel for {actor.uid} to {actor.loglevel}") @@ -105,7 +109,11 @@ def _trio_main( ) try: - trio.run(trio_main) + if infect_asyncio: + actor._infected_aio = True + run_as_asyncio_guest(trio_main) + else: + trio.run(trio_main) except KeyboardInterrupt: log.warning(f"Actor {actor.uid} received KBI") diff --git a/tractor/_spawn.py b/tractor/_spawn.py index d16ddbd..5525c5d 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -244,6 +244,7 @@ async def new_proc( _runtime_vars: Dict[str, Any], # serialized and sent to _child *, + infect_asyncio: bool = False, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> None: @@ -260,7 +261,6 @@ async def new_proc( uid = subactor.uid if _spawn_method == 'trio': - spawn_cmd = [ sys.executable, "-m", @@ -283,6 +283,9 @@ async def new_proc( "--loglevel", subactor.loglevel ] + # Tell child to run in guest mode on top of ``asyncio`` loop + if infect_asyncio: + spawn_cmd.append("--asyncio") cancelled_during_spawn: bool = False proc: Optional[trio.Process] = None From 2cf87146a30cbc31c2f1f1ffd8b92cc1b5f4cb47 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 12 Sep 2020 11:41:17 -0400 Subject: [PATCH 06/53] Log any asyncio error --- tractor/to_asyncio.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 0f22300..59db280 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -94,6 +94,8 @@ async def run_task( """ nonlocal err err = task.exception() + if err: + log.exception(f"asyncio task errorred:\n{err}") cancel_scope.cancel() task.add_done_callback(cancel_trio) @@ -114,6 +116,7 @@ async def run_task( # simple async func elif inspect.iscoroutine(coro): + with cancel_scope: # return single value return await from_aio.receive() @@ -121,6 +124,7 @@ async def run_task( raise err + def run_as_asyncio_guest( trio_main: Callable, ) -> None: From 80f47dece24d4ad383b8e28a4cd0f8dfe2e6a468 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 14 Oct 2020 12:51:41 -0400 Subject: [PATCH 07/53] Raise from asyncio error; fixes mypy --- tractor/to_asyncio.py | 50 +++++++++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 59db280..85600d7 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -6,7 +6,7 @@ import inspect from typing import ( Any, Callable, - AsyncGenerator, + AsyncIterator, Awaitable, Union, ) @@ -33,7 +33,7 @@ async def run_coro( async def consume_asyncgen( to_trio: trio.MemorySendChannel, - coro: AsyncGenerator, + coro: AsyncIterator, ) -> None: """Stream async generator results back to ``trio``. @@ -50,7 +50,7 @@ async def run_task( qsize: int = 2**10, _treat_as_stream: bool = False, **kwargs, -) -> Union[AsyncGenerator, Any]: +) -> Any: """Run an ``asyncio`` async function or generator in a task, return or stream the result back to ``trio``. """ @@ -79,50 +79,58 @@ async def run_task( cancel_scope = trio.CancelScope() # start the asyncio task we submitted from trio - # TODO: try out ``anyio`` asyncio based tg here if inspect.isawaitable(coro): task = asyncio.create_task(run_coro(to_trio, coro)) elif inspect.isasyncgen(coro): task = asyncio.create_task(consume_asyncgen(to_trio, coro)) else: - raise TypeError(f"No support for {coro}") + raise TypeError(f"No support for invoking {coro}") - err = None + aio_err = None def cancel_trio(task): """Cancel the calling ``trio`` task on error. """ nonlocal err - err = task.exception() - if err: - log.exception(f"asyncio task errorred:\n{err}") + aio_err = task.exception() + if aio_err: + log.exception(f"asyncio task errorred:\n{aio_err}") cancel_scope.cancel() task.add_done_callback(cancel_trio) - # asycn gen + # async iterator if inspect.isasyncgen(coro) or _treat_as_stream: async def stream_results(): - with cancel_scope: - # stream values upward - async with from_aio: - async for item in from_aio: - yield item - if cancel_scope.cancelled_caught and err: - raise err + try: + with cancel_scope: + # stream values upward + async with from_aio: + async for item in from_aio: + yield item + except BaseException as err: + if aio_err is not None: + # always raise from any captured asyncio error + raise err from aio_err + else: + raise return stream_results() # simple async func - elif inspect.iscoroutine(coro): - + try: with cancel_scope: # return single value return await from_aio.receive() - if cancel_scope.cancelled_caught and err: - raise err + # Do we need this? + except BaseException as err: + if aio_err is not None: + # always raise from any captured asyncio error + raise err from aio_err + else: + raise def run_as_asyncio_guest( From 509ae132ec6d1d3e72420dc252d08b816b6bf7e3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 10 Dec 2020 13:48:40 -0500 Subject: [PATCH 08/53] Raise any asyncio errors if in trio task on cancel --- tractor/to_asyncio.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 85600d7..1fa923d 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -8,7 +8,6 @@ from typing import ( Callable, AsyncIterator, Awaitable, - Union, ) import trio @@ -91,10 +90,12 @@ async def run_task( def cancel_trio(task): """Cancel the calling ``trio`` task on error. """ - nonlocal err + nonlocal aio_err aio_err = task.exception() + if aio_err: log.exception(f"asyncio task errorred:\n{aio_err}") + cancel_scope.cancel() task.add_done_callback(cancel_trio) @@ -109,6 +110,12 @@ async def run_task( async with from_aio: async for item in from_aio: yield item + + if cancel_scope.cancelled_caught: + # always raise from any captured asyncio error + if aio_err: + raise aio_err + except BaseException as err: if aio_err is not None: # always raise from any captured asyncio error @@ -124,6 +131,11 @@ async def run_task( # return single value return await from_aio.receive() + if cancel_scope.cancelled_caught: + # always raise from any captured asyncio error + if aio_err: + raise aio_err + # Do we need this? except BaseException as err: if aio_err is not None: From 340effae110d6857134ce016703e278d3b4091c2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 10 Dec 2020 13:49:11 -0500 Subject: [PATCH 09/53] Add initial infected asyncio error propagation test --- tests/test_infected_asyncio.py | 24 ++++++++++++++++++++++++ tractor/to_asyncio.py | 3 +++ 2 files changed, 27 insertions(+) create mode 100644 tests/test_infected_asyncio.py diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py new file mode 100644 index 0000000..def1776 --- /dev/null +++ b/tests/test_infected_asyncio.py @@ -0,0 +1,24 @@ +import asyncio + +import pytest +import tractor + +async def sleep_and_err(): + await asyncio.sleep(0.1) + assert 0 + + +async def asyncio_actor(): + assert tractor.current_actor().is_infected_aio() + + await tractor.to_asyncio.run_task(sleep_and_err) + + +def test_infected_simple_error(arb_addr): + + async def main(): + async with tractor.open_nursery() as n: + await n.run_in_actor(asyncio_actor, infected_asyncio=True) + + with pytest.raises(tractor.RemoteActorError) as excinfo: + tractor.run(main, arbiter_addr=arb_addr) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 1fa923d..6e46903 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -164,16 +164,19 @@ def run_as_asyncio_guest( :) """ async def aio_main(trio_main): + loop = asyncio.get_running_loop() trio_done_fut = asyncio.Future() def trio_done_callback(main_outcome): + log.info(f"trio_main finished: {main_outcome!r}") trio_done_fut.set_result(main_outcome) # start the infection: run trio on the asyncio loop in "guest mode" log.info(f"Infecting asyncio process with {trio_main}") + trio.lowlevel.start_guest_run( trio_main, run_sync_soon_threadsafe=loop.call_soon_threadsafe, From d80f8d7a396684c004b64115c3617aca8cbee841 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 27 Apr 2021 12:20:33 -0400 Subject: [PATCH 10/53] WIP redo asyncio async gen streaming --- tractor/_root.py | 1 - tractor/to_asyncio.py | 167 ++++++++++++++++++++++++++++++++++-------- 2 files changed, 136 insertions(+), 32 deletions(-) diff --git a/tractor/_root.py b/tractor/_root.py index 3468a25..797e736 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -103,7 +103,6 @@ async def open_root_actor( _default_arbiter_port, ) - if loglevel is None: loglevel = log.get_loglevel() else: diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 6e46903..cd761c9 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -43,15 +43,16 @@ async def consume_asyncgen( to_trio.send_nowait(item) -async def run_task( +def _run_asyncio_task( func: Callable, *, - qsize: int = 2**10, + qsize: int = 1, _treat_as_stream: bool = False, **kwargs, ) -> Any: """Run an ``asyncio`` async function or generator in a task, return or stream the result back to ``trio``. + """ assert current_actor().is_infected_aio() @@ -59,29 +60,38 @@ async def run_task( from_trio = asyncio.Queue(qsize) # type: ignore to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore + from_aio._err = None + args = tuple(inspect.getfullargspec(func).args) if getattr(func, '_tractor_steam_function', None): # the assumption is that the target async routine accepts the # send channel then it intends to yield more then one return # value otherwise it would just return ;P - _treat_as_stream = True + # _treat_as_stream = True + assert qsize > 1 # allow target func to accept/stream results manually by name if 'to_trio' in args: kwargs['to_trio'] = to_trio + if 'from_trio' in args: kwargs['from_trio'] = from_trio + # if 'from_aio' in args: + # kwargs['from_aio'] = from_aio + coro = func(**kwargs) - cancel_scope = trio.CancelScope() + # cancel_scope = trio.CancelScope() # start the asyncio task we submitted from trio if inspect.isawaitable(coro): task = asyncio.create_task(run_coro(to_trio, coro)) + elif inspect.isasyncgen(coro): task = asyncio.create_task(consume_asyncgen(to_trio, coro)) + else: raise TypeError(f"No support for invoking {coro}") @@ -96,54 +106,149 @@ async def run_task( if aio_err: log.exception(f"asyncio task errorred:\n{aio_err}") - cancel_scope.cancel() + # cancel_scope.cancel() + from_aio._err = aio_err + to_trio.close() task.add_done_callback(cancel_trio) + return task, from_aio, to_trio + + +async def run_task( + func: Callable, + *, + qsize: int = 2**10, + _treat_as_stream: bool = False, + **kwargs, +) -> Any: + """Run an ``asyncio`` async function or generator in a task, return + or stream the result back to ``trio``. + + """ + # assert current_actor().is_infected_aio() + + # # ITC (inter task comms) + # from_trio = asyncio.Queue(qsize) # type: ignore + # to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore + + # args = tuple(inspect.getfullargspec(func).args) + + # if getattr(func, '_tractor_steam_function', None): + # # the assumption is that the target async routine accepts the + # # send channel then it intends to yield more then one return + # # value otherwise it would just return ;P + # _treat_as_stream = True + + # # allow target func to accept/stream results manually by name + # if 'to_trio' in args: + # kwargs['to_trio'] = to_trio + # if 'from_trio' in args: + # kwargs['from_trio'] = from_trio + + # coro = func(**kwargs) + + # cancel_scope = trio.CancelScope() + + # # start the asyncio task we submitted from trio + # if inspect.isawaitable(coro): + # task = asyncio.create_task(run_coro(to_trio, coro)) + + # elif inspect.isasyncgen(coro): + # task = asyncio.create_task(consume_asyncgen(to_trio, coro)) + + # else: + # raise TypeError(f"No support for invoking {coro}") + + # aio_err = None + + # def cancel_trio(task): + # """Cancel the calling ``trio`` task on error. + # """ + # nonlocal aio_err + # aio_err = task.exception() + + # if aio_err: + # log.exception(f"asyncio task errorred:\n{aio_err}") + + # cancel_scope.cancel() + + # task.add_done_callback(cancel_trio) + # async iterator - if inspect.isasyncgen(coro) or _treat_as_stream: + # if inspect.isasyncgen(coro) or _treat_as_stream: - async def stream_results(): - try: - with cancel_scope: - # stream values upward - async with from_aio: - async for item in from_aio: - yield item + # if inspect.isasyncgenfunction(meth) or : + if _treat_as_stream: - if cancel_scope.cancelled_caught: - # always raise from any captured asyncio error - if aio_err: - raise aio_err + task, from_aio, to_trio = _run_asyncio_task( + func, + qsize=2**8, + **kwargs, + ) - except BaseException as err: - if aio_err is not None: - # always raise from any captured asyncio error - raise err from aio_err - else: - raise + return from_aio - return stream_results() + # async def stream_results(): + # try: + # with cancel_scope: + # # stream values upward + # async with from_aio: + # async for item in from_aio: + # yield item + + # if cancel_scope.cancelled_caught: + # # always raise from any captured asyncio error + # if aio_err: + # raise aio_err + + # except BaseException as err: + # if aio_err is not None: + # # always raise from any captured asyncio error + # raise err from aio_err + # else: + # raise + # finally: + # # breakpoint() + # task.cancel() + + # return stream_results() # simple async func try: - with cancel_scope: - # return single value - return await from_aio.receive() + task, from_aio, to_trio = _run_asyncio_task( + func, + qsize=1, + **kwargs, + ) - if cancel_scope.cancelled_caught: - # always raise from any captured asyncio error - if aio_err: - raise aio_err + # with cancel_scope: + # async with from_aio: + # return single value + return await from_aio.receive() + + # if cancel_scope.cancelled_caught: + # # always raise from any captured asyncio error + # if aio_err: + # raise aio_err # Do we need this? except BaseException as err: + # await tractor.breakpoint() + aio_err = from_aio._err if aio_err is not None: # always raise from any captured asyncio error raise err from aio_err else: raise + finally: + task.cancel() + + +# async def stream_from_task +# pass + def run_as_asyncio_guest( trio_main: Callable, From 793bcfb7d466d74cd085b1b8945b02369af33e61 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 10 May 2021 07:47:38 -0400 Subject: [PATCH 11/53] Pass `infect_asyncio` flag to mp actors as well --- tractor/_spawn.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 5525c5d..b8c63fb 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -244,6 +244,7 @@ async def new_proc( _runtime_vars: Dict[str, Any], # serialized and sent to _child *, + infect_asyncio: bool = False, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED @@ -415,6 +416,7 @@ async def new_proc( bind_addr=bind_addr, parent_addr=parent_addr, _runtime_vars=_runtime_vars, + infect_asyncio=infect_asyncio, task_status=task_status, ) @@ -430,6 +432,7 @@ async def mp_new_proc( parent_addr: Tuple[str, int], _runtime_vars: Dict[str, Any], # serialized and sent to _child *, + infect_asyncio: bool = False, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> None: @@ -475,6 +478,7 @@ async def mp_new_proc( fs_info, start_method, parent_addr, + infect_asyncio, ), # daemon=True, name=name, From aa24bbc11cd194775e0f2d2e59b2395435aa348a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 11 May 2021 23:43:33 -0400 Subject: [PATCH 12/53] Proxy asyncio cancelleds as well --- tractor/to_asyncio.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index cd761c9..f678b55 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -101,7 +101,10 @@ def _run_asyncio_task( """Cancel the calling ``trio`` task on error. """ nonlocal aio_err - aio_err = task.exception() + try: + aio_err = task.exception() + except asyncio.CancelledError as cerr: + aio_err = cerr if aio_err: log.exception(f"asyncio task errorred:\n{aio_err}") @@ -233,17 +236,25 @@ async def run_task( # raise aio_err # Do we need this? - except BaseException as err: + except Exception as err: # await tractor.breakpoint() aio_err = from_aio._err + + # try: if aio_err is not None: # always raise from any captured asyncio error raise err from aio_err else: raise + # finally: + # if not task.done(): + # task.cancel() - finally: - task.cancel() + except trio.Cancelled: + if not task.done(): + task.cancel() + + raise # async def stream_from_task From 55e210fec628117b5d9e7970ff32399b562d32bd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 2 Jun 2021 08:22:51 -0400 Subject: [PATCH 13/53] Drop bad .close() call --- tractor/to_asyncio.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index f678b55..ef7d595 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -111,7 +111,6 @@ def _run_asyncio_task( # cancel_scope.cancel() from_aio._err = aio_err - to_trio.close() task.add_done_callback(cancel_trio) From 325c0cdb1b93a6ee102bf90411def5b70e3d15d2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 28 Jul 2021 12:32:46 -0400 Subject: [PATCH 14/53] Fix error propagation on asyncio streaming tasks --- tractor/to_asyncio.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index ef7d595..92320a0 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -83,7 +83,7 @@ def _run_asyncio_task( coro = func(**kwargs) - # cancel_scope = trio.CancelScope() + cancel_scope = trio.CancelScope() # start the asyncio task we submitted from trio if inspect.isawaitable(coro): @@ -109,12 +109,13 @@ def _run_asyncio_task( if aio_err: log.exception(f"asyncio task errorred:\n{aio_err}") - # cancel_scope.cancel() + cancel_scope.cancel() from_aio._err = aio_err + from_aio.close() task.add_done_callback(cancel_trio) - return task, from_aio, to_trio + return task, from_aio, to_trio, cancel_scope async def run_task( @@ -183,7 +184,7 @@ async def run_task( # if inspect.isasyncgenfunction(meth) or : if _treat_as_stream: - task, from_aio, to_trio = _run_asyncio_task( + task, from_aio, to_trio, cs = _run_asyncio_task( func, qsize=2**8, **kwargs, @@ -218,7 +219,7 @@ async def run_task( # simple async func try: - task, from_aio, to_trio = _run_asyncio_task( + task, from_aio, to_trio, cs = _run_asyncio_task( func, qsize=1, **kwargs, @@ -227,12 +228,13 @@ async def run_task( # with cancel_scope: # async with from_aio: # return single value - return await from_aio.receive() + with cs: + return await from_aio.receive() - # if cancel_scope.cancelled_caught: - # # always raise from any captured asyncio error - # if aio_err: - # raise aio_err + if cs.cancelled_caught: + # always raise from any captured asyncio error + if from_aio._err: + raise from_aio._err # Do we need this? except Exception as err: From d9dac3f36cc436c65616bf6ef9465e0209c3ef52 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 2 Aug 2021 12:36:40 -0400 Subject: [PATCH 15/53] Drop old implementation cruft --- tractor/to_asyncio.py | 121 +++++++++++------------------------------- 1 file changed, 31 insertions(+), 90 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 92320a0..c7bcc89 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -1,6 +1,7 @@ -""" +''' Infection apis for ``asyncio`` loops running ``trio`` using guest mode. -""" + +''' import asyncio import inspect from typing import ( @@ -119,69 +120,20 @@ def _run_asyncio_task( async def run_task( + func: Callable, *, + qsize: int = 2**10, _treat_as_stream: bool = False, **kwargs, + ) -> Any: """Run an ``asyncio`` async function or generator in a task, return or stream the result back to ``trio``. """ - # assert current_actor().is_infected_aio() - - # # ITC (inter task comms) - # from_trio = asyncio.Queue(qsize) # type: ignore - # to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore - - # args = tuple(inspect.getfullargspec(func).args) - - # if getattr(func, '_tractor_steam_function', None): - # # the assumption is that the target async routine accepts the - # # send channel then it intends to yield more then one return - # # value otherwise it would just return ;P - # _treat_as_stream = True - - # # allow target func to accept/stream results manually by name - # if 'to_trio' in args: - # kwargs['to_trio'] = to_trio - # if 'from_trio' in args: - # kwargs['from_trio'] = from_trio - - # coro = func(**kwargs) - - # cancel_scope = trio.CancelScope() - - # # start the asyncio task we submitted from trio - # if inspect.isawaitable(coro): - # task = asyncio.create_task(run_coro(to_trio, coro)) - - # elif inspect.isasyncgen(coro): - # task = asyncio.create_task(consume_asyncgen(to_trio, coro)) - - # else: - # raise TypeError(f"No support for invoking {coro}") - - # aio_err = None - - # def cancel_trio(task): - # """Cancel the calling ``trio`` task on error. - # """ - # nonlocal aio_err - # aio_err = task.exception() - - # if aio_err: - # log.exception(f"asyncio task errorred:\n{aio_err}") - - # cancel_scope.cancel() - - # task.add_done_callback(cancel_trio) - - # async iterator - # if inspect.isasyncgen(coro) or _treat_as_stream: - - # if inspect.isasyncgenfunction(meth) or : + # streaming ``asyncio`` task if _treat_as_stream: task, from_aio, to_trio, cs = _run_asyncio_task( @@ -190,33 +142,10 @@ async def run_task( **kwargs, ) + # naively expect the mem chan api to do the job + # of handling cross-framework cancellations / errors return from_aio - # async def stream_results(): - # try: - # with cancel_scope: - # # stream values upward - # async with from_aio: - # async for item in from_aio: - # yield item - - # if cancel_scope.cancelled_caught: - # # always raise from any captured asyncio error - # if aio_err: - # raise aio_err - - # except BaseException as err: - # if aio_err is not None: - # # always raise from any captured asyncio error - # raise err from aio_err - # else: - # raise - # finally: - # # breakpoint() - # task.cancel() - - # return stream_results() - # simple async func try: task, from_aio, to_trio, cs = _run_asyncio_task( @@ -225,9 +154,7 @@ async def run_task( **kwargs, ) - # with cancel_scope: - # async with from_aio: - # return single value + # return single value with cs: return await from_aio.receive() @@ -238,18 +165,14 @@ async def run_task( # Do we need this? except Exception as err: - # await tractor.breakpoint() + aio_err = from_aio._err - # try: if aio_err is not None: # always raise from any captured asyncio error raise err from aio_err else: raise - # finally: - # if not task.done(): - # task.cancel() except trio.Cancelled: if not task.done(): @@ -258,8 +181,26 @@ async def run_task( raise -# async def stream_from_task -# pass +# TODO: explicit api for the streaming case where +# we pull from the mem chan in an async generator? +# This ends up looking more like our ``Portal.open_stream_from()`` +# NB: code below is untested. + +# @asynccontextmanager +# async def stream_from_task( + +# target: Callable[Any], +# **kwargs, + +# ) -> AsyncIterator[Any]: + +# from_aoi = await run_task(target, _treat_as_stream=True, **kwargs) + +# with cancel_scope: +# # stream values upward +# async with from_aio: +# async for item in from_aio: +# yield item def run_as_asyncio_guest( From c262b1a3e813c941adcd515929e63d58678e077d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 18 Sep 2021 14:10:21 -0400 Subject: [PATCH 16/53] Always cancel the asyncio task? --- tractor/to_asyncio.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index c7bcc89..aa8c6fc 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -55,7 +55,8 @@ def _run_asyncio_task( or stream the result back to ``trio``. """ - assert current_actor().is_infected_aio() + if not current_actor().is_infected_aio(): + raise RuntimeError("`infect_asyncio` mode is not enabled!?") # ITC (inter task comms) from_trio = asyncio.Queue(qsize) # type: ignore @@ -174,11 +175,11 @@ async def run_task( else: raise - except trio.Cancelled: - if not task.done(): - task.cancel() - - raise + # except trio.Cancelled: + # raise + finally: + # if not task.done(): + task.cancel() # TODO: explicit api for the streaming case where From b376b7cd329e4982e9a0d2c7977a92b2e736075b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 7 Oct 2021 23:14:34 -0400 Subject: [PATCH 17/53] First draft: `.to_asyncio.open_channel_from()` --- tractor/to_asyncio.py | 181 ++++++++++++++++++++++++++---------------- 1 file changed, 113 insertions(+), 68 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index aa8c6fc..936ab3f 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -3,17 +3,19 @@ Infection apis for ``asyncio`` loops running ``trio`` using guest mode. ''' import asyncio +from contextlib import asynccontextmanager as acm import inspect from typing import ( Any, Callable, AsyncIterator, Awaitable, + Optional, ) import trio -from .log import get_logger +from .log import get_logger, get_console_log from ._state import current_actor log = get_logger(__name__) @@ -22,36 +24,30 @@ log = get_logger(__name__) __all__ = ['run_task', 'run_as_asyncio_guest'] -async def run_coro( - to_trio: trio.MemorySendChannel, - coro: Awaitable, -) -> None: - """Await ``coro`` and relay result back to ``trio``. - """ - to_trio.send_nowait(await coro) +# async def consume_asyncgen( +# to_trio: trio.MemorySendChannel, +# coro: AsyncIterator, +# ) -> None: +# """Stream async generator results back to ``trio``. - -async def consume_asyncgen( - to_trio: trio.MemorySendChannel, - coro: AsyncIterator, -) -> None: - """Stream async generator results back to ``trio``. - - ``from_trio`` might eventually be used here for - bidirectional streaming. - """ - async for item in coro: - to_trio.send_nowait(item) +# ``from_trio`` might eventually be used here for +# bidirectional streaming. +# """ +# async for item in coro: +# to_trio.send_nowait(item) def _run_asyncio_task( func: Callable, *, qsize: int = 1, - _treat_as_stream: bool = False, + # _treat_as_stream: bool = False, + provide_channels: bool = False, **kwargs, + ) -> Any: - """Run an ``asyncio`` async function or generator in a task, return + """ + Run an ``asyncio`` async function or generator in a task, return or stream the result back to ``trio``. """ @@ -73,6 +69,9 @@ def _run_asyncio_task( # _treat_as_stream = True assert qsize > 1 + if provide_channels: + assert 'to_trio' in args + # allow target func to accept/stream results manually by name if 'to_trio' in args: kwargs['to_trio'] = to_trio @@ -80,25 +79,46 @@ def _run_asyncio_task( if 'from_trio' in args: kwargs['from_trio'] = from_trio - # if 'from_aio' in args: - # kwargs['from_aio'] = from_aio - coro = func(**kwargs) cancel_scope = trio.CancelScope() + aio_task_complete = trio.Event() + aio_err: Optional[BaseException] = None + + async def wait_on_coro_final_result( + to_trio: trio.MemorySendChannel, + coro: Awaitable, + aio_task_complete: trio.Event, + + ) -> None: + """ + Await ``coro`` and relay result back to ``trio``. + + """ + nonlocal aio_err + orig = result = id(coro) + try: + result = await coro + except BaseException as err: + aio_err = err + from_aio._err = aio_err + finally: + aio_task_complete.set() + if result != orig and aio_err is None: + to_trio.send_nowait(result) # start the asyncio task we submitted from trio if inspect.isawaitable(coro): - task = asyncio.create_task(run_coro(to_trio, coro)) + task = asyncio.create_task( + wait_on_coro_final_result(to_trio, coro, aio_task_complete) + ) - elif inspect.isasyncgen(coro): - task = asyncio.create_task(consume_asyncgen(to_trio, coro)) + # elif inspect.isasyncgen(coro): + # task = asyncio.create_task(consume_asyncgen(to_trio, coro)) else: raise TypeError(f"No support for invoking {coro}") - aio_err = None - def cancel_trio(task): """Cancel the calling ``trio`` task on error. """ @@ -110,23 +130,21 @@ def _run_asyncio_task( if aio_err: log.exception(f"asyncio task errorred:\n{aio_err}") - - cancel_scope.cancel() - from_aio._err = aio_err - from_aio.close() + from_aio._err = aio_err + cancel_scope.cancel() + from_aio.close() task.add_done_callback(cancel_trio) - return task, from_aio, to_trio, cancel_scope + return task, from_aio, to_trio, cancel_scope, aio_task_complete async def run_task( - func: Callable, *, qsize: int = 2**10, - _treat_as_stream: bool = False, + # _treat_as_stream: bool = False, **kwargs, ) -> Any: @@ -134,22 +152,9 @@ async def run_task( or stream the result back to ``trio``. """ - # streaming ``asyncio`` task - if _treat_as_stream: - - task, from_aio, to_trio, cs = _run_asyncio_task( - func, - qsize=2**8, - **kwargs, - ) - - # naively expect the mem chan api to do the job - # of handling cross-framework cancellations / errors - return from_aio - # simple async func try: - task, from_aio, to_trio, cs = _run_asyncio_task( + task, from_aio, to_trio, cs, _ = _run_asyncio_task( func, qsize=1, **kwargs, @@ -157,6 +162,8 @@ async def run_task( # return single value with cs: + # naively expect the mem chan api to do the job + # of handling cross-framework cancellations / errors return await from_aio.receive() if cs.cancelled_caught: @@ -165,7 +172,7 @@ async def run_task( raise from_aio._err # Do we need this? - except Exception as err: + except BaseException as err: aio_err = from_aio._err @@ -178,8 +185,8 @@ async def run_task( # except trio.Cancelled: # raise finally: - # if not task.done(): - task.cancel() + if not task.done(): + task.cancel() # TODO: explicit api for the streaming case where @@ -187,21 +194,54 @@ async def run_task( # This ends up looking more like our ``Portal.open_stream_from()`` # NB: code below is untested. -# @asynccontextmanager -# async def stream_from_task( +# async def _start_and_sync_aio_task( +# from_trio, +# to_trio, +# from_aio, -# target: Callable[Any], -# **kwargs, -# ) -> AsyncIterator[Any]: +@acm +async def open_channel_from( -# from_aoi = await run_task(target, _treat_as_stream=True, **kwargs) + target: Callable[[Any, ...], Any], + **kwargs, -# with cancel_scope: -# # stream values upward -# async with from_aio: -# async for item in from_aio: -# yield item +) -> AsyncIterator[Any]: + + try: + task, from_aio, to_trio, cs, aio_task_complete = _run_asyncio_task( + target, + qsize=2**8, + provide_channels=True, + **kwargs, + ) + + with cs: + # sync to "started()" call. + first = await from_aio.receive() + # stream values upward + async with from_aio: + yield first, from_aio + # await aio_task_complete.wait() + + except BaseException as err: + + aio_err = from_aio._err + + if aio_err is not None: + # always raise from any captured asyncio error + raise err from aio_err + else: + raise + + finally: + if cs.cancelled_caught: + # always raise from any captured asyncio error + if from_aio._err: + raise from_aio._err + + if not task.done(): + task.cancel() def run_as_asyncio_guest( @@ -221,16 +261,22 @@ def run_as_asyncio_guest( it. :) + """ + # Disable sigint handling in children? + # import signal + # signal.signal(signal.SIGINT, signal.SIG_IGN) + + get_console_log('runtime') + async def aio_main(trio_main): loop = asyncio.get_running_loop() - trio_done_fut = asyncio.Future() def trio_done_callback(main_outcome): - log.info(f"trio_main finished: {main_outcome!r}") + print(f"trio_main finished: {main_outcome!r}") trio_done_fut.set_result(main_outcome) # start the infection: run trio on the asyncio loop in "guest mode" @@ -241,7 +287,6 @@ def run_as_asyncio_guest( run_sync_soon_threadsafe=loop.call_soon_threadsafe, done_callback=trio_done_callback, ) - (await trio_done_fut).unwrap() # might as well if it's installed. From 7a651652796d78a98e354c34bb56599ff07bf15e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 14 Oct 2021 17:30:18 -0400 Subject: [PATCH 18/53] Facepalm, re-raise captured `asyncio` task error --- tractor/to_asyncio.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 936ab3f..d27edc9 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -102,6 +102,7 @@ def _run_asyncio_task( except BaseException as err: aio_err = err from_aio._err = aio_err + raise finally: aio_task_complete.set() if result != orig and aio_err is None: @@ -119,9 +120,11 @@ def _run_asyncio_task( else: raise TypeError(f"No support for invoking {coro}") - def cancel_trio(task): - """Cancel the calling ``trio`` task on error. - """ + def cancel_trio(task) -> None: + ''' + Cancel the calling ``trio`` task on error. + + ''' nonlocal aio_err try: aio_err = task.exception() From 41eddffc2c024934fbe9992f3f9585f234eba21e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 14 Oct 2021 17:33:56 -0400 Subject: [PATCH 19/53] Drop old (and deluded) "streaming" cruft --- tractor/to_asyncio.py | 45 +++++++++---------------------------------- 1 file changed, 9 insertions(+), 36 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index d27edc9..7b2287f 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -24,33 +24,19 @@ log = get_logger(__name__) __all__ = ['run_task', 'run_as_asyncio_guest'] -# async def consume_asyncgen( -# to_trio: trio.MemorySendChannel, -# coro: AsyncIterator, -# ) -> None: -# """Stream async generator results back to ``trio``. - -# ``from_trio`` might eventually be used here for -# bidirectional streaming. -# """ -# async for item in coro: -# to_trio.send_nowait(item) - - def _run_asyncio_task( func: Callable, *, qsize: int = 1, - # _treat_as_stream: bool = False, provide_channels: bool = False, **kwargs, ) -> Any: - """ + ''' Run an ``asyncio`` async function or generator in a task, return or stream the result back to ``trio``. - """ + ''' if not current_actor().is_infected_aio(): raise RuntimeError("`infect_asyncio` mode is not enabled!?") @@ -66,7 +52,6 @@ def _run_asyncio_task( # the assumption is that the target async routine accepts the # send channel then it intends to yield more then one return # value otherwise it would just return ;P - # _treat_as_stream = True assert qsize > 1 if provide_channels: @@ -91,10 +76,10 @@ def _run_asyncio_task( aio_task_complete: trio.Event, ) -> None: - """ + ''' Await ``coro`` and relay result back to ``trio``. - """ + ''' nonlocal aio_err orig = result = id(coro) try: @@ -114,9 +99,6 @@ def _run_asyncio_task( wait_on_coro_final_result(to_trio, coro, aio_task_complete) ) - # elif inspect.isasyncgen(coro): - # task = asyncio.create_task(consume_asyncgen(to_trio, coro)) - else: raise TypeError(f"No support for invoking {coro}") @@ -147,7 +129,6 @@ async def run_task( *, qsize: int = 2**10, - # _treat_as_stream: bool = False, **kwargs, ) -> Any: @@ -184,28 +165,19 @@ async def run_task( raise err from aio_err else: raise - - # except trio.Cancelled: - # raise finally: if not task.done(): task.cancel() -# TODO: explicit api for the streaming case where +# TODO: explicitly api for the streaming case where # we pull from the mem chan in an async generator? # This ends up looking more like our ``Portal.open_stream_from()`` # NB: code below is untested. -# async def _start_and_sync_aio_task( -# from_trio, -# to_trio, -# from_aio, - @acm async def open_channel_from( - target: Callable[[Any, ...], Any], **kwargs, @@ -250,7 +222,8 @@ async def open_channel_from( def run_as_asyncio_guest( trio_main: Callable, ) -> None: - """Entry for an "infected ``asyncio`` actor". + ''' + Entry for an "infected ``asyncio`` actor". Uh, oh. :o @@ -265,8 +238,8 @@ def run_as_asyncio_guest( :) - """ - # Disable sigint handling in children? + ''' + # Disable sigint handling in children? (nawp) # import signal # signal.signal(signal.SIGINT, signal.SIG_IGN) From 299e4192b088e02e5443f620acb62932ecd7bc30 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 5 Nov 2021 10:42:43 -0400 Subject: [PATCH 20/53] Plan asyncio test set --- tests/test_docs_examples.py | 5 +++-- tests/test_infected_asyncio.py | 39 +++++++++++++++++++++++++++++++++- 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py index 5f47419..af17ff1 100644 --- a/tests/test_docs_examples.py +++ b/tests/test_docs_examples.py @@ -1,6 +1,7 @@ -""" +''' Let's make sure them docs work yah? -""" + +''' from contextlib import contextmanager import itertools import os diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index def1776..2beac71 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -1,8 +1,13 @@ +''' +The most hipster way to force SC onto the stdlib's "async". + +''' import asyncio import pytest import tractor + async def sleep_and_err(): await asyncio.sleep(0.1) assert 0 @@ -14,7 +19,7 @@ async def asyncio_actor(): await tractor.to_asyncio.run_task(sleep_and_err) -def test_infected_simple_error(arb_addr): +def test_aio_simple_error(arb_addr): async def main(): async with tractor.open_nursery() as n: @@ -22,3 +27,35 @@ def test_infected_simple_error(arb_addr): with pytest.raises(tractor.RemoteActorError) as excinfo: tractor.run(main, arbiter_addr=arb_addr) + + +def test_aio_cancel_from_trio(arb_addr): + ... + + +def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr): + ... + + +def test_trio_cancels_aio(arb_addr): + ... + + +def test_trio_error_cancels_aio(arb_addr): + ... + + +def test_basic_interloop_channel_stream(arb_addr): + ... + + +def test_basic_interloop_channel_stream(arb_addr): + ... + + +def test_trio_cancels_and_channel_exits(arb_addr): + ... + + +def test_aio_errors_and_channel_propagates(arb_addr): + ... From 446feff1724cbaeaf3788a1213f490db42b8a82a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 5 Nov 2021 10:43:14 -0400 Subject: [PATCH 21/53] Clean type imports --- tractor/_spawn.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index b8c63fb..ead91df 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -22,10 +22,10 @@ import sys import multiprocessing as mp import platform from typing import ( - Any, Dict, Optional, Union, Callable, + Any, Dict, Optional, Callable, TypeVar, ) -from collections.abc import Awaitable, Coroutine +from collections.abc import Awaitable import trio from trio_typing import TaskStatus From 06fa650ed0b9d21c6120c1753ba8fea29d0e9458 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 7 Nov 2021 16:44:41 -0500 Subject: [PATCH 22/53] Drop runtime logging for asyncio mode --- tractor/to_asyncio.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 7b2287f..62394ee 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -220,7 +220,9 @@ async def open_channel_from( def run_as_asyncio_guest( + trio_main: Callable, + ) -> None: ''' Entry for an "infected ``asyncio`` actor". @@ -243,8 +245,6 @@ def run_as_asyncio_guest( # import signal # signal.signal(signal.SIGINT, signal.SIG_IGN) - get_console_log('runtime') - async def aio_main(trio_main): loop = asyncio.get_running_loop() @@ -273,4 +273,4 @@ def run_as_asyncio_guest( except ImportError: pass - asyncio.run(aio_main(trio_main)) + return asyncio.run(aio_main(trio_main)) From 0ab5e5cadd10ee4592b495c7afcac15348eb5887 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 7 Nov 2021 16:45:00 -0500 Subject: [PATCH 23/53] Fill out nursery docstring --- tractor/_supervise.py | 40 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 36 insertions(+), 4 deletions(-) diff --git a/tractor/_supervise.py b/tractor/_supervise.py index 5725338..f2d907d 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -45,8 +45,33 @@ _default_bind_addr: Tuple[str, int] = ('127.0.0.1', 0) class ActorNursery: - """Spawn scoped subprocess actors. - """ + ''' + The fundamental actor supervision construct: spawn and manage + explicit lifetime and capability restricted, bootstrapped, + ``trio.run()`` scheduled sub-processes. + + Though the concept of a "process nursery" is different in complexity + and slightly different in semantics then a tradtional single + threaded task nursery, much of the interface is the same. New + processes each require a top level "parent" or "root" task which is + itself no different then any task started by a tradtional + ``trio.Nursery``. The main difference is that each "actor" (a + process + ``trio.run()``) contains a full, paralell executing + ``trio``-task-tree. The following super powers ensue: + + - starting tasks in a child actor are completely independent of + tasks started in the current process. They execute in *parallel* + relative to tasks in the current process and are scheduled by their + own actor's ``trio`` run loop. + - tasks scheduled in a remote process still maintain an SC protocol + across memory boundaries using a so called "structured concurrency + dialogue protocol" which ensures task-hierarchy-lifetimes are linked. + - remote tasks (in another actor) can fail and relay failure back to + the caller task (in some other actor) via a seralized + ``RemoteActorError`` which means no zombie process or RPC + initiated task can ever go off on its own. + + ''' def __init__( self, actor: Actor, @@ -141,15 +166,19 @@ class ActorNursery: async def run_in_actor( self, + fn: typing.Callable, *, + name: Optional[str] = None, bind_addr: Tuple[str, int] = _default_bind_addr, rpc_module_paths: Optional[List[str]] = None, enable_modules: List[str] = None, loglevel: str = None, # set log level per subactor infect_asyncio: bool = False, + **kwargs, # explicit args to ``fn`` + ) -> Portal: """Spawn a new actor, run a lone task, then terminate the actor and return its result. @@ -412,8 +441,10 @@ async def _open_and_supervise_one_cancels_all_nursery( @asynccontextmanager async def open_nursery( **kwargs, + ) -> typing.AsyncGenerator[ActorNursery, None]: - """Create and yield a new ``ActorNursery`` to be used for spawning + ''' + Create and yield a new ``ActorNursery`` to be used for spawning structured concurrent subactors. When an actor is spawned a new trio task is started which @@ -425,7 +456,8 @@ async def open_nursery( close it. It turns out this approach is probably more correct anyway since it is more clear from the following nested nurseries which cancellation scopes correspond to each spawned subactor set. - """ + + ''' implicit_runtime = False actor = current_actor(err_on_no_runtime=False) From 56357242e9844b856725eb79be38371f073613e1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 7 Nov 2021 17:05:40 -0500 Subject: [PATCH 24/53] Add a `Portal.cancel_actor()` test --- tests/test_infected_asyncio.py | 83 ++++++++++++++++++++++++++++------ tractor/_actor.py | 1 + 2 files changed, 70 insertions(+), 14 deletions(-) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 2beac71..ad7758d 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -2,10 +2,15 @@ The most hipster way to force SC onto the stdlib's "async". ''' +from typing import Optional import asyncio +import builtins +import importlib import pytest +import trio import tractor +from tractor import RemoteActorError async def sleep_and_err(): @@ -13,24 +18,78 @@ async def sleep_and_err(): assert 0 -async def asyncio_actor(): - assert tractor.current_actor().is_infected_aio() +async def sleep_forever(): + await asyncio.sleep(float('inf')) - await tractor.to_asyncio.run_task(sleep_and_err) + +async def asyncio_actor( + + target: str, + expect_err: Optional[Exception] = None + +) -> None: + + assert tractor.current_actor().is_infected_aio() + target = globals()[target] + + if '.' in expect_err: + modpath, _, name = expect_err.rpartition('.') + mod = importlib.import_module(modpath) + error = getattr(mod, name) + error = builtins.__dict__.get(expect_err) + + try: + # spawn an ``asyncio`` task to run a func and return result + await tractor.to_asyncio.run_task(target) + except Exception as err: + if expect_err: + assert isinstance(err, error) + + raise def test_aio_simple_error(arb_addr): + ''' + Verify a simple remote asyncio error propagates back through trio + to the parent actor. + + ''' + async def main(): + async with tractor.open_nursery( + arbiter_addr=arb_addr + ) as n: + await n.run_in_actor( + asyncio_actor, + target='sleep_and_err', + expect_err='AssertionError', + infect_asyncio=True, + ) + + with pytest.raises(RemoteActorError) as excinfo: + trio.run(main) + + err = excinfo.value + assert isinstance(err, RemoteActorError) + assert err.type == AssertionError + + +def test_tractor_cancels_aio(arb_addr): + ''' + Verify we can cancel a spawned asyncio task gracefully. + + ''' async def main(): async with tractor.open_nursery() as n: - await n.run_in_actor(asyncio_actor, infected_asyncio=True) + portal = await n.run_in_actor( + asyncio_actor, + target='sleep_forever', + expect_err='asyncio.CancelledError', + infect_asyncio=True, + ) + await portal.cancel_actor() - with pytest.raises(tractor.RemoteActorError) as excinfo: - tractor.run(main, arbiter_addr=arb_addr) - - -def test_aio_cancel_from_trio(arb_addr): - ... + trio.run(main) def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr): @@ -49,10 +108,6 @@ def test_basic_interloop_channel_stream(arb_addr): ... -def test_basic_interloop_channel_stream(arb_addr): - ... - - def test_trio_cancels_and_channel_exits(arb_addr): ... diff --git a/tractor/_actor.py b/tractor/_actor.py index 47806be..8e5d548 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -475,6 +475,7 @@ class Actor: self._mods[modpath] = mod if modpath == '__main__': self._mods['__mp_main__'] = mod + except ModuleNotFoundError: # it is expected the corresponding `ModuleNotExposed` error # will be raised later From 1114b6980e1232122841970143f71927f13d4731 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 17 Nov 2021 13:20:04 -0500 Subject: [PATCH 25/53] Adjust linked-loop-task tear down sequence Close the mem chan before cancelling the `trio` task in order to ensure we retrieve whatever error is shuttled from `asyncio` before the channel read is potentially cancelled (previously a race?). Handle `asyncio.CancelledError` specially such that we raise it directly (instead of `raise aio_cancelled from other_err`) since it *is* the source error in the case where the cancellation is `asyncio` internal. --- tractor/to_asyncio.py | 36 ++++++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 62394ee..5607031 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -3,6 +3,7 @@ Infection apis for ``asyncio`` loops running ``trio`` using guest mode. ''' import asyncio +from asyncio.exceptions import CancelledError from contextlib import asynccontextmanager as acm import inspect from typing import ( @@ -15,7 +16,7 @@ from typing import ( import trio -from .log import get_logger, get_console_log +from .log import get_logger from ._state import current_actor log = get_logger(__name__) @@ -110,14 +111,16 @@ def _run_asyncio_task( nonlocal aio_err try: aio_err = task.exception() - except asyncio.CancelledError as cerr: + except CancelledError as cerr: + log.exception("infected task was cancelled") + # raise aio_err = cerr if aio_err: - log.exception(f"asyncio task errorred:\n{aio_err}") + log.exception(f"infected task errorred with {type(aio_err)}") from_aio._err = aio_err - cancel_scope.cancel() from_aio.close() + cancel_scope.cancel() task.add_done_callback(cancel_trio) @@ -132,10 +135,11 @@ async def run_task( **kwargs, ) -> Any: - """Run an ``asyncio`` async function or generator in a task, return + ''' + Run an ``asyncio`` async function or generator in a task, return or stream the result back to ``trio``. - """ + ''' # simple async func try: task, from_aio, to_trio, cs, _ = _run_asyncio_task( @@ -151,24 +155,36 @@ async def run_task( return await from_aio.receive() if cs.cancelled_caught: + aio_err = from_aio._err + # always raise from any captured asyncio error - if from_aio._err: - raise from_aio._err + if aio_err: + raise aio_err # Do we need this? - except BaseException as err: + except ( + Exception, + CancelledError, + ) as err: aio_err = from_aio._err - if aio_err is not None: + if ( + aio_err is not None and + type(aio_err) != CancelledError + ): # always raise from any captured asyncio error raise err from aio_err else: raise + finally: if not task.done(): task.cancel() + # if task.cancelled(): + # ... do what .. + # TODO: explicitly api for the streaming case where # we pull from the mem chan in an async generator? From 04c0eda69dee09df0135c62d62bad1bcbedb9112 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 17 Nov 2021 13:32:42 -0500 Subject: [PATCH 26/53] Add an `asyncio`-internal cancel test Verify that if the `asyncio` side task cancels (itself) that we raise that `asyncio.CancelledError` on the `trio` side. In the case where `trio` initiated the cancel whether or not the `asyncio` side ended up raising `CancelledError` doesn't really matter to us as long as the far task did indeed terminate. --- tests/test_infected_asyncio.py | 46 ++++++++++++++++++++++++++++------ 1 file changed, 38 insertions(+), 8 deletions(-) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index ad7758d..6cb742a 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -35,17 +35,20 @@ async def asyncio_actor( if '.' in expect_err: modpath, _, name = expect_err.rpartition('.') mod = importlib.import_module(modpath) - error = getattr(mod, name) - error = builtins.__dict__.get(expect_err) + error_type = getattr(mod, name) + + else: # toplevel builtin error type + error_type = builtins.__dict__.get(expect_err) try: # spawn an ``asyncio`` task to run a func and return result await tractor.to_asyncio.run_task(target) - except Exception as err: - if expect_err: - assert isinstance(err, error) - raise + except BaseException as err: + if expect_err: + assert isinstance(err, error_type) + + raise err def test_aio_simple_error(arb_addr): @@ -84,16 +87,43 @@ def test_tractor_cancels_aio(arb_addr): portal = await n.run_in_actor( asyncio_actor, target='sleep_forever', - expect_err='asyncio.CancelledError', + expect_err='trio.Cancelled', infect_asyncio=True, ) + # cancel the entire remote runtime await portal.cancel_actor() trio.run(main) +async def aio_cancel(): + ''''Cancel urself boi. + + ''' + await asyncio.sleep(0.5) + task = asyncio.current_task() + + # cancel and enter sleep + task.cancel() + await sleep_forever() + + def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr): - ... + + async def main(): + async with tractor.open_nursery() as n: + portal = await n.run_in_actor( + asyncio_actor, + target='aio_cancel', + expect_err='asyncio.CancelledError', + infect_asyncio=True, + ) + + # with trio.CancelScope(shield=True): + await portal.result() + + with pytest.raises(RemoteActorError) as excinfo: + trio.run(main) def test_trio_cancels_aio(arb_addr): From 870466471983720b4db8fd94e420a83b0f44ee01 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 17 Nov 2021 18:56:23 -0500 Subject: [PATCH 27/53] Reverse the order for asyncio cancelleds? I dunno why --- tractor/to_asyncio.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 5607031..7409d7c 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -72,6 +72,7 @@ def _run_asyncio_task( aio_err: Optional[BaseException] = None async def wait_on_coro_final_result( + to_trio: trio.MemorySendChannel, coro: Awaitable, aio_task_complete: trio.Event, @@ -89,6 +90,7 @@ def _run_asyncio_task( aio_err = err from_aio._err = aio_err raise + finally: aio_task_complete.set() if result != orig and aio_err is None: @@ -112,15 +114,17 @@ def _run_asyncio_task( try: aio_err = task.exception() except CancelledError as cerr: - log.exception("infected task was cancelled") - # raise - aio_err = cerr - - if aio_err: - log.exception(f"infected task errorred with {type(aio_err)}") - from_aio._err = aio_err + log.cancel("infected task was cancelled") + from_aio._err = cerr from_aio.close() cancel_scope.cancel() + else: + if aio_err is not None: + log.exception(f"infected task errorred:") + from_aio._err = aio_err + # order is opposite here + cancel_scope.cancel() + from_aio.close() task.add_done_callback(cancel_trio) From c19123b588c211fedca269d698f83aad47186478 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 18 Nov 2021 09:35:59 -0500 Subject: [PATCH 28/53] Add trio-cancels-anursery-cancels-aio test --- tests/test_infected_asyncio.py | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 6cb742a..9325a05 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -96,6 +96,31 @@ def test_tractor_cancels_aio(arb_addr): trio.run(main) +def test_trio_cancels_aio(arb_addr): + ''' + Much like the above test with ``tractor.Portal.cancel_actor()`` + except we just use a standard ``trio`` cancellation api. + + ''' + async def main(): + + with trio.move_on_after(1): + # cancel the nursery shortly after boot + + async with tractor.open_nursery() as n: + # debug_mode=True + # ) as n: + portal = await n.run_in_actor( + asyncio_actor, + target='sleep_forever', + expect_err='trio.Cancelled', + infect_asyncio=True, + ) + + trio.run(main) + + + async def aio_cancel(): ''''Cancel urself boi. @@ -126,10 +151,6 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr): trio.run(main) -def test_trio_cancels_aio(arb_addr): - ... - - def test_trio_error_cancels_aio(arb_addr): ... From e815f766f6c44dafa414bc9dd8b14da8dcd3e916 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 18 Nov 2021 14:01:13 -0500 Subject: [PATCH 29/53] Add a cancelled-from-remote-trio-task case --- tests/test_infected_asyncio.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 9325a05..73d92fa 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -22,6 +22,31 @@ async def sleep_forever(): await asyncio.sleep(float('inf')) +async def trio_cancels_single_aio_task(): + + # spawn an ``asyncio`` task to run a func and return result + with trio.move_on_after(.2): + await tractor.to_asyncio.run_task(sleep_forever) + + +def test_trio_cancels_aio_on_actor_side(arb_addr): + ''' + Spawn an infected actor that is cancelled by the ``trio`` side + task using std cancel scope apis. + + ''' + async def main(): + async with tractor.open_nursery( + arbiter_addr=arb_addr + ) as n: + await n.run_in_actor( + trio_cancels_single_aio_task, + infect_asyncio=True, + ) + + trio.run(main) + + async def asyncio_actor( target: str, From e6687bcdc4992e79c857076c228ea391c4123e3b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 19 Nov 2021 10:31:42 -0500 Subject: [PATCH 30/53] Serious-ify doc string --- tractor/to_asyncio.py | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 7409d7c..6013fb1 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -247,23 +247,24 @@ def run_as_asyncio_guest( ''' Entry for an "infected ``asyncio`` actor". - Uh, oh. :o - - It looks like your event loop has caught a case of the ``trio``s. - - :() - - Don't worry, we've heard you'll barely notice. You might hallucinate - a few more propagating errors and feel like your digestion has - slowed but if anything get's too bad your parents will know about - it. - - :) + Entrypoint for a Python process which starts the ``asyncio`` event + loop and runs ``trio`` in guest mode resulting in a system where + ``trio`` tasks can control ``asyncio`` tasks whilst maintaining + SC semantics. ''' - # Disable sigint handling in children? (nawp) - # import signal - # signal.signal(signal.SIGINT, signal.SIG_IGN) + # Uh, oh. :o + + # It looks like your event loop has caught a case of the ``trio``s. + + # :() + + # Don't worry, we've heard you'll barely notice. You might hallucinate + # a few more propagating errors and feel like your digestion has + # slowed but if anything get's too bad your parents will know about + # it. + + # :) async def aio_main(trio_main): From 9bc94b5ccc792a339915e85d8d0ad2e8942508bd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 20 Nov 2021 12:43:54 -0500 Subject: [PATCH 31/53] Factor error translation into a ctx mngr Pull the common `asyncio` -> `trio` error translation logic into a common context manager and don't expect a final result to be captured when using `open_channel_from()` since it's a manager interface and it would be clunky to try and deliver some "final result" after exit. --- tractor/to_asyncio.py | 133 ++++++++++++++++++++++-------------------- 1 file changed, 71 insertions(+), 62 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 6013fb1..5d168f7 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -92,14 +92,28 @@ def _run_asyncio_task( raise finally: - aio_task_complete.set() - if result != orig and aio_err is None: + if ( + result != orig and + aio_err is None and + + # in the ``open_channel_from()`` case we don't + # relay through the "return value". + not provide_channels + ): to_trio.send_nowait(result) + to_trio.close() + from_aio.close() + aio_task_complete.set() + # start the asyncio task we submitted from trio if inspect.isawaitable(coro): task = asyncio.create_task( - wait_on_coro_final_result(to_trio, coro, aio_task_complete) + wait_on_coro_final_result( + to_trio, + coro, + aio_task_complete + ) ) else: @@ -120,7 +134,7 @@ def _run_asyncio_task( cancel_scope.cancel() else: if aio_err is not None: - log.exception(f"infected task errorred:") + log.exception("infected task errorred:") from_aio._err = aio_err # order is opposite here cancel_scope.cancel() @@ -131,41 +145,20 @@ def _run_asyncio_task( return task, from_aio, to_trio, cancel_scope, aio_task_complete -async def run_task( - func: Callable, - *, +@acm +async def translate_aio_errors( - qsize: int = 2**10, - **kwargs, + from_aio: trio.MemoryReceiveChannel, + task: asyncio.Task, -) -> Any: +) -> None: ''' - Run an ``asyncio`` async function or generator in a task, return - or stream the result back to ``trio``. + Error handling context around ``asyncio`` task spawns which + appropriately translates errors and cancels into ``trio`` land. ''' - # simple async func try: - task, from_aio, to_trio, cs, _ = _run_asyncio_task( - func, - qsize=1, - **kwargs, - ) - - # return single value - with cs: - # naively expect the mem chan api to do the job - # of handling cross-framework cancellations / errors - return await from_aio.receive() - - if cs.cancelled_caught: - aio_err = from_aio._err - - # always raise from any captured asyncio error - if aio_err: - raise aio_err - - # Do we need this? + yield except ( Exception, CancelledError, @@ -190,6 +183,41 @@ async def run_task( # ... do what .. +async def run_task( + func: Callable, + *, + + qsize: int = 2**10, + **kwargs, + +) -> Any: + ''' + Run an ``asyncio`` async function or generator in a task, return + or stream the result back to ``trio``. + + ''' + # simple async func + task, from_aio, to_trio, cs, _ = _run_asyncio_task( + func, + qsize=1, + **kwargs, + ) + async with translate_aio_errors(from_aio, task): + + # return single value + with cs: + # naively expect the mem chan api to do the job + # of handling cross-framework cancellations / errors + return await from_aio.receive() + + if cs.cancelled_caught: + aio_err = from_aio._err + + # always raise from any captured asyncio error + if aio_err: + raise aio_err + + # TODO: explicitly api for the streaming case where # we pull from the mem chan in an async generator? # This ends up looking more like our ``Portal.open_stream_from()`` @@ -203,40 +231,21 @@ async def open_channel_from( ) -> AsyncIterator[Any]: - try: - task, from_aio, to_trio, cs, aio_task_complete = _run_asyncio_task( - target, - qsize=2**8, - provide_channels=True, - **kwargs, - ) - + task, from_aio, to_trio, cs, aio_task_complete = _run_asyncio_task( + target, + qsize=2**8, + provide_channels=True, + **kwargs, + ) + async with translate_aio_errors(from_aio, task): with cs: # sync to "started()" call. first = await from_aio.receive() + # stream values upward async with from_aio: yield first, from_aio - # await aio_task_complete.wait() - - except BaseException as err: - - aio_err = from_aio._err - - if aio_err is not None: - # always raise from any captured asyncio error - raise err from aio_err - else: - raise - - finally: - if cs.cancelled_caught: - # always raise from any captured asyncio error - if from_aio._err: - raise from_aio._err - - if not task.done(): - task.cancel() + await aio_task_complete.wait() def run_as_asyncio_guest( @@ -284,7 +293,7 @@ def run_as_asyncio_guest( run_sync_soon_threadsafe=loop.call_soon_threadsafe, done_callback=trio_done_callback, ) - (await trio_done_fut).unwrap() + return (await trio_done_fut).unwrap() # might as well if it's installed. try: From d27ddb7bbb36b2aeb584426c4aa5532c32fb27d0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 20 Nov 2021 12:47:03 -0500 Subject: [PATCH 32/53] Add a basic `open_channel_from()` streaming test --- tests/test_infected_asyncio.py | 64 +++++++++++++++++++++++++++++----- 1 file changed, 56 insertions(+), 8 deletions(-) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 73d92fa..c219a4b 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -2,7 +2,7 @@ The most hipster way to force SC onto the stdlib's "async". ''' -from typing import Optional +from typing import Optional, Iterable import asyncio import builtins import importlib @@ -10,6 +10,7 @@ import importlib import pytest import trio import tractor +from tractor import to_asyncio from tractor import RemoteActorError @@ -176,17 +177,64 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr): trio.run(main) -def test_trio_error_cancels_aio(arb_addr): - ... +# TODO: +async def no_to_trio_in_args(): + pass + + +async def push_from_aio_task( + + sequence: Iterable, + to_trio: trio.abc.SendChannel, + +) -> None: + for i in range(100): + print(f'asyncio sending {i}') + to_trio.send_nowait(i) + await asyncio.sleep(0.001) + + print(f'asyncio streamer complete!') + + +async def stream_from_aio(): + seq = range(100) + expect = list(seq) + + async with to_asyncio.open_channel_from( + push_from_aio_task, + sequence=seq, + ) as (first, chan): + + pulled = [first] + async for value in chan: + print(f'trio received {value}') + pulled.append(value) + + assert pulled == expect + + print('trio guest mode task completed!') def test_basic_interloop_channel_stream(arb_addr): - ... + async def main(): + async with tractor.open_nursery() as n: + portal = await n.run_in_actor( + stream_from_aio, + infect_asyncio=True, + ) + await portal.result() + + trio.run(main) -def test_trio_cancels_and_channel_exits(arb_addr): - ... + +# def test_trio_error_cancels_intertask_chan(arb_addr): +# ... -def test_aio_errors_and_channel_propagates(arb_addr): - ... +# def test_trio_cancels_and_channel_exits(arb_addr): +# ... + + +# def test_aio_errors_and_channel_propagates(arb_addr): +# ... From 44d0e9fc3273669b65995b7b19d3eb9c231fb908 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 22 Nov 2021 13:08:00 -0500 Subject: [PATCH 33/53] Add a `LinkedTaskChannel` for synced inter-loop-streaming Wraps the pairs of underlying `trio` mem chans and the `asyncio.Queue` with this new composite which will be delivered from `open_channel_from()`. This allows for both sending and receiving values from the `asyncio` task (2 way msg passing) as well controls for cancelling or waiting on the task. Factor `asyncio` translation and re-raising logic into a new closure which is run on both `trio` side error handling as well as on normal termination to avoid missing `asyncio` errors even when `trio` task cancellation is handled first. Only close the `trio` mem chans on `trio` task termination *iff* the task was spawned using `open_channel_from()`: - on `open_channel_from()` exit, mem chan closure is the desired semantic - on `run_task()` we normally only return a single value or error and if the channel is closed before the error is raised we may propagate a `trio.EndOfChannel` instead of the desired underlying `asyncio` task's error --- tractor/to_asyncio.py | 104 ++++++++++++++++++++++++++++++++---------- 1 file changed, 80 insertions(+), 24 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 5d168f7..6132303 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -5,6 +5,7 @@ Infection apis for ``asyncio`` loops running ``trio`` using guest mode. import asyncio from asyncio.exceptions import CancelledError from contextlib import asynccontextmanager as acm +from dataclasses import dataclass import inspect from typing import ( Any, @@ -41,7 +42,8 @@ def _run_asyncio_task( if not current_actor().is_infected_aio(): raise RuntimeError("`infect_asyncio` mode is not enabled!?") - # ITC (inter task comms) + # ITC (inter task comms), these channel/queue names are mostly from + # ``asyncio``'s perspective. from_trio = asyncio.Queue(qsize) # type: ignore to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore @@ -89,6 +91,8 @@ def _run_asyncio_task( except BaseException as err: aio_err = err from_aio._err = aio_err + to_trio.close() + from_aio.close() raise finally: @@ -102,8 +106,12 @@ def _run_asyncio_task( ): to_trio.send_nowait(result) - to_trio.close() - from_aio.close() + # if the task was spawned using ``open_channel_from()`` + # then we close the channels on exit. + if provide_channels: + to_trio.close() + from_aio.close() + aio_task_complete.set() # start the asyncio task we submitted from trio @@ -134,15 +142,17 @@ def _run_asyncio_task( cancel_scope.cancel() else: if aio_err is not None: + aio_err.with_traceback(aio_err.__traceback__) log.exception("infected task errorred:") from_aio._err = aio_err - # order is opposite here + + # NOTE: order is opposite here cancel_scope.cancel() from_aio.close() task.add_done_callback(cancel_trio) - return task, from_aio, to_trio, cancel_scope, aio_task_complete + return task, from_aio, to_trio, from_trio, cancel_scope, aio_task_complete @acm @@ -157,28 +167,32 @@ async def translate_aio_errors( appropriately translates errors and cancels into ``trio`` land. ''' + err: Optional[Exception] = None + aio_err: Optional[Exception] = None + + def maybe_raise_aio_err(err: Exception): + aio_err = from_aio._err + if ( + aio_err is not None and + type(aio_err) != CancelledError + ): + # always raise from any captured asyncio error + raise aio_err from err + try: yield except ( Exception, CancelledError, ) as err: - - aio_err = from_aio._err - - if ( - aio_err is not None and - type(aio_err) != CancelledError - ): - # always raise from any captured asyncio error - raise err from aio_err - else: - raise + maybe_raise_aio_err(err) + raise finally: - if not task.done(): + if not task.done() and aio_err: task.cancel() + maybe_raise_aio_err(err) # if task.cancelled(): # ... do what .. @@ -197,7 +211,7 @@ async def run_task( ''' # simple async func - task, from_aio, to_trio, cs, _ = _run_asyncio_task( + task, from_aio, to_trio, aio_q, cs, _ = _run_asyncio_task( func, qsize=1, **kwargs, @@ -224,28 +238,70 @@ async def run_task( # NB: code below is untested. +@dataclass +class LinkedTaskChannel(trio.abc.Channel): + ''' + A "linked task channel" which allows for two-way synchronized msg + passing between a ``trio``-in-guest-mode task and an ``asyncio`` + task. + + ''' + _aio_task: asyncio.Task + _to_aio: asyncio.Queue + _from_aio: trio.MemoryReceiveChannel + _aio_task_complete: trio.Event + + async def aclose(self) -> None: + self._from_aio.close() + + async def receive(self) -> Any: + async with translate_aio_errors(self._from_aio, self._aio_task): + return await self._from_aio.receive() + + async def wait_ayncio_complete(self) -> None: + await self._aio_task_complete.wait() + + def cancel_asyncio_task(self) -> None: + self._aio_task.cancel() + + async def send(self, item: Any) -> None: + ''' + Send a value through to the asyncio task presuming + it defines a ``from_trio`` argument, if it does not + this method will raise an error. + + ''' + self._to_aio.put_nowait(item) + + @acm async def open_channel_from( + target: Callable[[Any, ...], Any], **kwargs, ) -> AsyncIterator[Any]: + ''' + Open an inter-loop linked task channel for streaming between a target + spawned ``asyncio`` task and ``trio``. - task, from_aio, to_trio, cs, aio_task_complete = _run_asyncio_task( + ''' + task, from_aio, to_trio, aio_q, cs, aio_task_complete = _run_asyncio_task( target, qsize=2**8, provide_channels=True, **kwargs, ) - async with translate_aio_errors(from_aio, task): - with cs: - # sync to "started()" call. + chan = LinkedTaskChannel(task, aio_q, from_aio, aio_task_complete) + with cs: + async with translate_aio_errors(from_aio, task): + # sync to a "started()"-like first delivered value from the + # ``asyncio`` task. first = await from_aio.receive() # stream values upward async with from_aio: - yield first, from_aio - await aio_task_complete.wait() + yield first, chan def run_as_asyncio_guest( From ad2567dd735dc462e0812dfe6d5ee74ff17adb81 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 22 Nov 2021 13:27:16 -0500 Subject: [PATCH 34/53] Add first set of interloop streaming tests --- tests/test_infected_asyncio.py | 141 ++++++++++++++++++++++++++------- 1 file changed, 113 insertions(+), 28 deletions(-) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index c219a4b..78003f7 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -134,8 +134,6 @@ def test_trio_cancels_aio(arb_addr): # cancel the nursery shortly after boot async with tractor.open_nursery() as n: - # debug_mode=True - # ) as n: portal = await n.run_in_actor( asyncio_actor, target='sleep_forever', @@ -177,7 +175,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr): trio.run(main) -# TODO: +# TODO: verify open_channel_from will fail on this.. async def no_to_trio_in_args(): pass @@ -186,33 +184,77 @@ async def push_from_aio_task( sequence: Iterable, to_trio: trio.abc.SendChannel, + expect_cancel: False, + fail_early: bool, ) -> None: - for i in range(100): - print(f'asyncio sending {i}') - to_trio.send_nowait(i) - await asyncio.sleep(0.001) - print(f'asyncio streamer complete!') + try: + # sync caller ctx manager + to_trio.send_nowait(True) + + for i in sequence: + print(f'asyncio sending {i}') + to_trio.send_nowait(i) + await asyncio.sleep(0.001) + + if i == 50 and fail_early: + raise Exception + + print(f'asyncio streamer complete!') + + except asyncio.CancelledError: + if not expect_cancel: + pytest.fail("aio task was cancelled unexpectedly") + raise + else: + if expect_cancel: + pytest.fail("aio task wasn't cancelled as expected!?") -async def stream_from_aio(): +async def stream_from_aio( + + exit_early: bool = False, + raise_err: bool = False, + aio_raise_err: bool = False, + +) -> None: seq = range(100) expect = list(seq) - async with to_asyncio.open_channel_from( - push_from_aio_task, - sequence=seq, - ) as (first, chan): + try: + pulled = [] - pulled = [first] - async for value in chan: - print(f'trio received {value}') - pulled.append(value) + async with to_asyncio.open_channel_from( + push_from_aio_task, + sequence=seq, + expect_cancel=raise_err or exit_early, + fail_early=aio_raise_err, + ) as (first, chan): - assert pulled == expect + assert first is True - print('trio guest mode task completed!') + async for value in chan: + print(f'trio received {value}') + pulled.append(value) + + if value == 50: + if raise_err: + raise Exception + elif exit_early: + break + finally: + + if ( + not raise_err and + not exit_early and + not aio_raise_err + ): + assert pulled == expect + else: + assert pulled == expect[:51] + + print('trio guest mode task completed!') def test_basic_interloop_channel_stream(arb_addr): @@ -227,14 +269,57 @@ def test_basic_interloop_channel_stream(arb_addr): trio.run(main) +# TODO: parametrize the above test and avoid the duplication here? +def test_trio_error_cancels_intertask_chan(arb_addr): + async def main(): + async with tractor.open_nursery() as n: + portal = await n.run_in_actor( + stream_from_aio, + raise_err=True, + infect_asyncio=True, + ) + # should trigger remote actor error + await portal.result() -# def test_trio_error_cancels_intertask_chan(arb_addr): -# ... - - -# def test_trio_cancels_and_channel_exits(arb_addr): -# ... - - -# def test_aio_errors_and_channel_propagates(arb_addr): + with pytest.raises(RemoteActorError) as excinfo: + trio.run(main) + + # ensure boxed error is correct + assert excinfo.value.type == Exception + + +def test_trio_closes_early_and_channel_exits(arb_addr): + async def main(): + async with tractor.open_nursery() as n: + portal = await n.run_in_actor( + stream_from_aio, + exit_early=True, + infect_asyncio=True, + ) + # should trigger remote actor error + await portal.result() + + # should be a quiet exit on a simple channel exit + trio.run(main) + + +def test_aio_errors_and_channel_propagates_and_closes(arb_addr): + async def main(): + async with tractor.open_nursery() as n: + portal = await n.run_in_actor( + stream_from_aio, + aio_raise_err=True, + infect_asyncio=True, + ) + # should trigger remote actor error + await portal.result() + + with pytest.raises(RemoteActorError) as excinfo: + trio.run(main) + + # ensure boxed error is correct + assert excinfo.value.type == Exception + + +# def test_2way_reqresp(arb_addr): # ... From c48c68c0bcaecca329d15b6559fb22a92581fc80 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 22 Nov 2021 13:29:11 -0500 Subject: [PATCH 35/53] Flip doc strings to my preferred format --- tractor/trionics/_broadcast.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/tractor/trionics/_broadcast.py b/tractor/trionics/_broadcast.py index 77ab6d0..6c04895 100644 --- a/tractor/trionics/_broadcast.py +++ b/tractor/trionics/_broadcast.py @@ -47,8 +47,9 @@ class AsyncReceiver( Protocol, Generic[ReceiveType], ): - '''An async receivable duck-type that quacks much like trio's - ``trio.abc.ReceieveChannel``. + ''' + An async receivable duck-type that quacks much like trio's + ``trio.abc.ReceiveChannel``. ''' @abstractmethod @@ -78,7 +79,8 @@ class AsyncReceiver( class Lagged(trio.TooSlowError): - '''Subscribed consumer task was too slow and was overrun + ''' + Subscribed consumer task was too slow and was overrun by the fastest consumer-producer pair. ''' @@ -86,7 +88,8 @@ class Lagged(trio.TooSlowError): @dataclass class BroadcastState: - '''Common state to all receivers of a broadcast. + ''' + Common state to all receivers of a broadcast. ''' queue: deque @@ -111,7 +114,8 @@ class BroadcastState: class BroadcastReceiver(ReceiveChannel): - '''A memory receive channel broadcaster which is non-lossy for the + ''' + A memory receive channel broadcaster which is non-lossy for the fastest consumer. Additional consumer tasks can receive all produced values by registering From 5f4094691df01765bc00e336ba767ac11126a8ec Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 23 Nov 2021 16:19:19 -0500 Subject: [PATCH 36/53] Re-wrap and raise `asyncio.CancelledError` For whatever reason `trio` seems to be swallowing this exception when raised in the `trio` task so instead wrap it in our own non-base exception type: `AsyncioCancelled` and raise that when the `asyncio` task cancels itself internally using `raise from ` style. Further don't bother cancelling the `trio` task (via cancel scope) since we we can just use the recv mem chan closure error as a signal and explicitly lookup any set asyncio error. --- tractor/_exceptions.py | 9 +++ tractor/to_asyncio.py | 127 +++++++++++++++++++++++------------------ 2 files changed, 79 insertions(+), 57 deletions(-) diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 95d7533..f3beb5a 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -82,6 +82,15 @@ class StreamOverrun(trio.TooSlowError): "This stream was overrun by sender" +class AsyncioCancelled(Exception): + ''' + Asyncio cancelled translation (non-base) error + for use with the ``to_asyncio`` module + to be raised in the ``trio`` side task + + ''' + + def pack_error( exc: BaseException, tb=None, diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 6132303..4e33e68 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -19,6 +19,7 @@ import trio from .log import get_logger from ._state import current_actor +from ._exceptions import AsyncioCancelled log = get_logger(__name__) @@ -91,11 +92,9 @@ def _run_asyncio_task( except BaseException as err: aio_err = err from_aio._err = aio_err - to_trio.close() - from_aio.close() raise - finally: + else: if ( result != orig and aio_err is None and @@ -106,11 +105,13 @@ def _run_asyncio_task( ): to_trio.send_nowait(result) + finally: # if the task was spawned using ``open_channel_from()`` # then we close the channels on exit. if provide_channels: + # only close the sender side which will relay + # a ``trio.EndOfChannel`` to the trio (consumer) side. to_trio.close() - from_aio.close() aio_task_complete.set() @@ -127,28 +128,27 @@ def _run_asyncio_task( else: raise TypeError(f"No support for invoking {coro}") - def cancel_trio(task) -> None: + def cancel_trio(task: asyncio.Task) -> None: ''' Cancel the calling ``trio`` task on error. ''' nonlocal aio_err - try: - aio_err = task.exception() - except CancelledError as cerr: - log.cancel("infected task was cancelled") - from_aio._err = cerr - from_aio.close() - cancel_scope.cancel() - else: - if aio_err is not None: + aio_err = from_aio._err + + if aio_err is not None: + if type(aio_err) is CancelledError: + log.cancel("infected task was cancelled") + else: aio_err.with_traceback(aio_err.__traceback__) log.exception("infected task errorred:") - from_aio._err = aio_err - # NOTE: order is opposite here - cancel_scope.cancel() - from_aio.close() + # NOTE: currently mem chan closure may act as a form + # of error relay (at least in the ``asyncio.CancelledError`` + # case) since we have no way to directly trigger a ``trio`` + # task error without creating a nursery to throw one. + # We might want to change this in the future though. + from_aio.close() task.add_done_callback(cancel_trio) @@ -160,6 +160,7 @@ async def translate_aio_errors( from_aio: trio.MemoryReceiveChannel, task: asyncio.Task, + trio_cs: trio.CancelScope, ) -> None: ''' @@ -167,34 +168,50 @@ async def translate_aio_errors( appropriately translates errors and cancels into ``trio`` land. ''' - err: Optional[Exception] = None aio_err: Optional[Exception] = None - def maybe_raise_aio_err(err: Exception): + def maybe_raise_aio_err( + err: Optional[Exception] = None + ) -> None: aio_err = from_aio._err if ( aio_err is not None and type(aio_err) != CancelledError ): # always raise from any captured asyncio error - raise aio_err from err - + if err: + raise aio_err from err + else: + raise aio_err try: yield except ( - Exception, - CancelledError, - ) as err: - maybe_raise_aio_err(err) - raise + # NOTE: see the note in the ``cancel_trio()`` asyncio task + # termination callback + trio.ClosedResourceError, + ): + aio_err = from_aio._err + if ( + task.cancelled() and + type(aio_err) is CancelledError + ): + # if an underlying ``asyncio.CancelledError`` triggered this + # channel close, raise our (non-``BaseException``) wrapper + # error: ``AsyncioCancelled`` from that source error. + raise AsyncioCancelled from aio_err + else: + raise finally: + # always cancel the ``asyncio`` task if we've made it this far + # and it's not done. if not task.done() and aio_err: + # assert not aio_err, 'WTF how did asyncio do this?!' task.cancel() - maybe_raise_aio_err(err) - # if task.cancelled(): - # ... do what .. + # if any ``asyncio`` error was caught, raise it here inline + # here in the ``trio`` task + maybe_raise_aio_err() async def run_task( @@ -216,27 +233,16 @@ async def run_task( qsize=1, **kwargs, ) - async with translate_aio_errors(from_aio, task): - - # return single value - with cs: - # naively expect the mem chan api to do the job - # of handling cross-framework cancellations / errors + with from_aio: + # try: + async with translate_aio_errors(from_aio, task, cs): + # return single value that is the output from the + # ``asyncio`` function-as-task. Expect the mem chan api to + # do the job of handling cross-framework cancellations + # / errors via closure and translation in the + # ``translate_aio_errors()`` in the above ctx mngr. return await from_aio.receive() - if cs.cancelled_caught: - aio_err = from_aio._err - - # always raise from any captured asyncio error - if aio_err: - raise aio_err - - -# TODO: explicitly api for the streaming case where -# we pull from the mem chan in an async generator? -# This ends up looking more like our ``Portal.open_stream_from()`` -# NB: code below is untested. - @dataclass class LinkedTaskChannel(trio.abc.Channel): @@ -250,19 +256,24 @@ class LinkedTaskChannel(trio.abc.Channel): _to_aio: asyncio.Queue _from_aio: trio.MemoryReceiveChannel _aio_task_complete: trio.Event + _trio_cs: trio.CancelScope async def aclose(self) -> None: self._from_aio.close() async def receive(self) -> Any: - async with translate_aio_errors(self._from_aio, self._aio_task): + async with translate_aio_errors( + self._from_aio, + self._aio_task, + self._trio_cs, + ): return await self._from_aio.receive() async def wait_ayncio_complete(self) -> None: await self._aio_task_complete.wait() - def cancel_asyncio_task(self) -> None: - self._aio_task.cancel() + # def cancel_asyncio_task(self) -> None: + # self._aio_task.cancel() async def send(self, item: Any) -> None: ''' @@ -292,16 +303,18 @@ async def open_channel_from( provide_channels=True, **kwargs, ) - chan = LinkedTaskChannel(task, aio_q, from_aio, aio_task_complete) - with cs: - async with translate_aio_errors(from_aio, task): + chan = LinkedTaskChannel( + task, aio_q, from_aio, + aio_task_complete, cs + ) + async with from_aio: + async with translate_aio_errors(from_aio, task, cs): # sync to a "started()"-like first delivered value from the # ``asyncio`` task. first = await from_aio.receive() # stream values upward - async with from_aio: - yield first, chan + yield first, chan def run_as_asyncio_guest( From 6803891bd796a09b263d2bccafa1908d2176f2c7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 24 Nov 2021 12:12:21 -0500 Subject: [PATCH 37/53] Collect `asyncio` task exceptions to avoid warning msg --- tractor/to_asyncio.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 4e33e68..b6ec90e 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -136,6 +136,13 @@ def _run_asyncio_task( nonlocal aio_err aio_err = from_aio._err + # only to avoid ``asyncio`` complaining about uncaptured + # task exceptions + try: + task.exception() + except BaseException as terr: + assert type(terr) is type(aio_err), 'Asyncio task error mismatch?' + if aio_err is not None: if type(aio_err) is CancelledError: log.cancel("infected task was cancelled") From c4b3bb354ec40d6428fdced54e367c57ab52d2d2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 24 Nov 2021 12:13:02 -0500 Subject: [PATCH 38/53] Port tests to handle our new `asyncio` cancelled type --- tests/test_infected_asyncio.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 78003f7..4a5a270 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -1,5 +1,5 @@ ''' -The most hipster way to force SC onto the stdlib's "async". +The hipster way to force SC onto the stdlib's "async": 'infection mode'. ''' from typing import Optional, Iterable @@ -74,7 +74,7 @@ async def asyncio_actor( if expect_err: assert isinstance(err, error_type) - raise err + raise def test_aio_simple_error(arb_addr): @@ -134,7 +134,7 @@ def test_trio_cancels_aio(arb_addr): # cancel the nursery shortly after boot async with tractor.open_nursery() as n: - portal = await n.run_in_actor( + await n.run_in_actor( asyncio_actor, target='sleep_forever', expect_err='trio.Cancelled', @@ -144,9 +144,9 @@ def test_trio_cancels_aio(arb_addr): trio.run(main) - async def aio_cancel(): - ''''Cancel urself boi. + '''' + Cancel urself boi. ''' await asyncio.sleep(0.5) @@ -164,16 +164,16 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr): portal = await n.run_in_actor( asyncio_actor, target='aio_cancel', - expect_err='asyncio.CancelledError', + expect_err='tractor.to_asyncio.AsyncioCancelled', infect_asyncio=True, ) - # with trio.CancelScope(shield=True): - await portal.result() - with pytest.raises(RemoteActorError) as excinfo: trio.run(main) + # ensure boxed error is correct + assert excinfo.value.type == to_asyncio.AsyncioCancelled + # TODO: verify open_channel_from will fail on this.. async def no_to_trio_in_args(): @@ -201,7 +201,7 @@ async def push_from_aio_task( if i == 50 and fail_early: raise Exception - print(f'asyncio streamer complete!') + print('asyncio streamer complete!') except asyncio.CancelledError: if not expect_cancel: From b69412a903eae1bc0ddc8c9a24d6338d7181aeba Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 24 Nov 2021 20:08:06 -0500 Subject: [PATCH 39/53] Drop cancel scope from linked task channel --- tractor/to_asyncio.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index b6ec90e..0e85908 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -167,7 +167,6 @@ async def translate_aio_errors( from_aio: trio.MemoryReceiveChannel, task: asyncio.Task, - trio_cs: trio.CancelScope, ) -> None: ''' @@ -242,7 +241,7 @@ async def run_task( ) with from_aio: # try: - async with translate_aio_errors(from_aio, task, cs): + async with translate_aio_errors(from_aio, task): # return single value that is the output from the # ``asyncio`` function-as-task. Expect the mem chan api to # do the job of handling cross-framework cancellations @@ -263,7 +262,6 @@ class LinkedTaskChannel(trio.abc.Channel): _to_aio: asyncio.Queue _from_aio: trio.MemoryReceiveChannel _aio_task_complete: trio.Event - _trio_cs: trio.CancelScope async def aclose(self) -> None: self._from_aio.close() @@ -272,7 +270,6 @@ class LinkedTaskChannel(trio.abc.Channel): async with translate_aio_errors( self._from_aio, self._aio_task, - self._trio_cs, ): return await self._from_aio.receive() @@ -312,10 +309,10 @@ async def open_channel_from( ) chan = LinkedTaskChannel( task, aio_q, from_aio, - aio_task_complete, cs + aio_task_complete ) async with from_aio: - async with translate_aio_errors(from_aio, task, cs): + async with translate_aio_errors(from_aio, task): # sync to a "started()"-like first delivered value from the # ``asyncio`` task. first = await from_aio.receive() From 2b9b29eb71681a6360b08b104c056d84d096b2d6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 25 Nov 2021 17:10:22 -0500 Subject: [PATCH 40/53] Add an asyncio echo server test --- tests/test_infected_asyncio.py | 96 ++++++++++++++++++++++++++++++++-- 1 file changed, 93 insertions(+), 3 deletions(-) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 4a5a270..ebd0616 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -161,7 +161,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr): async def main(): async with tractor.open_nursery() as n: - portal = await n.run_in_actor( + await n.run_in_actor( asyncio_actor, target='aio_cancel', expect_err='tractor.to_asyncio.AsyncioCancelled', @@ -321,5 +321,95 @@ def test_aio_errors_and_channel_propagates_and_closes(arb_addr): assert excinfo.value.type == Exception -# def test_2way_reqresp(arb_addr): -# ... +@tractor.context +async def trio_to_aio_echo_server( + ctx: tractor.Context, +): + + async def aio_echo_server( + to_trio: trio.MemorySendChannel, + from_trio: asyncio.Queue, + ) -> None: + + to_trio.send_nowait('start') + + while True: + msg = await from_trio.get() + + # echo the msg back + to_trio.send_nowait(msg) + + # if we get the terminate sentinel + # break the echo loop + if msg is None: + print('breaking aio echo loop') + break + + async with to_asyncio.open_channel_from( + aio_echo_server, + ) as (first, chan): + + assert first == 'start' + await ctx.started(first) + + async with ctx.open_stream() as stream: + + async for msg in stream: + print(f'asyncio echoing {msg}') + await chan.send(msg) + + out = await chan.receive() + # echo back to parent actor-task + await stream.send(out) + + if out is None: + try: + out = await chan.receive() + except trio.EndOfChannel: + break + else: + raise RuntimeError('aio channel never stopped?') + + +def test_echoserver_detailed_mechanics(arb_addr): + + async def main(): + async with tractor.open_nursery() as n: + p = await n.start_actor( + 'aio_server', + enable_modules=[__name__], + infect_asyncio=True, + ) + async with p.open_context( + trio_to_aio_echo_server, + ) as (ctx, first): + + assert first == 'start' + + async with ctx.open_stream() as stream: + for i in range(100): + await stream.send(i) + out = await stream.receive() + assert i == out + + # send terminate msg + await stream.send(None) + out = await stream.receive() + assert out is None + + if out is None: + # ensure the stream is stopped + # with trio.fail_after(0.1): + try: + await stream.receive() + except trio.EndOfChannel: + pass + else: + pytest.fail( + "stream wasn't stopped after sentinel?!") + + # TODO: the case where this blocks and + # is cancelled by kbi or out of task cancellation + await p.cancel_actor() + + trio.run(main) From 9a2de90de6cfcd32e993fcc432ed68811d9aa50c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 27 Nov 2021 21:55:04 -0500 Subject: [PATCH 41/53] Add mid stream echoserver "bail" cases --- tests/test_infected_asyncio.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index ebd0616..9881be9 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -371,7 +371,15 @@ async def trio_to_aio_echo_server( raise RuntimeError('aio channel never stopped?') -def test_echoserver_detailed_mechanics(arb_addr): +@pytest.mark.parametrize( + 'raise_error_mid_stream', + [False, Exception, KeyboardInterrupt], + ids='raise_error={}'.format, +) +def test_echoserver_detailed_mechanics( + arb_addr, + raise_error_mid_stream, +): async def main(): async with tractor.open_nursery() as n: @@ -392,6 +400,9 @@ def test_echoserver_detailed_mechanics(arb_addr): out = await stream.receive() assert i == out + if raise_error_mid_stream and i == 50: + raise raise_error_mid_stream + # send terminate msg await stream.send(None) out = await stream.receive() @@ -412,4 +423,9 @@ def test_echoserver_detailed_mechanics(arb_addr): # is cancelled by kbi or out of task cancellation await p.cancel_actor() - trio.run(main) + if raise_error_mid_stream: + with pytest.raises(raise_error_mid_stream): + trio.run(main) + + else: + trio.run(main) From 56cc98375e2875d491b2530271c18552d1d626dd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 28 Nov 2021 12:38:37 -0500 Subject: [PATCH 42/53] Return channel type from `_run_asyncio_task()` Better encapsulate all the mem-chan, Queue, sync-primitives inside our linked task channel in order to avoid `mypy`'s complaints about monkey patching. This also sets footing for adding an `asyncio`-side channel API that can be used more like this `trio`-side API. --- tractor/to_asyncio.py | 166 ++++++++++++++++++++++-------------------- 1 file changed, 88 insertions(+), 78 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 0e85908..6ad8bf5 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -27,14 +27,57 @@ log = get_logger(__name__) __all__ = ['run_task', 'run_as_asyncio_guest'] +@dataclass +class LinkedTaskChannel(trio.abc.Channel): + ''' + A "linked task channel" which allows for two-way synchronized msg + passing between a ``trio``-in-guest-mode task and an ``asyncio`` + task scheduled in the host loop. + + ''' + _to_aio: asyncio.Queue + _from_aio: trio.MemoryReceiveChannel + _to_trio: trio.MemorySendChannel + + _trio_cs: trio.CancelScope + _aio_task_complete: trio.Event + + # set after ``asyncio.create_task()`` + _aio_task: Optional[asyncio.Task] = None + _aio_err: Optional[BaseException] = None + + async def aclose(self) -> None: + await self._from_aio.aclose() + + async def receive(self) -> Any: + async with translate_aio_errors(self): + return await self._from_aio.receive() + + async def wait_ayncio_complete(self) -> None: + await self._aio_task_complete.wait() + + # def cancel_asyncio_task(self) -> None: + # self._aio_task.cancel() + + async def send(self, item: Any) -> None: + ''' + Send a value through to the asyncio task presuming + it defines a ``from_trio`` argument, if it does not + this method will raise an error. + + ''' + self._to_aio.put_nowait(item) + + def _run_asyncio_task( + func: Callable, *, qsize: int = 1, provide_channels: bool = False, **kwargs, -) -> Any: +) -> LinkedTaskChannel: ''' Run an ``asyncio`` async function or generator in a task, return or stream the result back to ``trio``. @@ -45,11 +88,9 @@ def _run_asyncio_task( # ITC (inter task comms), these channel/queue names are mostly from # ``asyncio``'s perspective. - from_trio = asyncio.Queue(qsize) # type: ignore + aio_q = from_trio = asyncio.Queue(qsize) # type: ignore to_trio, from_aio = trio.open_memory_channel(qsize) # type: ignore - from_aio._err = None - args = tuple(inspect.getfullargspec(func).args) if getattr(func, '_tractor_steam_function', None): @@ -74,6 +115,15 @@ def _run_asyncio_task( aio_task_complete = trio.Event() aio_err: Optional[BaseException] = None + chan = LinkedTaskChannel( + aio_q, # asyncio.Queue + from_aio, # recv chan + to_trio, # send chan + + cancel_scope, + aio_task_complete, + ) + async def wait_on_coro_final_result( to_trio: trio.MemorySendChannel, @@ -86,12 +136,13 @@ def _run_asyncio_task( ''' nonlocal aio_err + nonlocal chan + orig = result = id(coro) try: result = await coro - except BaseException as err: - aio_err = err - from_aio._err = aio_err + except BaseException as aio_err: + chan._aio_err = aio_err raise else: @@ -116,25 +167,25 @@ def _run_asyncio_task( aio_task_complete.set() # start the asyncio task we submitted from trio - if inspect.isawaitable(coro): - task = asyncio.create_task( - wait_on_coro_final_result( - to_trio, - coro, - aio_task_complete - ) - ) - - else: + if not inspect.isawaitable(coro): raise TypeError(f"No support for invoking {coro}") + task = asyncio.create_task( + wait_on_coro_final_result( + to_trio, + coro, + aio_task_complete + ) + ) + chan._aio_task = task + def cancel_trio(task: asyncio.Task) -> None: ''' Cancel the calling ``trio`` task on error. ''' - nonlocal aio_err - aio_err = from_aio._err + nonlocal chan + aio_err = chan._aio_err # only to avoid ``asyncio`` complaining about uncaptured # task exceptions @@ -159,27 +210,26 @@ def _run_asyncio_task( task.add_done_callback(cancel_trio) - return task, from_aio, to_trio, from_trio, cancel_scope, aio_task_complete + return chan @acm async def translate_aio_errors( - from_aio: trio.MemoryReceiveChannel, - task: asyncio.Task, + chan: LinkedTaskChannel, -) -> None: +) -> AsyncIterator[None]: ''' Error handling context around ``asyncio`` task spawns which appropriately translates errors and cancels into ``trio`` land. ''' - aio_err: Optional[Exception] = None + aio_err: Optional[BaseException] = None def maybe_raise_aio_err( err: Optional[Exception] = None ) -> None: - aio_err = from_aio._err + aio_err = chan._aio_err if ( aio_err is not None and type(aio_err) != CancelledError @@ -189,6 +239,9 @@ async def translate_aio_errors( raise aio_err from err else: raise aio_err + + task = chan._aio_task + assert task try: yield except ( @@ -196,7 +249,7 @@ async def translate_aio_errors( # termination callback trio.ClosedResourceError, ): - aio_err = from_aio._err + aio_err = chan._aio_err if ( task.cancelled() and type(aio_err) is CancelledError @@ -234,65 +287,26 @@ async def run_task( ''' # simple async func - task, from_aio, to_trio, aio_q, cs, _ = _run_asyncio_task( + chan = _run_asyncio_task( func, qsize=1, **kwargs, ) - with from_aio: + with chan._from_aio: # try: - async with translate_aio_errors(from_aio, task): + async with translate_aio_errors(chan): # return single value that is the output from the # ``asyncio`` function-as-task. Expect the mem chan api to # do the job of handling cross-framework cancellations # / errors via closure and translation in the # ``translate_aio_errors()`` in the above ctx mngr. - return await from_aio.receive() - - -@dataclass -class LinkedTaskChannel(trio.abc.Channel): - ''' - A "linked task channel" which allows for two-way synchronized msg - passing between a ``trio``-in-guest-mode task and an ``asyncio`` - task. - - ''' - _aio_task: asyncio.Task - _to_aio: asyncio.Queue - _from_aio: trio.MemoryReceiveChannel - _aio_task_complete: trio.Event - - async def aclose(self) -> None: - self._from_aio.close() - - async def receive(self) -> Any: - async with translate_aio_errors( - self._from_aio, - self._aio_task, - ): - return await self._from_aio.receive() - - async def wait_ayncio_complete(self) -> None: - await self._aio_task_complete.wait() - - # def cancel_asyncio_task(self) -> None: - # self._aio_task.cancel() - - async def send(self, item: Any) -> None: - ''' - Send a value through to the asyncio task presuming - it defines a ``from_trio`` argument, if it does not - this method will raise an error. - - ''' - self._to_aio.put_nowait(item) + return await chan.receive() @acm async def open_channel_from( - target: Callable[[Any, ...], Any], + target: Callable[..., Any], **kwargs, ) -> AsyncIterator[Any]: @@ -301,21 +315,17 @@ async def open_channel_from( spawned ``asyncio`` task and ``trio``. ''' - task, from_aio, to_trio, aio_q, cs, aio_task_complete = _run_asyncio_task( + chan = _run_asyncio_task( target, qsize=2**8, provide_channels=True, **kwargs, ) - chan = LinkedTaskChannel( - task, aio_q, from_aio, - aio_task_complete - ) - async with from_aio: - async with translate_aio_errors(from_aio, task): + async with chan._from_aio: + async with translate_aio_errors(chan): # sync to a "started()"-like first delivered value from the # ``asyncio`` task. - first = await from_aio.receive() + first = await chan.receive() # stream values upward yield first, chan From 24078f2d6e0e8f5a17c5f75e3ca42f577fe5fc0a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 2 Dec 2021 08:12:20 -0500 Subject: [PATCH 43/53] More doc string style tweaks --- tractor/_entry.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/tractor/_entry.py b/tractor/_entry.py index 0e31f32..c860f2b 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -33,15 +33,19 @@ log = get_logger(__name__) def _mp_main( + actor: 'Actor', # type: ignore accept_addr: Tuple[str, int], forkserver_info: Tuple[Any, Any, Any, Any, Any], start_method: str, parent_addr: Tuple[str, int] = None, infect_asyncio: bool = False, + ) -> None: - """The routine called *after fork* which invokes a fresh ``trio.run`` - """ + ''' + The routine called *after fork* which invokes a fresh ``trio.run`` + + ''' actor._forkserver_info = forkserver_info from ._spawn import try_set_start_method spawn_ctx = try_set_start_method(start_method) @@ -77,19 +81,17 @@ def _mp_main( def _trio_main( + actor: 'Actor', # type: ignore *, parent_addr: Tuple[str, int] = None, infect_asyncio: bool = False, + ) -> None: - """Entry point for a `trio_run_in_process` subactor. - """ - # Disable sigint handling in children; - # we don't need it thanks to our cancellation machinery. - # signal.signal(signal.SIGINT, signal.SIG_IGN) - - log.info(f"Started new trio process for {actor.uid}") + ''' + Entry point for a `trio_run_in_process` subactor. + ''' log.info(f"Started new trio process for {actor.uid}") if actor.loglevel is not None: From d65912e1aeefc367d783a4b0d24006a1f3482f99 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 2 Dec 2021 08:12:46 -0500 Subject: [PATCH 44/53] Increase kbi delay in remote cancel test --- tests/test_cancellation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index c346806..4144859 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -523,7 +523,7 @@ def test_fast_graceful_cancel_when_spawn_task_in_soft_proc_wait_for_daemon( cancellation, and it's faster, we might as well do it. ''' - kbi_delay = 0.2 + kbi_delay = 0.5 async def main(): start = time.time() From b463841019255ab801616b9c4744e6fd8c0b3247 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 10 Dec 2021 13:48:41 -0500 Subject: [PATCH 45/53] Add infected `asyncio` echo server example --- examples/infected_asyncio_echo_server.py | 90 ++++++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 examples/infected_asyncio_echo_server.py diff --git a/examples/infected_asyncio_echo_server.py b/examples/infected_asyncio_echo_server.py new file mode 100644 index 0000000..aeda42c --- /dev/null +++ b/examples/infected_asyncio_echo_server.py @@ -0,0 +1,90 @@ +''' +An SC compliant infected ``asyncio`` echo server. + +''' +import asyncio +from statistics import mean +import time + +import trio +import tractor + + +async def aio_echo_server( + to_trio: trio.MemorySendChannel, + from_trio: asyncio.Queue, +) -> None: + + # a first message must be sent **from** this ``asyncio`` + # task or the ``trio`` side will never unblock from + # ``tractor.to_asyncio.open_channel_from():`` + to_trio.send_nowait('start') + + # XXX: this uses an ``from_trio: asyncio.Queue`` currently but we + # should probably offer something better. + while True: + # echo the msg back + to_trio.send_nowait(await from_trio.get()) + + +@tractor.context +async def trio_to_aio_echo_server( + ctx: tractor.Context, +): + # this will block until the ``asyncio`` task sends a "first" + # message. + async with tractor.to_asyncio.open_channel_from( + aio_echo_server, + ) as (first, chan): + + assert first == 'start' + await ctx.started(first) + + async with ctx.open_stream() as stream: + + async for msg in stream: + await chan.send(msg) + + out = await chan.receive() + # echo back to parent actor-task + await stream.send(out) + + +async def main(): + + async with tractor.open_nursery() as n: + p = await n.start_actor( + 'aio_server', + enable_modules=[__name__], + infect_asyncio=True, + ) + async with p.open_context( + trio_to_aio_echo_server, + ) as (ctx, first): + + assert first == 'start' + + count = 0 + async with ctx.open_stream() as stream: + + delays = [] + send = time.time() + + await stream.send(count) + async for msg in stream: + recv = time.time() + delays.append(recv - send) + assert msg == count + count += 1 + send = time.time() + await stream.send(count) + + if count >= 1e3: + break + + print(f'mean round trip rate (Hz): {1/mean(delays)}') + await p.cancel_actor() + + +if __name__ == '__main__': + trio.run(main) From 7237d696ceca3dc818287586fcd893915bb76d9f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 10 Dec 2021 14:03:23 -0500 Subject: [PATCH 46/53] Add asyncio echo server ex to readme; fix cluster section --- docs/README.rst | 105 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/docs/README.rst b/docs/README.rst index 842cbcd..3c00d87 100644 --- a/docs/README.rst +++ b/docs/README.rst @@ -313,6 +313,110 @@ real time:: This uses no extra threads, fancy semaphores or futures; all we need is ``tractor``'s IPC! +"Infected ``asyncio``" mode +--------------------------- +Have a bunch of ``asyncio`` code you want to force to be SC at the process level? + +Check out our experimental system for `guest-mode`_ controlled +``asyncio`` actors: + +.. code:: python + + import asyncio + from statistics import mean + import time + + import trio + import tractor + + + async def aio_echo_server( + to_trio: trio.MemorySendChannel, + from_trio: asyncio.Queue, + ) -> None: + + # a first message must be sent **from** this ``asyncio`` + # task or the ``trio`` side will never unblock from + # ``tractor.to_asyncio.open_channel_from():`` + to_trio.send_nowait('start') + + # XXX: this uses an ``from_trio: asyncio.Queue`` currently but we + # should probably offer something better. + while True: + # echo the msg back + to_trio.send_nowait(await from_trio.get()) + + + @tractor.context + async def trio_to_aio_echo_server( + ctx: tractor.Context, + ): + # this will block until the ``asyncio`` task sends a "first" + # message. + async with tractor.to_asyncio.open_channel_from( + aio_echo_server, + ) as (first, chan): + + assert first == 'start' + await ctx.started(first) + + async with ctx.open_stream() as stream: + + async for msg in stream: + await chan.send(msg) + + out = await chan.receive() + # echo back to parent actor-task + await stream.send(out) + + + async def main(): + + async with tractor.open_nursery() as n: + p = await n.start_actor( + 'aio_server', + enable_modules=[__name__], + infect_asyncio=True, + ) + async with p.open_context( + trio_to_aio_echo_server, + ) as (ctx, first): + + assert first == 'start' + + count = 0 + async with ctx.open_stream() as stream: + + delays = [] + send = time.time() + + await stream.send(count) + async for msg in stream: + recv = time.time() + delays.append(recv - send) + assert msg == count + count += 1 + send = time.time() + await stream.send(count) + + if count >= 1e3: + break + + print(f'mean round trip rate (Hz): {1/mean(delays)}') + await p.cancel_actor() + + + if __name__ == '__main__': + trio.run(main) + + +Yes, we spawn a python process, run ``asyncio``, start ``trio`` on the +``asyncio`` loop, then send commands to the ``trio`` scheduled tasks to +tell ``asyncio`` tasks what to do XD + + +Higher level "cluster" APIs +--------------------------- To be extra terse the ``tractor`` devs have started hacking some "higher level" APIs for managing actor trees/clusters. These interfaces should generally be condsidered provisional for now but we encourage you to try @@ -489,6 +593,7 @@ channel`_! .. _async generators: https://www.python.org/dev/peps/pep-0525/ .. _trio-parallel: https://github.com/richardsheridan/trio-parallel .. _msgspec: https://jcristharif.com/msgspec/ +.. _guest-mode: https://trio.readthedocs.io/en/stable/reference-lowlevel.html?highlight=guest%20mode#using-guest-mode-to-run-trio-on-top-of-other-event-loops .. |gh_actions| image:: https://img.shields.io/endpoint.svg?url=https%3A%2F%2Factions-badge.atrox.dev%2Fgoodboy%2Ftractor%2Fbadge&style=popout-square From 6952c7defa0b6689ac83621a779783a7c6c32137 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 10 Dec 2021 14:15:23 -0500 Subject: [PATCH 47/53] Add features bullet, slip in a guille-ism --- docs/README.rst | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/docs/README.rst b/docs/README.rst index 3c00d87..d4f0066 100644 --- a/docs/README.rst +++ b/docs/README.rst @@ -27,7 +27,9 @@ Features - A modular transport stack, allowing for custom serialization (eg. `msgspec`_), communications protocols, and environment specific IPC primitives -- `structured concurrency`_ from the ground up +- Support for spawning process-level-SC, inter-loop one-to-one-task oriented + ``asyncio`` actors via "infected ``asyncio``" mode +- `structured chadcurrency`_ from the ground up Run a func in a process @@ -588,7 +590,8 @@ channel`_! .. _messages: https://en.wikipedia.org/wiki/Message_passing .. _trio docs: https://trio.readthedocs.io/en/latest/ .. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/ -.. _structured concurrency: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/ +.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency +.. _structured chadcurrency: https://en.wikipedia.org/wiki/Structured_concurrency .. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony .. _async generators: https://www.python.org/dev/peps/pep-0525/ .. _trio-parallel: https://github.com/richardsheridan/trio-parallel From 1fdcaf36f3c258478b1c12d25f200cb4436e8781 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 10 Dec 2021 15:50:34 -0500 Subject: [PATCH 48/53] Not enough time for new asyncio tests? --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d1fe811..b03ba9f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -25,7 +25,7 @@ jobs: testing-linux: name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }}' - timeout-minutes: 9 + timeout-minutes: 10 runs-on: ${{ matrix.os }} strategy: From 73d252e09e372f01bdf25da4c6f7a130a026b7ea Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 11 Dec 2021 13:09:36 -0500 Subject: [PATCH 49/53] Emphasize `asyncio` only with sleeps --- docs/README.rst | 1 + examples/infected_asyncio_echo_server.py | 1 + 2 files changed, 2 insertions(+) diff --git a/docs/README.rst b/docs/README.rst index d4f0066..b557e9e 100644 --- a/docs/README.rst +++ b/docs/README.rst @@ -347,6 +347,7 @@ Check out our experimental system for `guest-mode`_ controlled while True: # echo the msg back to_trio.send_nowait(await from_trio.get()) + await asyncio.sleep(0) @tractor.context diff --git a/examples/infected_asyncio_echo_server.py b/examples/infected_asyncio_echo_server.py index aeda42c..ee7c45b 100644 --- a/examples/infected_asyncio_echo_server.py +++ b/examples/infected_asyncio_echo_server.py @@ -25,6 +25,7 @@ async def aio_echo_server( while True: # echo the msg back to_trio.send_nowait(await from_trio.get()) + await asyncio.sleep(0) @tractor.context From 4c0cfa68acdf9b5963c1dd5748353004f0bd01a6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 11 Mar 2021 10:07:59 -0500 Subject: [PATCH 50/53] Link to SC on wikipedia --- docs/README.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/README.rst b/docs/README.rst index b557e9e..3a44d20 100644 --- a/docs/README.rst +++ b/docs/README.rst @@ -583,6 +583,7 @@ channel`_! .. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich .. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228 .. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s +.. .. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts .. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s .. _trio gitter channel: https://gitter.im/python-trio/general .. _matrix channel: https://matrix.to/#/!tractor:matrix.org From 4d1a48a47b69b366327fee48976164108027caf9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 11 Dec 2021 20:16:19 -0500 Subject: [PATCH 51/53] Link to inter-loop channel issue in readme --- docs/README.rst | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/README.rst b/docs/README.rst index 3a44d20..180cd81 100644 --- a/docs/README.rst +++ b/docs/README.rst @@ -417,6 +417,12 @@ Yes, we spawn a python process, run ``asyncio``, start ``trio`` on the ``asyncio`` loop, then send commands to the ``trio`` scheduled tasks to tell ``asyncio`` tasks what to do XD +We need help refining the `asyncio`-side channel API to be more +`trio`-like. Feel free to sling your opinion in `#273`_! + + +.. _#273: https://github.com/goodboy/tractor/issues/273 + Higher level "cluster" APIs --------------------------- @@ -594,6 +600,7 @@ channel`_! .. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/ .. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency .. _structured chadcurrency: https://en.wikipedia.org/wiki/Structured_concurrency +.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency .. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony .. _async generators: https://www.python.org/dev/peps/pep-0525/ .. _trio-parallel: https://github.com/richardsheridan/trio-parallel From 9b14d82086095283ea7f02687bff935d85ccce80 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 12 Dec 2021 16:33:48 -0500 Subject: [PATCH 52/53] Add nooz --- newsfragments/121.feature.rst | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 newsfragments/121.feature.rst diff --git a/newsfragments/121.feature.rst b/newsfragments/121.feature.rst new file mode 100644 index 0000000..2ba8fc0 --- /dev/null +++ b/newsfragments/121.feature.rst @@ -0,0 +1,28 @@ +Add "infected ``asyncio`` mode; a sub-system to spawn and control +``asyncio`` actors using ``trio``'s guest-mode. + +This gets us the following very interesting functionality: + +- ability to spawn an actor that has a process entry point of + ``asyncio.run()`` by passing ``infect_asyncio=True`` to + ``Portal.start_actor()`` (and friends). +- the ``asyncio`` actor embeds ``trio`` using guest-mode and starts + a main ``trio`` task which runs the ``tractor.Actor._async_main()`` + entry point engages all the normal ``tractor`` runtime IPC/messaging + machinery; for all purposes the actor is now running normally on + a ``trio.run()``. +- the actor can now make one-to-one task spawning requests to the + underlying ``asyncio`` event loop using either of: + * ``to_asyncio.run_task()`` to spawn and run an ``asyncio`` task to + completion and block until a return value is delivered. + * ``async with to_asyncio.open_channel_from():`` which spawns a task + and hands it a pair of "memory channels" to allow for bi-directional + streaming between the now SC-linked ``trio`` and ``asyncio`` tasks. + +The output from any call(s) to ``asyncio`` can be handled as normal in +``trio``/``tractor`` task operation with the caveat of the overhead due +to guest-mode use. + +For more details see the `original PR +`_ and `issue +`_. From 9b4cdb00e6aa07543c4decf362850c68e98b3b1c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 15 Dec 2021 17:42:40 -0500 Subject: [PATCH 53/53] Add agpl header --- tractor/to_asyncio.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 6ad8bf5..9b18a87 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -1,3 +1,19 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + ''' Infection apis for ``asyncio`` loops running ``trio`` using guest mode.