From 0a0d30d1081fe1404e0b17a911627519db86ca99 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 11 Dec 2024 22:23:17 -0500 Subject: [PATCH] Support and test infected-`asyncio`-mode for root Such that you can use, ```python tractor.to_asyncio.run_as_asyncio_guest( trio_main=_trio_main, ) ``` to boostrap the root actor (and thus main parent process) to embed the actor-rumtime into an `asyncio` loop. Prove it all works with an subactor-free version of the aio echo-server test suite B) --- tests/test_infected_asyncio.py | 112 +++++++++++++++++++++++++-------- tractor/_root.py | 4 ++ tractor/to_asyncio.py | 5 ++ 3 files changed, 96 insertions(+), 25 deletions(-) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index f5fa0aa..b0a1171 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -5,6 +5,7 @@ The hipster way to force SC onto the stdlib's "async": 'infection mode'. import asyncio import builtins from contextlib import ExitStack +from functools import partial import itertools import importlib import os @@ -536,41 +537,40 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr): excinfo.value.boxed_type is Exception +async def aio_echo_server( + to_trio: trio.MemorySendChannel, + from_trio: asyncio.Queue, +) -> None: + + to_trio.send_nowait('start') + + while True: + msg = await from_trio.get() + + # echo the msg back + to_trio.send_nowait(msg) + + # if we get the terminate sentinel + # break the echo loop + if msg is None: + print('breaking aio echo loop') + break + + print('exiting asyncio task') + + @tractor.context async def trio_to_aio_echo_server( - ctx: tractor.Context, + ctx: tractor.Context|None, ): - - async def aio_echo_server( - to_trio: trio.MemorySendChannel, - from_trio: asyncio.Queue, - ) -> None: - - to_trio.send_nowait('start') - - while True: - msg = await from_trio.get() - - # echo the msg back - to_trio.send_nowait(msg) - - # if we get the terminate sentinel - # break the echo loop - if msg is None: - print('breaking aio echo loop') - break - - print('exiting asyncio task') - async with to_asyncio.open_channel_from( aio_echo_server, ) as (first, chan): - assert first == 'start' + await ctx.started(first) async with ctx.open_stream() as stream: - async for msg in stream: print(f'asyncio echoing {msg}') await chan.send(msg) @@ -649,6 +649,68 @@ def test_echoserver_detailed_mechanics( trio.run(main) +@pytest.mark.parametrize( + 'raise_error_mid_stream', + [ + False, + Exception, + KeyboardInterrupt, + ], + ids='raise_error={}'.format, +) +def test_infected_root_actor( + raise_error_mid_stream: bool|Exception, + # conftest wide + loglevel: str, + debug_mode: bool, +): + ''' + Verify you can run the `tractor` runtime with `Actor.is_infected_aio() == True` + in the root actor. + + ''' + async def _trio_main(): + + first: str + chan: to_asyncio.LinkedTaskChannel + async with ( + tractor.open_root_actor( + debug_mode=debug_mode, + loglevel=loglevel, + ), + to_asyncio.open_channel_from( + aio_echo_server, + ) as (first, chan), + ): + assert first == 'start' + + for i in range(1000): + await chan.send(i) + out = await chan.receive() + assert out == i + print(f'asyncio echoing {i}') + + if raise_error_mid_stream and i == 500: + raise raise_error_mid_stream + + if out is None: + try: + out = await chan.receive() + except trio.EndOfChannel: + break + else: + raise RuntimeError('aio channel never stopped?') + + if raise_error_mid_stream: + with pytest.raises(raise_error_mid_stream): + tractor.to_asyncio.run_as_asyncio_guest( + trio_main=_trio_main, + ) + else: + tractor.to_asyncio.run_as_asyncio_guest( + trio_main=_trio_main, + ) + @tractor.context async def manage_file( diff --git a/tractor/_root.py b/tractor/_root.py index bcdee3e..38ddbe2 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -334,6 +334,10 @@ async def open_root_actor( loglevel=loglevel, enable_modules=enable_modules, ) + # XXX, in case the root actor runtime was actually run from + # `tractor.to_asyncio.run_as_asyncio_guest()` and NOt + # `.trio.run()`. + actor._infected_aio = _state._runtime_vars['_is_infected_aio'] # Start up main task set via core actor-runtime nurseries. try: diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index f2a8570..24f1ace 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -36,6 +36,7 @@ import tractor from tractor._exceptions import AsyncioCancelled from tractor._state import ( debug_mode, + _runtime_vars, ) from tractor.devx import _debug from tractor.log import get_logger @@ -767,12 +768,16 @@ def run_as_asyncio_guest( 'Infecting `asyncio`-process with a `trio` guest-run!\n' ) + # TODO, somehow bootstrap this! + _runtime_vars['_is_infected_aio'] = True + trio.lowlevel.start_guest_run( trio_main, run_sync_soon_threadsafe=loop.call_soon_threadsafe, done_callback=trio_done_callback, ) fute_err: BaseException|None = None + try: out: Outcome = await asyncio.shield(trio_done_fute)