Compare commits

..

No commits in common. "0a0d30d1081fe1404e0b17a911627519db86ca99" and "840c328f192ee7c359a05a4f0870c1d1370f8ada" have entirely different histories.

5 changed files with 40 additions and 117 deletions

View File

@ -5,7 +5,6 @@ The hipster way to force SC onto the stdlib's "async": 'infection mode'.
import asyncio import asyncio
import builtins import builtins
from contextlib import ExitStack from contextlib import ExitStack
from functools import partial
import itertools import itertools
import importlib import importlib
import os import os
@ -537,10 +536,15 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
excinfo.value.boxed_type is Exception excinfo.value.boxed_type is Exception
async def aio_echo_server( @tractor.context
async def trio_to_aio_echo_server(
ctx: tractor.Context,
):
async def aio_echo_server(
to_trio: trio.MemorySendChannel, to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue, from_trio: asyncio.Queue,
) -> None: ) -> None:
to_trio.send_nowait('start') to_trio.send_nowait('start')
@ -558,19 +562,15 @@ async def aio_echo_server(
print('exiting asyncio task') print('exiting asyncio task')
@tractor.context
async def trio_to_aio_echo_server(
ctx: tractor.Context|None,
):
async with to_asyncio.open_channel_from( async with to_asyncio.open_channel_from(
aio_echo_server, aio_echo_server,
) as (first, chan): ) as (first, chan):
assert first == 'start'
assert first == 'start'
await ctx.started(first) await ctx.started(first)
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
async for msg in stream: async for msg in stream:
print(f'asyncio echoing {msg}') print(f'asyncio echoing {msg}')
await chan.send(msg) await chan.send(msg)
@ -649,68 +649,6 @@ def test_echoserver_detailed_mechanics(
trio.run(main) trio.run(main)
@pytest.mark.parametrize(
'raise_error_mid_stream',
[
False,
Exception,
KeyboardInterrupt,
],
ids='raise_error={}'.format,
)
def test_infected_root_actor(
raise_error_mid_stream: bool|Exception,
# conftest wide
loglevel: str,
debug_mode: bool,
):
'''
Verify you can run the `tractor` runtime with `Actor.is_infected_aio() == True`
in the root actor.
'''
async def _trio_main():
first: str
chan: to_asyncio.LinkedTaskChannel
async with (
tractor.open_root_actor(
debug_mode=debug_mode,
loglevel=loglevel,
),
to_asyncio.open_channel_from(
aio_echo_server,
) as (first, chan),
):
assert first == 'start'
for i in range(1000):
await chan.send(i)
out = await chan.receive()
assert out == i
print(f'asyncio echoing {i}')
if raise_error_mid_stream and i == 500:
raise raise_error_mid_stream
if out is None:
try:
out = await chan.receive()
except trio.EndOfChannel:
break
else:
raise RuntimeError('aio channel never stopped?')
if raise_error_mid_stream:
with pytest.raises(raise_error_mid_stream):
tractor.to_asyncio.run_as_asyncio_guest(
trio_main=_trio_main,
)
else:
tractor.to_asyncio.run_as_asyncio_guest(
trio_main=_trio_main,
)
@tractor.context @tractor.context
async def manage_file( async def manage_file(

View File

@ -47,9 +47,6 @@ from functools import partial
import inspect import inspect
from pprint import pformat from pprint import pformat
import textwrap import textwrap
from types import (
UnionType,
)
from typing import ( from typing import (
Any, Any,
AsyncGenerator, AsyncGenerator,
@ -2547,14 +2544,7 @@ def context(
name: str name: str
param: Type param: Type
for name, param in annots.items(): for name, param in annots.items():
if ( if param is Context:
param is Context
or (
isinstance(param, UnionType)
and
Context in param.__args__
)
):
ctx_var_name: str = name ctx_var_name: str = name
break break
else: else:

View File

@ -334,10 +334,6 @@ async def open_root_actor(
loglevel=loglevel, loglevel=loglevel,
enable_modules=enable_modules, enable_modules=enable_modules,
) )
# XXX, in case the root actor runtime was actually run from
# `tractor.to_asyncio.run_as_asyncio_guest()` and NOt
# `.trio.run()`.
actor._infected_aio = _state._runtime_vars['_is_infected_aio']
# Start up main task set via core actor-runtime nurseries. # Start up main task set via core actor-runtime nurseries.
try: try:

View File

@ -160,8 +160,8 @@ async def open_service_mngr(
): ):
# impl specific obvi.. # impl specific obvi..
init_kwargs.update({ init_kwargs.update({
'an': an, 'actor_n': an,
'tn': tn, 'service_n': tn,
}) })
mngr: ServiceMngr|None mngr: ServiceMngr|None
@ -174,11 +174,15 @@ async def open_service_mngr(
# eventual `@singleton_acm` API wrapper. # eventual `@singleton_acm` API wrapper.
# #
# assign globally for future daemon/task creation # assign globally for future daemon/task creation
mngr.an = an mngr.actor_n = an
mngr.tn = tn mngr.service_n = tn
else: else:
assert (mngr.an and mngr.tn) assert (
mngr.actor_n
and
mngr.service_tn
)
log.info( log.info(
'Using extant service mngr!\n\n' 'Using extant service mngr!\n\n'
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
@ -345,8 +349,8 @@ class ServiceMngr:
process tree. process tree.
''' '''
an: ActorNursery actor_n: ActorNursery
tn: trio.Nursery service_n: trio.Nursery
debug_mode: bool = False # tractor sub-actor debug mode flag debug_mode: bool = False # tractor sub-actor debug mode flag
service_tasks: dict[ service_tasks: dict[
@ -419,7 +423,7 @@ class ServiceMngr:
( (
cs, cs,
complete, complete,
) = await self.tn.start(_task_manager_start) ) = await self.service_n.start(_task_manager_start)
# store the cancel scope and portal for later cancellation or # store the cancel scope and portal for later cancellation or
# retstart if needed. # retstart if needed.
@ -481,7 +485,7 @@ class ServiceMngr:
for details. for details.
''' '''
cs, ipc_ctx, complete, started = await self.tn.start( cs, ipc_ctx, complete, started = await self.service_n.start(
functools.partial( functools.partial(
_open_and_supervise_service_ctx, _open_and_supervise_service_ctx,
serman=self, serman=self,
@ -538,7 +542,7 @@ class ServiceMngr:
return sub_ctx return sub_ctx
if daemon_name not in self.service_ctxs: if daemon_name not in self.service_ctxs:
portal: Portal = await self.an.start_actor( portal: Portal = await self.actor_n.start_actor(
daemon_name, daemon_name,
debug_mode=( # maybe set globally during allocate debug_mode=( # maybe set globally during allocate
debug_mode debug_mode

View File

@ -36,7 +36,6 @@ import tractor
from tractor._exceptions import AsyncioCancelled from tractor._exceptions import AsyncioCancelled
from tractor._state import ( from tractor._state import (
debug_mode, debug_mode,
_runtime_vars,
) )
from tractor.devx import _debug from tractor.devx import _debug
from tractor.log import get_logger from tractor.log import get_logger
@ -768,16 +767,12 @@ def run_as_asyncio_guest(
'Infecting `asyncio`-process with a `trio` guest-run!\n' 'Infecting `asyncio`-process with a `trio` guest-run!\n'
) )
# TODO, somehow bootstrap this!
_runtime_vars['_is_infected_aio'] = True
trio.lowlevel.start_guest_run( trio.lowlevel.start_guest_run(
trio_main, trio_main,
run_sync_soon_threadsafe=loop.call_soon_threadsafe, run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback, done_callback=trio_done_callback,
) )
fute_err: BaseException|None = None fute_err: BaseException|None = None
try: try:
out: Outcome = await asyncio.shield(trio_done_fute) out: Outcome = await asyncio.shield(trio_done_fute)