''' The hipster way to force SC onto the stdlib's "async": 'infection mode'. ''' import asyncio import builtins from contextlib import ExitStack from functools import partial import itertools import importlib import os from pathlib import Path import signal from typing import ( Callable, Iterable, Union, ) import pytest import trio import tractor from tractor import ( current_actor, Actor, to_asyncio, RemoteActorError, ContextCancelled, _state, ) from tractor.trionics import BroadcastReceiver from tractor._testing import expect_ctxc async def sleep_and_err( sleep_for: float = 0.1, # just signature placeholders for compat with # ``to_asyncio.open_channel_from()`` to_trio: trio.MemorySendChannel|None = None, from_trio: asyncio.Queue|None = None, ): if to_trio: to_trio.send_nowait('start') await asyncio.sleep(sleep_for) assert 0 async def aio_sleep_forever(): await asyncio.sleep(float('inf')) async def trio_cancels_single_aio_task(): # spawn an ``asyncio`` task to run a func and return result with trio.move_on_after(.2): await tractor.to_asyncio.run_task(aio_sleep_forever) def test_trio_cancels_aio_on_actor_side(reg_addr): ''' Spawn an infected actor that is cancelled by the ``trio`` side task using std cancel scope apis. ''' async def main(): async with tractor.open_nursery( registry_addrs=[reg_addr] ) as n: await n.run_in_actor( trio_cancels_single_aio_task, infect_asyncio=True, ) trio.run(main) async def asyncio_actor( target: str, expect_err: Exception|None = None ) -> None: # ensure internal runtime state is consistent actor: Actor = tractor.current_actor() assert ( actor.is_infected_aio() and actor._infected_aio and _state._runtime_vars['_is_infected_aio'] ) target: Callable = globals()[target] if '.' in expect_err: modpath, _, name = expect_err.rpartition('.') mod = importlib.import_module(modpath) error_type = getattr(mod, name) else: # toplevel builtin error type error_type = builtins.__dict__.get(expect_err) try: # spawn an ``asyncio`` task to run a func and return result await tractor.to_asyncio.run_task(target) except BaseException as err: if expect_err: assert isinstance(err, error_type) raise def test_aio_simple_error(reg_addr): ''' Verify a simple remote asyncio error propagates back through trio to the parent actor. ''' async def main(): async with tractor.open_nursery( registry_addrs=[reg_addr] ) as n: await n.run_in_actor( asyncio_actor, target='sleep_and_err', expect_err='AssertionError', infect_asyncio=True, ) with pytest.raises( expected_exception=(RemoteActorError, ExceptionGroup), ) as excinfo: trio.run(main) err = excinfo.value # might get multiple `trio.Cancelled`s as well inside an inception if isinstance(err, ExceptionGroup): err = next(itertools.dropwhile( lambda exc: not isinstance(exc, tractor.RemoteActorError), err.exceptions )) assert err assert isinstance(err, RemoteActorError) assert err.boxed_type is AssertionError def test_tractor_cancels_aio(reg_addr): ''' Verify we can cancel a spawned asyncio task gracefully. ''' async def main(): async with tractor.open_nursery() as n: portal = await n.run_in_actor( asyncio_actor, target='aio_sleep_forever', expect_err='trio.Cancelled', infect_asyncio=True, ) # cancel the entire remote runtime await portal.cancel_actor() trio.run(main) def test_trio_cancels_aio(reg_addr): ''' Much like the above test with ``tractor.Portal.cancel_actor()`` except we just use a standard ``trio`` cancellation api. ''' async def main(): with trio.move_on_after(1): # cancel the nursery shortly after boot async with tractor.open_nursery() as n: await n.run_in_actor( asyncio_actor, target='aio_sleep_forever', expect_err='trio.Cancelled', infect_asyncio=True, ) trio.run(main) @tractor.context async def trio_ctx( ctx: tractor.Context, ): await ctx.started('start') # this will block until the ``asyncio`` task sends a "first" # message. with trio.fail_after(2): async with ( trio.open_nursery() as n, tractor.to_asyncio.open_channel_from( sleep_and_err, ) as (first, chan), ): assert first == 'start' # spawn another asyncio task for the cuck of it. n.start_soon( tractor.to_asyncio.run_task, aio_sleep_forever, ) await trio.sleep_forever() @pytest.mark.parametrize( 'parent_cancels', ['context', 'actor', False], ids='parent_actor_cancels_child={}'.format ) def test_context_spawns_aio_task_that_errors( reg_addr, parent_cancels: bool, ): ''' Verify that spawning a task via an intertask channel ctx mngr that errors correctly propagates the error back from the `asyncio`-side task. ''' async def main(): with trio.fail_after(2): async with tractor.open_nursery() as n: p = await n.start_actor( 'aio_daemon', enable_modules=[__name__], infect_asyncio=True, # debug_mode=True, loglevel='cancel', ) async with ( expect_ctxc( yay=parent_cancels == 'actor', ), p.open_context( trio_ctx, ) as (ctx, first), ): assert first == 'start' if parent_cancels == 'actor': await p.cancel_actor() elif parent_cancels == 'context': await ctx.cancel() else: await trio.sleep_forever() async with expect_ctxc( yay=parent_cancels == 'actor', ): await ctx.result() if parent_cancels == 'context': # to tear down sub-acor await p.cancel_actor() return ctx.outcome if parent_cancels: # bc the parent made the cancel request, # the error is not raised locally but instead # the context is exited silently res = trio.run(main) assert isinstance(res, ContextCancelled) assert 'root' in res.canceller[0] else: expect = RemoteActorError with pytest.raises(expect) as excinfo: trio.run(main) err = excinfo.value assert isinstance(err, expect) assert err.boxed_type is AssertionError async def aio_cancel(): '''' Cancel urself boi. ''' await asyncio.sleep(0.5) # cancel and enter sleep task = asyncio.current_task() task.cancel() await aio_sleep_forever() def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr): ''' When the `asyncio.Task` cancels itself the `trio` side cshould also cancel and teardown and relay the cancellation cross-process to the caller (parent). ''' async def main(): an: tractor.ActorNursery async with tractor.open_nursery() as an: p: tractor.Portal = await an.run_in_actor( asyncio_actor, target='aio_cancel', expect_err='tractor.to_asyncio.AsyncioCancelled', infect_asyncio=True, ) # NOTE: normally the `an.__aexit__()` waits on the # portal's result but we do it explicitly here # to avoid indent levels. with trio.fail_after(1): await p.wait_for_result() with pytest.raises( expected_exception=(RemoteActorError, ExceptionGroup), ) as excinfo: trio.run(main) # might get multiple `trio.Cancelled`s as well inside an inception err: RemoteActorError|ExceptionGroup = excinfo.value if isinstance(err, ExceptionGroup): err = next(itertools.dropwhile( lambda exc: not isinstance(exc, tractor.RemoteActorError), err.exceptions )) assert err # relayed boxed error should be our `trio`-task's # cancel-signal-proxy-equivalent of `asyncio.CancelledError`. assert err.boxed_type == to_asyncio.AsyncioCancelled # TODO: verify open_channel_from will fail on this.. async def no_to_trio_in_args(): pass async def push_from_aio_task( sequence: Iterable, to_trio: trio.abc.SendChannel, expect_cancel: False, fail_early: bool, ) -> None: try: # sync caller ctx manager to_trio.send_nowait(True) for i in sequence: print(f'asyncio sending {i}') to_trio.send_nowait(i) await asyncio.sleep(0.001) if i == 50 and fail_early: raise Exception print('asyncio streamer complete!') except asyncio.CancelledError: if not expect_cancel: pytest.fail("aio task was cancelled unexpectedly") raise else: if expect_cancel: pytest.fail("aio task wasn't cancelled as expected!?") async def stream_from_aio( exit_early: bool = False, raise_err: bool = False, aio_raise_err: bool = False, fan_out: bool = False, ) -> None: seq = range(100) expect = list(seq) try: pulled = [] async with to_asyncio.open_channel_from( push_from_aio_task, sequence=seq, expect_cancel=raise_err or exit_early, fail_early=aio_raise_err, ) as (first, chan): assert first is True async def consume( chan: Union[ to_asyncio.LinkedTaskChannel, BroadcastReceiver, ], ): async for value in chan: print(f'trio received {value}') pulled.append(value) if value == 50: if raise_err: raise Exception elif exit_early: break if fan_out: # start second task that get's the same stream value set. async with ( # NOTE: this has to come first to avoid # the channel being closed before the nursery # tasks are joined.. chan.subscribe() as br, trio.open_nursery() as n, ): n.start_soon(consume, br) await consume(chan) else: await consume(chan) finally: if ( not raise_err and not exit_early and not aio_raise_err ): if fan_out: # we get double the pulled values in the # ``.subscribe()`` fan out case. doubled = list(itertools.chain(*zip(expect, expect))) expect = doubled[:len(pulled)] assert list(sorted(pulled)) == expect else: assert pulled == expect else: assert not fan_out assert pulled == expect[:51] print('trio guest mode task completed!') @pytest.mark.parametrize( 'fan_out', [False, True], ids='fan_out_w_chan_subscribe={}'.format ) def test_basic_interloop_channel_stream(reg_addr, fan_out): async def main(): async with tractor.open_nursery() as n: portal = await n.run_in_actor( stream_from_aio, infect_asyncio=True, fan_out=fan_out, ) # should raise RAE diectly await portal.result() trio.run(main) # TODO: parametrize the above test and avoid the duplication here? def test_trio_error_cancels_intertask_chan(reg_addr): async def main(): async with tractor.open_nursery() as n: portal = await n.run_in_actor( stream_from_aio, raise_err=True, infect_asyncio=True, ) # should trigger remote actor error await portal.result() with pytest.raises(RemoteActorError) as excinfo: trio.run(main) # ensure boxed error type excinfo.value.boxed_type is Exception def test_trio_closes_early_and_channel_exits(reg_addr): async def main(): async with tractor.open_nursery() as n: portal = await n.run_in_actor( stream_from_aio, exit_early=True, infect_asyncio=True, ) # should raise RAE diectly await portal.result() # should be a quiet exit on a simple channel exit trio.run(main) def test_aio_errors_and_channel_propagates_and_closes(reg_addr): async def main(): async with tractor.open_nursery() as n: portal = await n.run_in_actor( stream_from_aio, aio_raise_err=True, infect_asyncio=True, ) # should trigger RAE directly, not an eg. await portal.result() with pytest.raises( # NOTE: bc we directly wait on `Portal.result()` instead # of capturing it inside the `ActorNursery` machinery. expected_exception=RemoteActorError, ) as excinfo: trio.run(main) excinfo.value.boxed_type is Exception async def aio_echo_server( to_trio: trio.MemorySendChannel, from_trio: asyncio.Queue, ) -> None: to_trio.send_nowait('start') while True: msg = await from_trio.get() # echo the msg back to_trio.send_nowait(msg) # if we get the terminate sentinel # break the echo loop if msg is None: print('breaking aio echo loop') break print('exiting asyncio task') @tractor.context async def trio_to_aio_echo_server( ctx: tractor.Context|None, ): async with to_asyncio.open_channel_from( aio_echo_server, ) as (first, chan): assert first == 'start' await ctx.started(first) async with ctx.open_stream() as stream: async for msg in stream: print(f'asyncio echoing {msg}') await chan.send(msg) out = await chan.receive() # echo back to parent actor-task await stream.send(out) if out is None: try: out = await chan.receive() except trio.EndOfChannel: break else: raise RuntimeError('aio channel never stopped?') @pytest.mark.parametrize( 'raise_error_mid_stream', [False, Exception, KeyboardInterrupt], ids='raise_error={}'.format, ) def test_echoserver_detailed_mechanics( reg_addr, raise_error_mid_stream, ): async def main(): async with tractor.open_nursery() as n: p = await n.start_actor( 'aio_server', enable_modules=[__name__], infect_asyncio=True, ) async with p.open_context( trio_to_aio_echo_server, ) as (ctx, first): assert first == 'start' async with ctx.open_stream() as stream: for i in range(100): await stream.send(i) out = await stream.receive() assert i == out if raise_error_mid_stream and i == 50: raise raise_error_mid_stream # send terminate msg await stream.send(None) out = await stream.receive() assert out is None if out is None: # ensure the stream is stopped # with trio.fail_after(0.1): try: await stream.receive() except trio.EndOfChannel: pass else: pytest.fail( 'stream not stopped after sentinel ?!' ) # TODO: the case where this blocks and # is cancelled by kbi or out of task cancellation await p.cancel_actor() if raise_error_mid_stream: with pytest.raises(raise_error_mid_stream): trio.run(main) else: trio.run(main) @pytest.mark.parametrize( 'raise_error_mid_stream', [ False, Exception, KeyboardInterrupt, ], ids='raise_error={}'.format, ) def test_infected_root_actor( raise_error_mid_stream: bool|Exception, # conftest wide loglevel: str, debug_mode: bool, ): ''' Verify you can run the `tractor` runtime with `Actor.is_infected_aio() == True` in the root actor. ''' async def _trio_main(): first: str chan: to_asyncio.LinkedTaskChannel async with ( tractor.open_root_actor( debug_mode=debug_mode, loglevel=loglevel, ), to_asyncio.open_channel_from( aio_echo_server, ) as (first, chan), ): assert first == 'start' for i in range(1000): await chan.send(i) out = await chan.receive() assert out == i print(f'asyncio echoing {i}') if raise_error_mid_stream and i == 500: raise raise_error_mid_stream if out is None: try: out = await chan.receive() except trio.EndOfChannel: break else: raise RuntimeError('aio channel never stopped?') if raise_error_mid_stream: with pytest.raises(raise_error_mid_stream): tractor.to_asyncio.run_as_asyncio_guest( trio_main=_trio_main, ) else: tractor.to_asyncio.run_as_asyncio_guest( trio_main=_trio_main, ) @tractor.context async def manage_file( ctx: tractor.Context, tmp_path_str: str, send_sigint_to: str, trio_side_is_shielded: bool = True, bg_aio_task: bool = False, ): ''' Start an `asyncio` task that just sleeps after registering a context with `Actor.lifetime_stack`. Trigger a SIGINT to kill the actor tree and ensure the stack is closed in the infected mode child. To verify the teardown state just write a tmpfile to the `testdir` and delete it on actor close. ''' tmp_path: Path = Path(tmp_path_str) tmp_file: Path = tmp_path / f'{" ".join(ctx._actor.uid)}.file' # create a the tmp file and tell the parent where it's at assert not tmp_file.is_file() tmp_file.touch() stack: ExitStack = current_actor().lifetime_stack stack.callback(tmp_file.unlink) await ctx.started(( str(tmp_file), os.getpid(), )) # expect to be cancelled from here! try: # NOTE: turns out you don't even need to sched an aio task # since the original issue, even though seemingly was due to # the guest-run being abandoned + a `._debug.pause()` inside # `._runtime._async_main()` (which was originally trying to # debug the `.lifetime_stack` not closing), IS NOT actually # the core issue? # # further notes: # # - `trio` only issues the " RuntimeWarning: Trio guest run # got abandoned without properly finishing... weird stuff # might happen" IFF you DO run a asyncio task here, BUT # - the original issue of the `.lifetime_stack` not closing # will still happen even if you don't run an `asyncio` task # here even though the "abandon" messgage won't be shown.. # # => ????? honestly i'm lost but it seems to be some issue # with `asyncio` and SIGINT.. # # honestly, this REALLY reminds me why i haven't used # `asyncio` by choice in years.. XD # async with trio.open_nursery() as tn: if bg_aio_task: tn.start_soon( tractor.to_asyncio.run_task, aio_sleep_forever, ) # XXX don't-need/doesn't-make-a-diff right # since we're already doing it from parent? # if send_sigint_to == 'child': # os.kill( # os.getpid(), # signal.SIGINT, # ) # XXX spend a half sec doing shielded checkpointing to # ensure that despite the `trio`-side task ignoring the # SIGINT, the `asyncio` side won't abandon the guest-run! if trio_side_is_shielded: with trio.CancelScope(shield=True): for i in range(5): await trio.sleep(0.1) await trio.sleep_forever() # signalled manually at the OS level (aka KBI) by the parent actor. except KeyboardInterrupt: print('child raised KBI..') assert tmp_file.exists() raise raise RuntimeError('shoulda received a KBI?') @pytest.mark.parametrize( 'trio_side_is_shielded', [ False, True, ], ids=[ 'trio_side_no_shielding', 'trio_side_does_shielded_work', ], ) @pytest.mark.parametrize( 'send_sigint_to', [ 'child', 'parent', ], ids='send_SIGINT_to={}'.format, ) @pytest.mark.parametrize( 'bg_aio_task', [ False, # NOTE: (and see notes in `manage_file()` above as well) if # we FOR SURE SPAWN AN AIO TASK in the child it seems the # "silent-abandon" case (as is described in detail in # `to_asyncio.run_as_asyncio_guest()`) does not happen and # `asyncio`'s loop will at least abandon the `trio` side # loudly? .. prolly the state-spot to start looking for # a soln that results in NO ABANDONMENT.. XD True, ], ids=[ 'bg_aio_task', 'just_trio_slee', ], ) @pytest.mark.parametrize( 'wait_for_ctx', [ False, True, ], ids=[ 'raise_KBI_in_rent', 'wait_for_ctx', ], ) def test_sigint_closes_lifetime_stack( tmp_path: Path, wait_for_ctx: bool, bg_aio_task: bool, trio_side_is_shielded: bool, debug_mode: bool, send_sigint_to: str, ): ''' Ensure that an infected child can use the `Actor.lifetime_stack` to make a file on boot and it's automatically cleaned up by the actor-lifetime-linked exit stack closure. ''' async def main(): try: an: tractor.ActorNursery async with tractor.open_nursery( debug_mode=debug_mode, ) as an: p: tractor.Portal = await an.start_actor( 'file_mngr', enable_modules=[__name__], infect_asyncio=True, ) async with p.open_context( manage_file, tmp_path_str=str(tmp_path), send_sigint_to=send_sigint_to, bg_aio_task=bg_aio_task, trio_side_is_shielded=trio_side_is_shielded, ) as (ctx, first): path_str, cpid = first tmp_file: Path = Path(path_str) assert tmp_file.exists() # XXX originally to simulate what (hopefully) # the below now triggers.. had to manually # trigger a SIGINT from a ctl-c in the root. # await trio.sleep_forever() # XXX NOTE XXX signal infected-`asyncio` child to # OS-cancel with SIGINT; this should trigger the # bad `asyncio` cancel behaviour that can cause # a guest-run abandon as was seen causing # shm-buffer leaks in `piker`'s live quote stream # susbys! # await trio.sleep(.2) pid: int = ( cpid if send_sigint_to == 'child' else os.getpid() ) os.kill( pid, signal.SIGINT, ) # XXX CASE 1: without the bug fixed, in # the non-KBI-raised-in-parent case, this # timeout should trigger! if wait_for_ctx: print('waiting for ctx outcome in parent..') try: with trio.fail_after(1): await ctx.wait_for_result() except tractor.ContextCancelled as ctxc: assert ctxc.canceller == ctx.chan.uid raise # XXX CASE 2: this seems to be the source of the # original issue which exhibited BEFORE we put # a `Actor.cancel_soon()` inside # `run_as_asyncio_guest()`.. else: raise KeyboardInterrupt pytest.fail('should have raised some kinda error?!?') except ( KeyboardInterrupt, ContextCancelled, ): # XXX CASE 2: without the bug fixed, in the # KBI-raised-in-parent case, the actor teardown should # never get run (silently abaondoned by `asyncio`..) and # thus the file should leak! assert not tmp_file.exists() assert ctx.maybe_error trio.run(main) # TODO: debug_mode tests once we get support for `asyncio`! # # -[ ] need tests to wrap both scripts: # - [ ] infected_asyncio_echo_server.py # - [ ] debugging/asyncio_bp.py # -[ ] consider moving ^ (some of) these ^ to `test_debugger`? # # -[ ] missing impl outstanding includes: # - [x] for sync pauses we need to ensure we open yet another # `greenback` portal in the asyncio task # => completed using `.bestow_portal(task)` inside # `.to_asyncio._run_asyncio_task()` right? # -[ ] translation func to get from `asyncio` task calling to # `._debug.wait_for_parent_stdin_hijack()` which does root # call to do TTY locking. # def test_sync_breakpoint(): ''' Verify we can do sync-func/code breakpointing using the `breakpoint()` builtin inside infected mode actors. ''' pytest.xfail('This support is not implemented yet!') def test_debug_mode_crash_handling(): ''' Verify mult-actor crash handling works with a combo of infected-`asyncio`-mode and normal `trio` actors despite nested process trees. ''' pytest.xfail('This support is not implemented yet!')