Compare commits
3 Commits
840c328f19
...
0a0d30d108
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 0a0d30d108 | |
Tyler Goodlet | dcb6706489 | |
Tyler Goodlet | 170e198683 |
|
@ -5,6 +5,7 @@ The hipster way to force SC onto the stdlib's "async": 'infection mode'.
|
|||
import asyncio
|
||||
import builtins
|
||||
from contextlib import ExitStack
|
||||
from functools import partial
|
||||
import itertools
|
||||
import importlib
|
||||
import os
|
||||
|
@ -536,15 +537,10 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
|
|||
excinfo.value.boxed_type is Exception
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def trio_to_aio_echo_server(
|
||||
ctx: tractor.Context,
|
||||
):
|
||||
|
||||
async def aio_echo_server(
|
||||
async def aio_echo_server(
|
||||
to_trio: trio.MemorySendChannel,
|
||||
from_trio: asyncio.Queue,
|
||||
) -> None:
|
||||
) -> None:
|
||||
|
||||
to_trio.send_nowait('start')
|
||||
|
||||
|
@ -562,15 +558,19 @@ async def trio_to_aio_echo_server(
|
|||
|
||||
print('exiting asyncio task')
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def trio_to_aio_echo_server(
|
||||
ctx: tractor.Context|None,
|
||||
):
|
||||
async with to_asyncio.open_channel_from(
|
||||
aio_echo_server,
|
||||
) as (first, chan):
|
||||
|
||||
assert first == 'start'
|
||||
|
||||
await ctx.started(first)
|
||||
|
||||
async with ctx.open_stream() as stream:
|
||||
|
||||
async for msg in stream:
|
||||
print(f'asyncio echoing {msg}')
|
||||
await chan.send(msg)
|
||||
|
@ -649,6 +649,68 @@ def test_echoserver_detailed_mechanics(
|
|||
trio.run(main)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'raise_error_mid_stream',
|
||||
[
|
||||
False,
|
||||
Exception,
|
||||
KeyboardInterrupt,
|
||||
],
|
||||
ids='raise_error={}'.format,
|
||||
)
|
||||
def test_infected_root_actor(
|
||||
raise_error_mid_stream: bool|Exception,
|
||||
# conftest wide
|
||||
loglevel: str,
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Verify you can run the `tractor` runtime with `Actor.is_infected_aio() == True`
|
||||
in the root actor.
|
||||
|
||||
'''
|
||||
async def _trio_main():
|
||||
|
||||
first: str
|
||||
chan: to_asyncio.LinkedTaskChannel
|
||||
async with (
|
||||
tractor.open_root_actor(
|
||||
debug_mode=debug_mode,
|
||||
loglevel=loglevel,
|
||||
),
|
||||
to_asyncio.open_channel_from(
|
||||
aio_echo_server,
|
||||
) as (first, chan),
|
||||
):
|
||||
assert first == 'start'
|
||||
|
||||
for i in range(1000):
|
||||
await chan.send(i)
|
||||
out = await chan.receive()
|
||||
assert out == i
|
||||
print(f'asyncio echoing {i}')
|
||||
|
||||
if raise_error_mid_stream and i == 500:
|
||||
raise raise_error_mid_stream
|
||||
|
||||
if out is None:
|
||||
try:
|
||||
out = await chan.receive()
|
||||
except trio.EndOfChannel:
|
||||
break
|
||||
else:
|
||||
raise RuntimeError('aio channel never stopped?')
|
||||
|
||||
if raise_error_mid_stream:
|
||||
with pytest.raises(raise_error_mid_stream):
|
||||
tractor.to_asyncio.run_as_asyncio_guest(
|
||||
trio_main=_trio_main,
|
||||
)
|
||||
else:
|
||||
tractor.to_asyncio.run_as_asyncio_guest(
|
||||
trio_main=_trio_main,
|
||||
)
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def manage_file(
|
||||
|
|
|
@ -47,6 +47,9 @@ from functools import partial
|
|||
import inspect
|
||||
from pprint import pformat
|
||||
import textwrap
|
||||
from types import (
|
||||
UnionType,
|
||||
)
|
||||
from typing import (
|
||||
Any,
|
||||
AsyncGenerator,
|
||||
|
@ -2544,7 +2547,14 @@ def context(
|
|||
name: str
|
||||
param: Type
|
||||
for name, param in annots.items():
|
||||
if param is Context:
|
||||
if (
|
||||
param is Context
|
||||
or (
|
||||
isinstance(param, UnionType)
|
||||
and
|
||||
Context in param.__args__
|
||||
)
|
||||
):
|
||||
ctx_var_name: str = name
|
||||
break
|
||||
else:
|
||||
|
|
|
@ -334,6 +334,10 @@ async def open_root_actor(
|
|||
loglevel=loglevel,
|
||||
enable_modules=enable_modules,
|
||||
)
|
||||
# XXX, in case the root actor runtime was actually run from
|
||||
# `tractor.to_asyncio.run_as_asyncio_guest()` and NOt
|
||||
# `.trio.run()`.
|
||||
actor._infected_aio = _state._runtime_vars['_is_infected_aio']
|
||||
|
||||
# Start up main task set via core actor-runtime nurseries.
|
||||
try:
|
||||
|
|
|
@ -160,8 +160,8 @@ async def open_service_mngr(
|
|||
):
|
||||
# impl specific obvi..
|
||||
init_kwargs.update({
|
||||
'actor_n': an,
|
||||
'service_n': tn,
|
||||
'an': an,
|
||||
'tn': tn,
|
||||
})
|
||||
|
||||
mngr: ServiceMngr|None
|
||||
|
@ -174,15 +174,11 @@ async def open_service_mngr(
|
|||
# eventual `@singleton_acm` API wrapper.
|
||||
#
|
||||
# assign globally for future daemon/task creation
|
||||
mngr.actor_n = an
|
||||
mngr.service_n = tn
|
||||
mngr.an = an
|
||||
mngr.tn = tn
|
||||
|
||||
else:
|
||||
assert (
|
||||
mngr.actor_n
|
||||
and
|
||||
mngr.service_tn
|
||||
)
|
||||
assert (mngr.an and mngr.tn)
|
||||
log.info(
|
||||
'Using extant service mngr!\n\n'
|
||||
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
|
||||
|
@ -349,8 +345,8 @@ class ServiceMngr:
|
|||
process tree.
|
||||
|
||||
'''
|
||||
actor_n: ActorNursery
|
||||
service_n: trio.Nursery
|
||||
an: ActorNursery
|
||||
tn: trio.Nursery
|
||||
debug_mode: bool = False # tractor sub-actor debug mode flag
|
||||
|
||||
service_tasks: dict[
|
||||
|
@ -423,7 +419,7 @@ class ServiceMngr:
|
|||
(
|
||||
cs,
|
||||
complete,
|
||||
) = await self.service_n.start(_task_manager_start)
|
||||
) = await self.tn.start(_task_manager_start)
|
||||
|
||||
# store the cancel scope and portal for later cancellation or
|
||||
# retstart if needed.
|
||||
|
@ -485,7 +481,7 @@ class ServiceMngr:
|
|||
for details.
|
||||
|
||||
'''
|
||||
cs, ipc_ctx, complete, started = await self.service_n.start(
|
||||
cs, ipc_ctx, complete, started = await self.tn.start(
|
||||
functools.partial(
|
||||
_open_and_supervise_service_ctx,
|
||||
serman=self,
|
||||
|
@ -542,7 +538,7 @@ class ServiceMngr:
|
|||
return sub_ctx
|
||||
|
||||
if daemon_name not in self.service_ctxs:
|
||||
portal: Portal = await self.actor_n.start_actor(
|
||||
portal: Portal = await self.an.start_actor(
|
||||
daemon_name,
|
||||
debug_mode=( # maybe set globally during allocate
|
||||
debug_mode
|
||||
|
|
|
@ -36,6 +36,7 @@ import tractor
|
|||
from tractor._exceptions import AsyncioCancelled
|
||||
from tractor._state import (
|
||||
debug_mode,
|
||||
_runtime_vars,
|
||||
)
|
||||
from tractor.devx import _debug
|
||||
from tractor.log import get_logger
|
||||
|
@ -767,12 +768,16 @@ def run_as_asyncio_guest(
|
|||
'Infecting `asyncio`-process with a `trio` guest-run!\n'
|
||||
)
|
||||
|
||||
# TODO, somehow bootstrap this!
|
||||
_runtime_vars['_is_infected_aio'] = True
|
||||
|
||||
trio.lowlevel.start_guest_run(
|
||||
trio_main,
|
||||
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
|
||||
done_callback=trio_done_callback,
|
||||
)
|
||||
fute_err: BaseException|None = None
|
||||
|
||||
try:
|
||||
out: Outcome = await asyncio.shield(trio_done_fute)
|
||||
|
||||
|
|
Loading…
Reference in New Issue