From 98de2fab31c730a2dee1a8b7b85c7b31331dcbe5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 14 Jul 2022 16:09:05 -0400 Subject: [PATCH 1/7] Add context test that opens an inter-task-channel that errors --- examples/infected_asyncio_echo_server.py | 1 + tests/test_infected_asyncio.py | 97 +++++++++++++++++++++++- 2 files changed, 95 insertions(+), 3 deletions(-) diff --git a/examples/infected_asyncio_echo_server.py b/examples/infected_asyncio_echo_server.py index ee7c45b..0250835 100644 --- a/examples/infected_asyncio_echo_server.py +++ b/examples/infected_asyncio_echo_server.py @@ -13,6 +13,7 @@ import tractor async def aio_echo_server( to_trio: trio.MemorySendChannel, from_trio: asyncio.Queue, + ) -> None: # a first message must be sent **from** this ``asyncio`` diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 37c85fd..e1228c0 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -11,12 +11,26 @@ import importlib import pytest import trio import tractor -from tractor import to_asyncio -from tractor import RemoteActorError +from tractor import ( + to_asyncio, + RemoteActorError, + ContextCancelled +) from tractor.trionics import BroadcastReceiver -async def sleep_and_err(sleep_for: float = 0.1): +async def sleep_and_err( + sleep_for: float = 0.1, + + # just signature placeholders for compat with + # ``to_asyncio.open_channel_from()`` + to_trio: Optional[trio.MemorySendChannel] = None, + from_trio: Optional[asyncio.Queue] = None, + +): + if to_trio: + to_trio.send_nowait('start') + await asyncio.sleep(sleep_for) assert 0 @@ -146,6 +160,81 @@ def test_trio_cancels_aio(arb_addr): trio.run(main) +@tractor.context +async def trio_ctx( + ctx: tractor.Context, +): + + await ctx.started('start') + + # this will block until the ``asyncio`` task sends a "first" + # message. + with trio.fail_after(0.5): + async with ( + tractor.to_asyncio.open_channel_from( + sleep_and_err, + ) as (first, chan), + + trio.open_nursery() as n, + ): + + assert first == 'start' + + # spawn another asyncio task for the cuck of it. + n.start_soon( + tractor.to_asyncio.run_task, + sleep_forever, + ) + # await trio.sleep_forever() + + +@pytest.mark.parametrize( + 'parent_cancels', [False, True], + ids='parent_actor_cancels_child={}'.format +) +def test_context_spawns_aio_task_that_errors( + arb_addr, + parent_cancels: bool, +): + ''' + Verify that spawning a task via an intertask channel ctx mngr that + errors correctly propagates the error back from the `asyncio`-side + taksk. + + ''' + async def main(): + + async with tractor.open_nursery() as n: + p = await n.start_actor( + 'aio_daemon', + enable_modules=[__name__], + infect_asyncio=True, + # debug_mode=True, + loglevel='cancel', + ) + async with p.open_context( + trio_ctx, + ) as (ctx, first): + + assert first == 'start' + + if parent_cancels: + await p.cancel_actor() + + await trio.sleep_forever() + + + with pytest.raises(RemoteActorError) as excinfo: + trio.run(main) + + err = excinfo.value + assert isinstance(err, RemoteActorError) + if parent_cancels: + assert err.type == trio.Cancelled + else: + assert err.type == AssertionError + + async def aio_cancel(): '''' Cancel urself boi. @@ -385,6 +474,8 @@ async def trio_to_aio_echo_server( print('breaking aio echo loop') break + print('exiting asyncio task') + async with to_asyncio.open_channel_from( aio_echo_server, ) as (first, chan): From 38d03858d748e1227386b2e69078a1023ef7c600 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 14 Jul 2022 16:35:41 -0400 Subject: [PATCH 2/7] Fix `asyncio`-task-sync and error propagation This fixes an previously undetected bug where if an `.open_channel_from()` spawned task errored the error would not be propagated to the `trio` side and instead would fail silently with a console log error. What was most odd is that it only seems easy to trigger when you put a slight task sleep before the error is raised (:eyeroll:). This patch adds a few things to address this and just in general improve iter-task lifetime syncing: - add `LinkedTaskChannel._trio_exited: bool` a flag set from the `trio` side when the channel block exits. - add a `wait_on_aio_task: bool` flag to `translate_aio_errors` which toggles whether to wait the `asyncio` task termination event on exit. - cancel the `asyncio` task if the trio side has ended, when `._trio_exited == True`. - always close the `trio` mem channel when the task exits such that the `asyncio` side can error on any next `.send()` call. --- tests/test_infected_asyncio.py | 2 +- tractor/to_asyncio.py | 65 +++++++++++++++++++++++++++------- 2 files changed, 54 insertions(+), 13 deletions(-) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index e1228c0..81a4f3e 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -185,7 +185,7 @@ async def trio_ctx( tractor.to_asyncio.run_task, sleep_forever, ) - # await trio.sleep_forever() + await trio.sleep_forever() @pytest.mark.parametrize( diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 6ca07ca..5168234 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -63,6 +63,7 @@ class LinkedTaskChannel(trio.abc.Channel): _trio_cs: trio.CancelScope _aio_task_complete: trio.Event + _trio_exited: bool = False # set after ``asyncio.create_task()`` _aio_task: Optional[asyncio.Task] = None @@ -73,7 +74,13 @@ class LinkedTaskChannel(trio.abc.Channel): await self._from_aio.aclose() async def receive(self) -> Any: - async with translate_aio_errors(self): + async with translate_aio_errors( + self, + + # XXX: obviously this will deadlock if an on-going stream is + # being procesed. + # wait_on_aio_task=False, + ): # TODO: do we need this to guarantee asyncio code get's # cancelled in the case where the trio side somehow creates @@ -210,10 +217,8 @@ def _run_asyncio_task( orig = result = id(coro) try: result = await coro - except GeneratorExit: - # no need to relay error - raise except BaseException as aio_err: + log.exception('asyncio task errored') chan._aio_err = aio_err raise @@ -237,6 +242,7 @@ def _run_asyncio_task( to_trio.close() aio_task_complete.set() + log.runtime(f'`asyncio` task: {task.get_name()} is complete') # start the asyncio task we submitted from trio if not inspect.isawaitable(coro): @@ -296,6 +302,11 @@ def _run_asyncio_task( f'infected task errorred:\n{msg}' ) + # XXX: alway cancel the scope on error + # in case the trio task is blocking + # on a checkpoint. + cancel_scope.cancel() + # raise any ``asyncio`` side error. raise aio_err @@ -307,6 +318,7 @@ def _run_asyncio_task( async def translate_aio_errors( chan: LinkedTaskChannel, + wait_on_aio_task: bool = False, ) -> AsyncIterator[None]: ''' @@ -318,6 +330,7 @@ async def translate_aio_errors( aio_err: Optional[BaseException] = None + # TODO: make thisi a channel method? def maybe_raise_aio_err( err: Optional[Exception] = None ) -> None: @@ -367,13 +380,30 @@ async def translate_aio_errors( raise finally: - # always cancel the ``asyncio`` task if we've made it this far - # and it's not done. - if not task.done() and aio_err: + if ( + # NOTE: always cancel the ``asyncio`` task if we've made it + # this far and it's not done. + not task.done() and aio_err + + # or the trio side has exited it's surrounding cancel scope + # indicating the lifetime of the ``asyncio``-side task + # should also be terminated. + or chan._trio_exited + ): + log.runtime( + f'Cancelling `asyncio`-task: {chan._aio_taskget_name()}' + ) # assert not aio_err, 'WTF how did asyncio do this?!' task.cancel() - # if any ``asyncio`` error was caught, raise it here inline + # Required to sync with the far end ``asyncio``-task to ensure + # any error is captured (via monkeypatching the + # ``channel._aio_err``) before calling ``maybe_raise_aio_err()`` + # below! + if wait_on_aio_task: + await chan._aio_task_complete.wait() + + # NOTE: if any ``asyncio`` error was caught, raise it here inline # here in the ``trio`` task maybe_raise_aio_err() @@ -398,7 +428,10 @@ async def run_task( **kwargs, ) with chan._from_aio: - async with translate_aio_errors(chan): + async with translate_aio_errors( + chan, + wait_on_aio_task=True, + ): # return single value that is the output from the # ``asyncio`` function-as-task. Expect the mem chan api to # do the job of handling cross-framework cancellations @@ -426,13 +459,21 @@ async def open_channel_from( **kwargs, ) async with chan._from_aio: - async with translate_aio_errors(chan): + async with translate_aio_errors( + chan, + wait_on_aio_task=True, + ): # sync to a "started()"-like first delivered value from the # ``asyncio`` task. first = await chan.receive() # deliver stream handle upward - yield first, chan + try: + with chan._trio_cs: + yield first, chan + finally: + chan._trio_exited = True + chan._to_trio.close() def run_as_asyncio_guest( @@ -482,7 +523,7 @@ def run_as_asyncio_guest( main_outcome.unwrap() else: trio_done_fut.set_result(main_outcome) - print(f"trio_main finished: {main_outcome!r}") + log.runtime(f"trio_main finished: {main_outcome!r}") # start the infection: run trio on the asyncio loop in "guest mode" log.info(f"Infecting asyncio process with {trio_main}") From 0906559ed9177ab24630b94e2dc045ad4056c5b8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 14 Jul 2022 20:43:17 -0400 Subject: [PATCH 3/7] Drop manual stack construction, fix attr typo --- tractor/to_asyncio.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 5168234..d0497c1 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -297,10 +297,7 @@ def _run_asyncio_task( elif task_err is None: assert aio_err aio_err.with_traceback(aio_err.__traceback__) - msg = ''.join(traceback.format_exception(type(aio_err))) - log.error( - f'infected task errorred:\n{msg}' - ) + log.error(f'infected task errorred') # XXX: alway cancel the scope on error # in case the trio task is blocking @@ -391,7 +388,7 @@ async def translate_aio_errors( or chan._trio_exited ): log.runtime( - f'Cancelling `asyncio`-task: {chan._aio_taskget_name()}' + f'Cancelling `asyncio`-task: {chan._aio_task.get_name()}' ) # assert not aio_err, 'WTF how did asyncio do this?!' task.cancel() From ce01f6b21cb87aba9c03c09223348f4fabf18365 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 14 Jul 2022 20:44:10 -0400 Subject: [PATCH 4/7] Increase timeout for CI/windows --- tests/test_infected_asyncio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 81a4f3e..979b558 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -169,7 +169,7 @@ async def trio_ctx( # this will block until the ``asyncio`` task sends a "first" # message. - with trio.fail_after(0.5): + with trio.fail_after(2): async with ( tractor.to_asyncio.open_channel_from( sleep_and_err, From f0d78e1a6ea9143c34a284a67cc7c8279c7f22e5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Jul 2022 10:39:49 -0400 Subject: [PATCH 5/7] Use local task ref, fixes `mypy` --- tractor/to_asyncio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index d0497c1..0634215 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -388,7 +388,7 @@ async def translate_aio_errors( or chan._trio_exited ): log.runtime( - f'Cancelling `asyncio`-task: {chan._aio_task.get_name()}' + f'Cancelling `asyncio`-task: {task.get_name()}' ) # assert not aio_err, 'WTF how did asyncio do this?!' task.cancel() From 565c603300f8fc3c1c8d8d124945bad8885d0168 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Jul 2022 11:17:57 -0400 Subject: [PATCH 6/7] Add nooz --- nooz/318.bug.rst | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 nooz/318.bug.rst diff --git a/nooz/318.bug.rst b/nooz/318.bug.rst new file mode 100644 index 0000000..5bbf4f0 --- /dev/null +++ b/nooz/318.bug.rst @@ -0,0 +1,13 @@ +Fix a previously undetected ``trio``-``asyncio`` task lifetime linking +issue with the ``to_asyncio.open_channel_from()`` api where both sides +where not properly waiting/signalling termination and it was possible +for ``asyncio``-side errors to not propagate due to a race condition. + +The implementation fix summary is: +- add state to signal the end of the ``trio`` side task to be + read by the ``asyncio`` side and always cancel any ongoing + task in such cases. +- always wait on the ``asyncio`` task termination from the ``trio`` + side on error before maybe raising said error. +- always close the ``trio`` mem chan on exit to ensure the other + side can detect it and follow. From 05790a20c1abbb5bc4752263e83cdd6d69c826de Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Jul 2022 11:18:48 -0400 Subject: [PATCH 7/7] Slight lint fixes --- tests/test_infected_asyncio.py | 4 +--- tractor/to_asyncio.py | 3 +-- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 979b558..976741d 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -14,7 +14,6 @@ import tractor from tractor import ( to_asyncio, RemoteActorError, - ContextCancelled ) from tractor.trionics import BroadcastReceiver @@ -199,7 +198,7 @@ def test_context_spawns_aio_task_that_errors( ''' Verify that spawning a task via an intertask channel ctx mngr that errors correctly propagates the error back from the `asyncio`-side - taksk. + task. ''' async def main(): @@ -223,7 +222,6 @@ def test_context_spawns_aio_task_that_errors( await trio.sleep_forever() - with pytest.raises(RemoteActorError) as excinfo: trio.run(main) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 0634215..a19afe1 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -23,7 +23,6 @@ from asyncio.exceptions import CancelledError from contextlib import asynccontextmanager as acm from dataclasses import dataclass import inspect -import traceback from typing import ( Any, Callable, @@ -297,7 +296,7 @@ def _run_asyncio_task( elif task_err is None: assert aio_err aio_err.with_traceback(aio_err.__traceback__) - log.error(f'infected task errorred') + log.error('infected task errorred') # XXX: alway cancel the scope on error # in case the trio task is blocking