Support and test infected-`asyncio`-mode for root
Such that you can use, ```python tractor.to_asyncio.run_as_asyncio_guest( trio_main=_trio_main, ) ``` to boostrap the root actor (and thus main parent process) to embed the actor-rumtime into an `asyncio` loop. Prove it all works with an subactor-free version of the aio echo-server test suite B)hilevel_serman
parent
dcb6706489
commit
0a0d30d108
|
@ -5,6 +5,7 @@ The hipster way to force SC onto the stdlib's "async": 'infection mode'.
|
||||||
import asyncio
|
import asyncio
|
||||||
import builtins
|
import builtins
|
||||||
from contextlib import ExitStack
|
from contextlib import ExitStack
|
||||||
|
from functools import partial
|
||||||
import itertools
|
import itertools
|
||||||
import importlib
|
import importlib
|
||||||
import os
|
import os
|
||||||
|
@ -536,41 +537,40 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
|
||||||
excinfo.value.boxed_type is Exception
|
excinfo.value.boxed_type is Exception
|
||||||
|
|
||||||
|
|
||||||
|
async def aio_echo_server(
|
||||||
|
to_trio: trio.MemorySendChannel,
|
||||||
|
from_trio: asyncio.Queue,
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
to_trio.send_nowait('start')
|
||||||
|
|
||||||
|
while True:
|
||||||
|
msg = await from_trio.get()
|
||||||
|
|
||||||
|
# echo the msg back
|
||||||
|
to_trio.send_nowait(msg)
|
||||||
|
|
||||||
|
# if we get the terminate sentinel
|
||||||
|
# break the echo loop
|
||||||
|
if msg is None:
|
||||||
|
print('breaking aio echo loop')
|
||||||
|
break
|
||||||
|
|
||||||
|
print('exiting asyncio task')
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def trio_to_aio_echo_server(
|
async def trio_to_aio_echo_server(
|
||||||
ctx: tractor.Context,
|
ctx: tractor.Context|None,
|
||||||
):
|
):
|
||||||
|
|
||||||
async def aio_echo_server(
|
|
||||||
to_trio: trio.MemorySendChannel,
|
|
||||||
from_trio: asyncio.Queue,
|
|
||||||
) -> None:
|
|
||||||
|
|
||||||
to_trio.send_nowait('start')
|
|
||||||
|
|
||||||
while True:
|
|
||||||
msg = await from_trio.get()
|
|
||||||
|
|
||||||
# echo the msg back
|
|
||||||
to_trio.send_nowait(msg)
|
|
||||||
|
|
||||||
# if we get the terminate sentinel
|
|
||||||
# break the echo loop
|
|
||||||
if msg is None:
|
|
||||||
print('breaking aio echo loop')
|
|
||||||
break
|
|
||||||
|
|
||||||
print('exiting asyncio task')
|
|
||||||
|
|
||||||
async with to_asyncio.open_channel_from(
|
async with to_asyncio.open_channel_from(
|
||||||
aio_echo_server,
|
aio_echo_server,
|
||||||
) as (first, chan):
|
) as (first, chan):
|
||||||
|
|
||||||
assert first == 'start'
|
assert first == 'start'
|
||||||
|
|
||||||
await ctx.started(first)
|
await ctx.started(first)
|
||||||
|
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
|
|
||||||
async for msg in stream:
|
async for msg in stream:
|
||||||
print(f'asyncio echoing {msg}')
|
print(f'asyncio echoing {msg}')
|
||||||
await chan.send(msg)
|
await chan.send(msg)
|
||||||
|
@ -649,6 +649,68 @@ def test_echoserver_detailed_mechanics(
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'raise_error_mid_stream',
|
||||||
|
[
|
||||||
|
False,
|
||||||
|
Exception,
|
||||||
|
KeyboardInterrupt,
|
||||||
|
],
|
||||||
|
ids='raise_error={}'.format,
|
||||||
|
)
|
||||||
|
def test_infected_root_actor(
|
||||||
|
raise_error_mid_stream: bool|Exception,
|
||||||
|
# conftest wide
|
||||||
|
loglevel: str,
|
||||||
|
debug_mode: bool,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Verify you can run the `tractor` runtime with `Actor.is_infected_aio() == True`
|
||||||
|
in the root actor.
|
||||||
|
|
||||||
|
'''
|
||||||
|
async def _trio_main():
|
||||||
|
|
||||||
|
first: str
|
||||||
|
chan: to_asyncio.LinkedTaskChannel
|
||||||
|
async with (
|
||||||
|
tractor.open_root_actor(
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
loglevel=loglevel,
|
||||||
|
),
|
||||||
|
to_asyncio.open_channel_from(
|
||||||
|
aio_echo_server,
|
||||||
|
) as (first, chan),
|
||||||
|
):
|
||||||
|
assert first == 'start'
|
||||||
|
|
||||||
|
for i in range(1000):
|
||||||
|
await chan.send(i)
|
||||||
|
out = await chan.receive()
|
||||||
|
assert out == i
|
||||||
|
print(f'asyncio echoing {i}')
|
||||||
|
|
||||||
|
if raise_error_mid_stream and i == 500:
|
||||||
|
raise raise_error_mid_stream
|
||||||
|
|
||||||
|
if out is None:
|
||||||
|
try:
|
||||||
|
out = await chan.receive()
|
||||||
|
except trio.EndOfChannel:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
raise RuntimeError('aio channel never stopped?')
|
||||||
|
|
||||||
|
if raise_error_mid_stream:
|
||||||
|
with pytest.raises(raise_error_mid_stream):
|
||||||
|
tractor.to_asyncio.run_as_asyncio_guest(
|
||||||
|
trio_main=_trio_main,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
tractor.to_asyncio.run_as_asyncio_guest(
|
||||||
|
trio_main=_trio_main,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def manage_file(
|
async def manage_file(
|
||||||
|
|
|
@ -334,6 +334,10 @@ async def open_root_actor(
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
enable_modules=enable_modules,
|
enable_modules=enable_modules,
|
||||||
)
|
)
|
||||||
|
# XXX, in case the root actor runtime was actually run from
|
||||||
|
# `tractor.to_asyncio.run_as_asyncio_guest()` and NOt
|
||||||
|
# `.trio.run()`.
|
||||||
|
actor._infected_aio = _state._runtime_vars['_is_infected_aio']
|
||||||
|
|
||||||
# Start up main task set via core actor-runtime nurseries.
|
# Start up main task set via core actor-runtime nurseries.
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -36,6 +36,7 @@ import tractor
|
||||||
from tractor._exceptions import AsyncioCancelled
|
from tractor._exceptions import AsyncioCancelled
|
||||||
from tractor._state import (
|
from tractor._state import (
|
||||||
debug_mode,
|
debug_mode,
|
||||||
|
_runtime_vars,
|
||||||
)
|
)
|
||||||
from tractor.devx import _debug
|
from tractor.devx import _debug
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
|
@ -767,12 +768,16 @@ def run_as_asyncio_guest(
|
||||||
'Infecting `asyncio`-process with a `trio` guest-run!\n'
|
'Infecting `asyncio`-process with a `trio` guest-run!\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# TODO, somehow bootstrap this!
|
||||||
|
_runtime_vars['_is_infected_aio'] = True
|
||||||
|
|
||||||
trio.lowlevel.start_guest_run(
|
trio.lowlevel.start_guest_run(
|
||||||
trio_main,
|
trio_main,
|
||||||
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
|
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
|
||||||
done_callback=trio_done_callback,
|
done_callback=trio_done_callback,
|
||||||
)
|
)
|
||||||
fute_err: BaseException|None = None
|
fute_err: BaseException|None = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
out: Outcome = await asyncio.shield(trio_done_fute)
|
out: Outcome = await asyncio.shield(trio_done_fute)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue