Compare commits

..

3 Commits

Author SHA1 Message Date
Tyler Goodlet 0a0d30d108 Support and test infected-`asyncio`-mode for root
Such that you can use,

```python

    tractor.to_asyncio.run_as_asyncio_guest(
        trio_main=_trio_main,
    )
```

to boostrap the root actor (and thus main parent process) to embed
the actor-rumtime into an `asyncio` loop. Prove it all works with an
subactor-free version of the aio echo-server test suite B)
2024-12-11 22:23:17 -05:00
Tyler Goodlet dcb6706489 Support `ctx: UnionType` annots for `@tractor.context` eps 2024-12-11 22:22:26 -05:00
Tyler Goodlet 170e198683 Use shorthand nursery var-names per convention in codebase 2024-12-11 20:26:13 -05:00
5 changed files with 117 additions and 40 deletions

View File

@ -5,6 +5,7 @@ The hipster way to force SC onto the stdlib's "async": 'infection mode'.
import asyncio
import builtins
from contextlib import ExitStack
from functools import partial
import itertools
import importlib
import os
@ -536,15 +537,10 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
excinfo.value.boxed_type is Exception
@tractor.context
async def trio_to_aio_echo_server(
ctx: tractor.Context,
):
async def aio_echo_server(
async def aio_echo_server(
to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue,
) -> None:
) -> None:
to_trio.send_nowait('start')
@ -562,15 +558,19 @@ async def trio_to_aio_echo_server(
print('exiting asyncio task')
@tractor.context
async def trio_to_aio_echo_server(
ctx: tractor.Context|None,
):
async with to_asyncio.open_channel_from(
aio_echo_server,
) as (first, chan):
assert first == 'start'
await ctx.started(first)
async with ctx.open_stream() as stream:
async for msg in stream:
print(f'asyncio echoing {msg}')
await chan.send(msg)
@ -649,6 +649,68 @@ def test_echoserver_detailed_mechanics(
trio.run(main)
@pytest.mark.parametrize(
'raise_error_mid_stream',
[
False,
Exception,
KeyboardInterrupt,
],
ids='raise_error={}'.format,
)
def test_infected_root_actor(
raise_error_mid_stream: bool|Exception,
# conftest wide
loglevel: str,
debug_mode: bool,
):
'''
Verify you can run the `tractor` runtime with `Actor.is_infected_aio() == True`
in the root actor.
'''
async def _trio_main():
first: str
chan: to_asyncio.LinkedTaskChannel
async with (
tractor.open_root_actor(
debug_mode=debug_mode,
loglevel=loglevel,
),
to_asyncio.open_channel_from(
aio_echo_server,
) as (first, chan),
):
assert first == 'start'
for i in range(1000):
await chan.send(i)
out = await chan.receive()
assert out == i
print(f'asyncio echoing {i}')
if raise_error_mid_stream and i == 500:
raise raise_error_mid_stream
if out is None:
try:
out = await chan.receive()
except trio.EndOfChannel:
break
else:
raise RuntimeError('aio channel never stopped?')
if raise_error_mid_stream:
with pytest.raises(raise_error_mid_stream):
tractor.to_asyncio.run_as_asyncio_guest(
trio_main=_trio_main,
)
else:
tractor.to_asyncio.run_as_asyncio_guest(
trio_main=_trio_main,
)
@tractor.context
async def manage_file(

View File

@ -47,6 +47,9 @@ from functools import partial
import inspect
from pprint import pformat
import textwrap
from types import (
UnionType,
)
from typing import (
Any,
AsyncGenerator,
@ -2544,7 +2547,14 @@ def context(
name: str
param: Type
for name, param in annots.items():
if param is Context:
if (
param is Context
or (
isinstance(param, UnionType)
and
Context in param.__args__
)
):
ctx_var_name: str = name
break
else:

View File

@ -334,6 +334,10 @@ async def open_root_actor(
loglevel=loglevel,
enable_modules=enable_modules,
)
# XXX, in case the root actor runtime was actually run from
# `tractor.to_asyncio.run_as_asyncio_guest()` and NOt
# `.trio.run()`.
actor._infected_aio = _state._runtime_vars['_is_infected_aio']
# Start up main task set via core actor-runtime nurseries.
try:

View File

@ -160,8 +160,8 @@ async def open_service_mngr(
):
# impl specific obvi..
init_kwargs.update({
'actor_n': an,
'service_n': tn,
'an': an,
'tn': tn,
})
mngr: ServiceMngr|None
@ -174,15 +174,11 @@ async def open_service_mngr(
# eventual `@singleton_acm` API wrapper.
#
# assign globally for future daemon/task creation
mngr.actor_n = an
mngr.service_n = tn
mngr.an = an
mngr.tn = tn
else:
assert (
mngr.actor_n
and
mngr.service_tn
)
assert (mngr.an and mngr.tn)
log.info(
'Using extant service mngr!\n\n'
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
@ -349,8 +345,8 @@ class ServiceMngr:
process tree.
'''
actor_n: ActorNursery
service_n: trio.Nursery
an: ActorNursery
tn: trio.Nursery
debug_mode: bool = False # tractor sub-actor debug mode flag
service_tasks: dict[
@ -423,7 +419,7 @@ class ServiceMngr:
(
cs,
complete,
) = await self.service_n.start(_task_manager_start)
) = await self.tn.start(_task_manager_start)
# store the cancel scope and portal for later cancellation or
# retstart if needed.
@ -485,7 +481,7 @@ class ServiceMngr:
for details.
'''
cs, ipc_ctx, complete, started = await self.service_n.start(
cs, ipc_ctx, complete, started = await self.tn.start(
functools.partial(
_open_and_supervise_service_ctx,
serman=self,
@ -542,7 +538,7 @@ class ServiceMngr:
return sub_ctx
if daemon_name not in self.service_ctxs:
portal: Portal = await self.actor_n.start_actor(
portal: Portal = await self.an.start_actor(
daemon_name,
debug_mode=( # maybe set globally during allocate
debug_mode

View File

@ -36,6 +36,7 @@ import tractor
from tractor._exceptions import AsyncioCancelled
from tractor._state import (
debug_mode,
_runtime_vars,
)
from tractor.devx import _debug
from tractor.log import get_logger
@ -767,12 +768,16 @@ def run_as_asyncio_guest(
'Infecting `asyncio`-process with a `trio` guest-run!\n'
)
# TODO, somehow bootstrap this!
_runtime_vars['_is_infected_aio'] = True
trio.lowlevel.start_guest_run(
trio_main,
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback,
)
fute_err: BaseException|None = None
try:
out: Outcome = await asyncio.shield(trio_done_fute)