Compare commits

..

No commits in common. "0a0d30d1081fe1404e0b17a911627519db86ca99" and "840c328f192ee7c359a05a4f0870c1d1370f8ada" have entirely different histories.

5 changed files with 40 additions and 117 deletions

View File

@ -5,7 +5,6 @@ The hipster way to force SC onto the stdlib's "async": 'infection mode'.
import asyncio
import builtins
from contextlib import ExitStack
from functools import partial
import itertools
import importlib
import os
@ -537,6 +536,11 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
excinfo.value.boxed_type is Exception
@tractor.context
async def trio_to_aio_echo_server(
ctx: tractor.Context,
):
async def aio_echo_server(
to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue,
@ -558,19 +562,15 @@ async def aio_echo_server(
print('exiting asyncio task')
@tractor.context
async def trio_to_aio_echo_server(
ctx: tractor.Context|None,
):
async with to_asyncio.open_channel_from(
aio_echo_server,
) as (first, chan):
assert first == 'start'
assert first == 'start'
await ctx.started(first)
async with ctx.open_stream() as stream:
async for msg in stream:
print(f'asyncio echoing {msg}')
await chan.send(msg)
@ -649,68 +649,6 @@ def test_echoserver_detailed_mechanics(
trio.run(main)
@pytest.mark.parametrize(
'raise_error_mid_stream',
[
False,
Exception,
KeyboardInterrupt,
],
ids='raise_error={}'.format,
)
def test_infected_root_actor(
raise_error_mid_stream: bool|Exception,
# conftest wide
loglevel: str,
debug_mode: bool,
):
'''
Verify you can run the `tractor` runtime with `Actor.is_infected_aio() == True`
in the root actor.
'''
async def _trio_main():
first: str
chan: to_asyncio.LinkedTaskChannel
async with (
tractor.open_root_actor(
debug_mode=debug_mode,
loglevel=loglevel,
),
to_asyncio.open_channel_from(
aio_echo_server,
) as (first, chan),
):
assert first == 'start'
for i in range(1000):
await chan.send(i)
out = await chan.receive()
assert out == i
print(f'asyncio echoing {i}')
if raise_error_mid_stream and i == 500:
raise raise_error_mid_stream
if out is None:
try:
out = await chan.receive()
except trio.EndOfChannel:
break
else:
raise RuntimeError('aio channel never stopped?')
if raise_error_mid_stream:
with pytest.raises(raise_error_mid_stream):
tractor.to_asyncio.run_as_asyncio_guest(
trio_main=_trio_main,
)
else:
tractor.to_asyncio.run_as_asyncio_guest(
trio_main=_trio_main,
)
@tractor.context
async def manage_file(

View File

@ -47,9 +47,6 @@ from functools import partial
import inspect
from pprint import pformat
import textwrap
from types import (
UnionType,
)
from typing import (
Any,
AsyncGenerator,
@ -2547,14 +2544,7 @@ def context(
name: str
param: Type
for name, param in annots.items():
if (
param is Context
or (
isinstance(param, UnionType)
and
Context in param.__args__
)
):
if param is Context:
ctx_var_name: str = name
break
else:

View File

@ -334,10 +334,6 @@ async def open_root_actor(
loglevel=loglevel,
enable_modules=enable_modules,
)
# XXX, in case the root actor runtime was actually run from
# `tractor.to_asyncio.run_as_asyncio_guest()` and NOt
# `.trio.run()`.
actor._infected_aio = _state._runtime_vars['_is_infected_aio']
# Start up main task set via core actor-runtime nurseries.
try:

View File

@ -160,8 +160,8 @@ async def open_service_mngr(
):
# impl specific obvi..
init_kwargs.update({
'an': an,
'tn': tn,
'actor_n': an,
'service_n': tn,
})
mngr: ServiceMngr|None
@ -174,11 +174,15 @@ async def open_service_mngr(
# eventual `@singleton_acm` API wrapper.
#
# assign globally for future daemon/task creation
mngr.an = an
mngr.tn = tn
mngr.actor_n = an
mngr.service_n = tn
else:
assert (mngr.an and mngr.tn)
assert (
mngr.actor_n
and
mngr.service_tn
)
log.info(
'Using extant service mngr!\n\n'
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
@ -345,8 +349,8 @@ class ServiceMngr:
process tree.
'''
an: ActorNursery
tn: trio.Nursery
actor_n: ActorNursery
service_n: trio.Nursery
debug_mode: bool = False # tractor sub-actor debug mode flag
service_tasks: dict[
@ -419,7 +423,7 @@ class ServiceMngr:
(
cs,
complete,
) = await self.tn.start(_task_manager_start)
) = await self.service_n.start(_task_manager_start)
# store the cancel scope and portal for later cancellation or
# retstart if needed.
@ -481,7 +485,7 @@ class ServiceMngr:
for details.
'''
cs, ipc_ctx, complete, started = await self.tn.start(
cs, ipc_ctx, complete, started = await self.service_n.start(
functools.partial(
_open_and_supervise_service_ctx,
serman=self,
@ -538,7 +542,7 @@ class ServiceMngr:
return sub_ctx
if daemon_name not in self.service_ctxs:
portal: Portal = await self.an.start_actor(
portal: Portal = await self.actor_n.start_actor(
daemon_name,
debug_mode=( # maybe set globally during allocate
debug_mode

View File

@ -36,7 +36,6 @@ import tractor
from tractor._exceptions import AsyncioCancelled
from tractor._state import (
debug_mode,
_runtime_vars,
)
from tractor.devx import _debug
from tractor.log import get_logger
@ -768,16 +767,12 @@ def run_as_asyncio_guest(
'Infecting `asyncio`-process with a `trio` guest-run!\n'
)
# TODO, somehow bootstrap this!
_runtime_vars['_is_infected_aio'] = True
trio.lowlevel.start_guest_run(
trio_main,
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback,
)
fute_err: BaseException|None = None
try:
out: Outcome = await asyncio.shield(trio_done_fute)