Compare commits
3 Commits
840c328f19
...
0a0d30d108
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 0a0d30d108 | |
Tyler Goodlet | dcb6706489 | |
Tyler Goodlet | 170e198683 |
|
@ -5,6 +5,7 @@ The hipster way to force SC onto the stdlib's "async": 'infection mode'.
|
||||||
import asyncio
|
import asyncio
|
||||||
import builtins
|
import builtins
|
||||||
from contextlib import ExitStack
|
from contextlib import ExitStack
|
||||||
|
from functools import partial
|
||||||
import itertools
|
import itertools
|
||||||
import importlib
|
import importlib
|
||||||
import os
|
import os
|
||||||
|
@ -536,15 +537,10 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
|
||||||
excinfo.value.boxed_type is Exception
|
excinfo.value.boxed_type is Exception
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
async def aio_echo_server(
|
||||||
async def trio_to_aio_echo_server(
|
|
||||||
ctx: tractor.Context,
|
|
||||||
):
|
|
||||||
|
|
||||||
async def aio_echo_server(
|
|
||||||
to_trio: trio.MemorySendChannel,
|
to_trio: trio.MemorySendChannel,
|
||||||
from_trio: asyncio.Queue,
|
from_trio: asyncio.Queue,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
to_trio.send_nowait('start')
|
to_trio.send_nowait('start')
|
||||||
|
|
||||||
|
@ -562,15 +558,19 @@ async def trio_to_aio_echo_server(
|
||||||
|
|
||||||
print('exiting asyncio task')
|
print('exiting asyncio task')
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def trio_to_aio_echo_server(
|
||||||
|
ctx: tractor.Context|None,
|
||||||
|
):
|
||||||
async with to_asyncio.open_channel_from(
|
async with to_asyncio.open_channel_from(
|
||||||
aio_echo_server,
|
aio_echo_server,
|
||||||
) as (first, chan):
|
) as (first, chan):
|
||||||
|
|
||||||
assert first == 'start'
|
assert first == 'start'
|
||||||
|
|
||||||
await ctx.started(first)
|
await ctx.started(first)
|
||||||
|
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
|
|
||||||
async for msg in stream:
|
async for msg in stream:
|
||||||
print(f'asyncio echoing {msg}')
|
print(f'asyncio echoing {msg}')
|
||||||
await chan.send(msg)
|
await chan.send(msg)
|
||||||
|
@ -649,6 +649,68 @@ def test_echoserver_detailed_mechanics(
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'raise_error_mid_stream',
|
||||||
|
[
|
||||||
|
False,
|
||||||
|
Exception,
|
||||||
|
KeyboardInterrupt,
|
||||||
|
],
|
||||||
|
ids='raise_error={}'.format,
|
||||||
|
)
|
||||||
|
def test_infected_root_actor(
|
||||||
|
raise_error_mid_stream: bool|Exception,
|
||||||
|
# conftest wide
|
||||||
|
loglevel: str,
|
||||||
|
debug_mode: bool,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Verify you can run the `tractor` runtime with `Actor.is_infected_aio() == True`
|
||||||
|
in the root actor.
|
||||||
|
|
||||||
|
'''
|
||||||
|
async def _trio_main():
|
||||||
|
|
||||||
|
first: str
|
||||||
|
chan: to_asyncio.LinkedTaskChannel
|
||||||
|
async with (
|
||||||
|
tractor.open_root_actor(
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
loglevel=loglevel,
|
||||||
|
),
|
||||||
|
to_asyncio.open_channel_from(
|
||||||
|
aio_echo_server,
|
||||||
|
) as (first, chan),
|
||||||
|
):
|
||||||
|
assert first == 'start'
|
||||||
|
|
||||||
|
for i in range(1000):
|
||||||
|
await chan.send(i)
|
||||||
|
out = await chan.receive()
|
||||||
|
assert out == i
|
||||||
|
print(f'asyncio echoing {i}')
|
||||||
|
|
||||||
|
if raise_error_mid_stream and i == 500:
|
||||||
|
raise raise_error_mid_stream
|
||||||
|
|
||||||
|
if out is None:
|
||||||
|
try:
|
||||||
|
out = await chan.receive()
|
||||||
|
except trio.EndOfChannel:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
raise RuntimeError('aio channel never stopped?')
|
||||||
|
|
||||||
|
if raise_error_mid_stream:
|
||||||
|
with pytest.raises(raise_error_mid_stream):
|
||||||
|
tractor.to_asyncio.run_as_asyncio_guest(
|
||||||
|
trio_main=_trio_main,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
tractor.to_asyncio.run_as_asyncio_guest(
|
||||||
|
trio_main=_trio_main,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def manage_file(
|
async def manage_file(
|
||||||
|
|
|
@ -47,6 +47,9 @@ from functools import partial
|
||||||
import inspect
|
import inspect
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
import textwrap
|
import textwrap
|
||||||
|
from types import (
|
||||||
|
UnionType,
|
||||||
|
)
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
AsyncGenerator,
|
AsyncGenerator,
|
||||||
|
@ -2544,7 +2547,14 @@ def context(
|
||||||
name: str
|
name: str
|
||||||
param: Type
|
param: Type
|
||||||
for name, param in annots.items():
|
for name, param in annots.items():
|
||||||
if param is Context:
|
if (
|
||||||
|
param is Context
|
||||||
|
or (
|
||||||
|
isinstance(param, UnionType)
|
||||||
|
and
|
||||||
|
Context in param.__args__
|
||||||
|
)
|
||||||
|
):
|
||||||
ctx_var_name: str = name
|
ctx_var_name: str = name
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -334,6 +334,10 @@ async def open_root_actor(
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
enable_modules=enable_modules,
|
enable_modules=enable_modules,
|
||||||
)
|
)
|
||||||
|
# XXX, in case the root actor runtime was actually run from
|
||||||
|
# `tractor.to_asyncio.run_as_asyncio_guest()` and NOt
|
||||||
|
# `.trio.run()`.
|
||||||
|
actor._infected_aio = _state._runtime_vars['_is_infected_aio']
|
||||||
|
|
||||||
# Start up main task set via core actor-runtime nurseries.
|
# Start up main task set via core actor-runtime nurseries.
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -160,8 +160,8 @@ async def open_service_mngr(
|
||||||
):
|
):
|
||||||
# impl specific obvi..
|
# impl specific obvi..
|
||||||
init_kwargs.update({
|
init_kwargs.update({
|
||||||
'actor_n': an,
|
'an': an,
|
||||||
'service_n': tn,
|
'tn': tn,
|
||||||
})
|
})
|
||||||
|
|
||||||
mngr: ServiceMngr|None
|
mngr: ServiceMngr|None
|
||||||
|
@ -174,15 +174,11 @@ async def open_service_mngr(
|
||||||
# eventual `@singleton_acm` API wrapper.
|
# eventual `@singleton_acm` API wrapper.
|
||||||
#
|
#
|
||||||
# assign globally for future daemon/task creation
|
# assign globally for future daemon/task creation
|
||||||
mngr.actor_n = an
|
mngr.an = an
|
||||||
mngr.service_n = tn
|
mngr.tn = tn
|
||||||
|
|
||||||
else:
|
else:
|
||||||
assert (
|
assert (mngr.an and mngr.tn)
|
||||||
mngr.actor_n
|
|
||||||
and
|
|
||||||
mngr.service_tn
|
|
||||||
)
|
|
||||||
log.info(
|
log.info(
|
||||||
'Using extant service mngr!\n\n'
|
'Using extant service mngr!\n\n'
|
||||||
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
|
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
|
||||||
|
@ -349,8 +345,8 @@ class ServiceMngr:
|
||||||
process tree.
|
process tree.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
actor_n: ActorNursery
|
an: ActorNursery
|
||||||
service_n: trio.Nursery
|
tn: trio.Nursery
|
||||||
debug_mode: bool = False # tractor sub-actor debug mode flag
|
debug_mode: bool = False # tractor sub-actor debug mode flag
|
||||||
|
|
||||||
service_tasks: dict[
|
service_tasks: dict[
|
||||||
|
@ -423,7 +419,7 @@ class ServiceMngr:
|
||||||
(
|
(
|
||||||
cs,
|
cs,
|
||||||
complete,
|
complete,
|
||||||
) = await self.service_n.start(_task_manager_start)
|
) = await self.tn.start(_task_manager_start)
|
||||||
|
|
||||||
# store the cancel scope and portal for later cancellation or
|
# store the cancel scope and portal for later cancellation or
|
||||||
# retstart if needed.
|
# retstart if needed.
|
||||||
|
@ -485,7 +481,7 @@ class ServiceMngr:
|
||||||
for details.
|
for details.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
cs, ipc_ctx, complete, started = await self.service_n.start(
|
cs, ipc_ctx, complete, started = await self.tn.start(
|
||||||
functools.partial(
|
functools.partial(
|
||||||
_open_and_supervise_service_ctx,
|
_open_and_supervise_service_ctx,
|
||||||
serman=self,
|
serman=self,
|
||||||
|
@ -542,7 +538,7 @@ class ServiceMngr:
|
||||||
return sub_ctx
|
return sub_ctx
|
||||||
|
|
||||||
if daemon_name not in self.service_ctxs:
|
if daemon_name not in self.service_ctxs:
|
||||||
portal: Portal = await self.actor_n.start_actor(
|
portal: Portal = await self.an.start_actor(
|
||||||
daemon_name,
|
daemon_name,
|
||||||
debug_mode=( # maybe set globally during allocate
|
debug_mode=( # maybe set globally during allocate
|
||||||
debug_mode
|
debug_mode
|
||||||
|
|
|
@ -36,6 +36,7 @@ import tractor
|
||||||
from tractor._exceptions import AsyncioCancelled
|
from tractor._exceptions import AsyncioCancelled
|
||||||
from tractor._state import (
|
from tractor._state import (
|
||||||
debug_mode,
|
debug_mode,
|
||||||
|
_runtime_vars,
|
||||||
)
|
)
|
||||||
from tractor.devx import _debug
|
from tractor.devx import _debug
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
|
@ -767,12 +768,16 @@ def run_as_asyncio_guest(
|
||||||
'Infecting `asyncio`-process with a `trio` guest-run!\n'
|
'Infecting `asyncio`-process with a `trio` guest-run!\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# TODO, somehow bootstrap this!
|
||||||
|
_runtime_vars['_is_infected_aio'] = True
|
||||||
|
|
||||||
trio.lowlevel.start_guest_run(
|
trio.lowlevel.start_guest_run(
|
||||||
trio_main,
|
trio_main,
|
||||||
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
|
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
|
||||||
done_callback=trio_done_callback,
|
done_callback=trio_done_callback,
|
||||||
)
|
)
|
||||||
fute_err: BaseException|None = None
|
fute_err: BaseException|None = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
out: Outcome = await asyncio.shield(trio_done_fute)
|
out: Outcome = await asyncio.shield(trio_done_fute)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue