Compare commits
No commits in common. "0a0d30d1081fe1404e0b17a911627519db86ca99" and "840c328f192ee7c359a05a4f0870c1d1370f8ada" have entirely different histories.
0a0d30d108
...
840c328f19
|
@ -5,7 +5,6 @@ The hipster way to force SC onto the stdlib's "async": 'infection mode'.
|
||||||
import asyncio
|
import asyncio
|
||||||
import builtins
|
import builtins
|
||||||
from contextlib import ExitStack
|
from contextlib import ExitStack
|
||||||
from functools import partial
|
|
||||||
import itertools
|
import itertools
|
||||||
import importlib
|
import importlib
|
||||||
import os
|
import os
|
||||||
|
@ -537,40 +536,41 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
|
||||||
excinfo.value.boxed_type is Exception
|
excinfo.value.boxed_type is Exception
|
||||||
|
|
||||||
|
|
||||||
async def aio_echo_server(
|
|
||||||
to_trio: trio.MemorySendChannel,
|
|
||||||
from_trio: asyncio.Queue,
|
|
||||||
) -> None:
|
|
||||||
|
|
||||||
to_trio.send_nowait('start')
|
|
||||||
|
|
||||||
while True:
|
|
||||||
msg = await from_trio.get()
|
|
||||||
|
|
||||||
# echo the msg back
|
|
||||||
to_trio.send_nowait(msg)
|
|
||||||
|
|
||||||
# if we get the terminate sentinel
|
|
||||||
# break the echo loop
|
|
||||||
if msg is None:
|
|
||||||
print('breaking aio echo loop')
|
|
||||||
break
|
|
||||||
|
|
||||||
print('exiting asyncio task')
|
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def trio_to_aio_echo_server(
|
async def trio_to_aio_echo_server(
|
||||||
ctx: tractor.Context|None,
|
ctx: tractor.Context,
|
||||||
):
|
):
|
||||||
|
|
||||||
|
async def aio_echo_server(
|
||||||
|
to_trio: trio.MemorySendChannel,
|
||||||
|
from_trio: asyncio.Queue,
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
to_trio.send_nowait('start')
|
||||||
|
|
||||||
|
while True:
|
||||||
|
msg = await from_trio.get()
|
||||||
|
|
||||||
|
# echo the msg back
|
||||||
|
to_trio.send_nowait(msg)
|
||||||
|
|
||||||
|
# if we get the terminate sentinel
|
||||||
|
# break the echo loop
|
||||||
|
if msg is None:
|
||||||
|
print('breaking aio echo loop')
|
||||||
|
break
|
||||||
|
|
||||||
|
print('exiting asyncio task')
|
||||||
|
|
||||||
async with to_asyncio.open_channel_from(
|
async with to_asyncio.open_channel_from(
|
||||||
aio_echo_server,
|
aio_echo_server,
|
||||||
) as (first, chan):
|
) as (first, chan):
|
||||||
assert first == 'start'
|
|
||||||
|
|
||||||
|
assert first == 'start'
|
||||||
await ctx.started(first)
|
await ctx.started(first)
|
||||||
|
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
|
|
||||||
async for msg in stream:
|
async for msg in stream:
|
||||||
print(f'asyncio echoing {msg}')
|
print(f'asyncio echoing {msg}')
|
||||||
await chan.send(msg)
|
await chan.send(msg)
|
||||||
|
@ -649,68 +649,6 @@ def test_echoserver_detailed_mechanics(
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
|
||||||
'raise_error_mid_stream',
|
|
||||||
[
|
|
||||||
False,
|
|
||||||
Exception,
|
|
||||||
KeyboardInterrupt,
|
|
||||||
],
|
|
||||||
ids='raise_error={}'.format,
|
|
||||||
)
|
|
||||||
def test_infected_root_actor(
|
|
||||||
raise_error_mid_stream: bool|Exception,
|
|
||||||
# conftest wide
|
|
||||||
loglevel: str,
|
|
||||||
debug_mode: bool,
|
|
||||||
):
|
|
||||||
'''
|
|
||||||
Verify you can run the `tractor` runtime with `Actor.is_infected_aio() == True`
|
|
||||||
in the root actor.
|
|
||||||
|
|
||||||
'''
|
|
||||||
async def _trio_main():
|
|
||||||
|
|
||||||
first: str
|
|
||||||
chan: to_asyncio.LinkedTaskChannel
|
|
||||||
async with (
|
|
||||||
tractor.open_root_actor(
|
|
||||||
debug_mode=debug_mode,
|
|
||||||
loglevel=loglevel,
|
|
||||||
),
|
|
||||||
to_asyncio.open_channel_from(
|
|
||||||
aio_echo_server,
|
|
||||||
) as (first, chan),
|
|
||||||
):
|
|
||||||
assert first == 'start'
|
|
||||||
|
|
||||||
for i in range(1000):
|
|
||||||
await chan.send(i)
|
|
||||||
out = await chan.receive()
|
|
||||||
assert out == i
|
|
||||||
print(f'asyncio echoing {i}')
|
|
||||||
|
|
||||||
if raise_error_mid_stream and i == 500:
|
|
||||||
raise raise_error_mid_stream
|
|
||||||
|
|
||||||
if out is None:
|
|
||||||
try:
|
|
||||||
out = await chan.receive()
|
|
||||||
except trio.EndOfChannel:
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
raise RuntimeError('aio channel never stopped?')
|
|
||||||
|
|
||||||
if raise_error_mid_stream:
|
|
||||||
with pytest.raises(raise_error_mid_stream):
|
|
||||||
tractor.to_asyncio.run_as_asyncio_guest(
|
|
||||||
trio_main=_trio_main,
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
tractor.to_asyncio.run_as_asyncio_guest(
|
|
||||||
trio_main=_trio_main,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def manage_file(
|
async def manage_file(
|
||||||
|
|
|
@ -47,9 +47,6 @@ from functools import partial
|
||||||
import inspect
|
import inspect
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
import textwrap
|
import textwrap
|
||||||
from types import (
|
|
||||||
UnionType,
|
|
||||||
)
|
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
AsyncGenerator,
|
AsyncGenerator,
|
||||||
|
@ -2547,14 +2544,7 @@ def context(
|
||||||
name: str
|
name: str
|
||||||
param: Type
|
param: Type
|
||||||
for name, param in annots.items():
|
for name, param in annots.items():
|
||||||
if (
|
if param is Context:
|
||||||
param is Context
|
|
||||||
or (
|
|
||||||
isinstance(param, UnionType)
|
|
||||||
and
|
|
||||||
Context in param.__args__
|
|
||||||
)
|
|
||||||
):
|
|
||||||
ctx_var_name: str = name
|
ctx_var_name: str = name
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -334,10 +334,6 @@ async def open_root_actor(
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
enable_modules=enable_modules,
|
enable_modules=enable_modules,
|
||||||
)
|
)
|
||||||
# XXX, in case the root actor runtime was actually run from
|
|
||||||
# `tractor.to_asyncio.run_as_asyncio_guest()` and NOt
|
|
||||||
# `.trio.run()`.
|
|
||||||
actor._infected_aio = _state._runtime_vars['_is_infected_aio']
|
|
||||||
|
|
||||||
# Start up main task set via core actor-runtime nurseries.
|
# Start up main task set via core actor-runtime nurseries.
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -160,8 +160,8 @@ async def open_service_mngr(
|
||||||
):
|
):
|
||||||
# impl specific obvi..
|
# impl specific obvi..
|
||||||
init_kwargs.update({
|
init_kwargs.update({
|
||||||
'an': an,
|
'actor_n': an,
|
||||||
'tn': tn,
|
'service_n': tn,
|
||||||
})
|
})
|
||||||
|
|
||||||
mngr: ServiceMngr|None
|
mngr: ServiceMngr|None
|
||||||
|
@ -174,11 +174,15 @@ async def open_service_mngr(
|
||||||
# eventual `@singleton_acm` API wrapper.
|
# eventual `@singleton_acm` API wrapper.
|
||||||
#
|
#
|
||||||
# assign globally for future daemon/task creation
|
# assign globally for future daemon/task creation
|
||||||
mngr.an = an
|
mngr.actor_n = an
|
||||||
mngr.tn = tn
|
mngr.service_n = tn
|
||||||
|
|
||||||
else:
|
else:
|
||||||
assert (mngr.an and mngr.tn)
|
assert (
|
||||||
|
mngr.actor_n
|
||||||
|
and
|
||||||
|
mngr.service_tn
|
||||||
|
)
|
||||||
log.info(
|
log.info(
|
||||||
'Using extant service mngr!\n\n'
|
'Using extant service mngr!\n\n'
|
||||||
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
|
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
|
||||||
|
@ -345,8 +349,8 @@ class ServiceMngr:
|
||||||
process tree.
|
process tree.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
an: ActorNursery
|
actor_n: ActorNursery
|
||||||
tn: trio.Nursery
|
service_n: trio.Nursery
|
||||||
debug_mode: bool = False # tractor sub-actor debug mode flag
|
debug_mode: bool = False # tractor sub-actor debug mode flag
|
||||||
|
|
||||||
service_tasks: dict[
|
service_tasks: dict[
|
||||||
|
@ -419,7 +423,7 @@ class ServiceMngr:
|
||||||
(
|
(
|
||||||
cs,
|
cs,
|
||||||
complete,
|
complete,
|
||||||
) = await self.tn.start(_task_manager_start)
|
) = await self.service_n.start(_task_manager_start)
|
||||||
|
|
||||||
# store the cancel scope and portal for later cancellation or
|
# store the cancel scope and portal for later cancellation or
|
||||||
# retstart if needed.
|
# retstart if needed.
|
||||||
|
@ -481,7 +485,7 @@ class ServiceMngr:
|
||||||
for details.
|
for details.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
cs, ipc_ctx, complete, started = await self.tn.start(
|
cs, ipc_ctx, complete, started = await self.service_n.start(
|
||||||
functools.partial(
|
functools.partial(
|
||||||
_open_and_supervise_service_ctx,
|
_open_and_supervise_service_ctx,
|
||||||
serman=self,
|
serman=self,
|
||||||
|
@ -538,7 +542,7 @@ class ServiceMngr:
|
||||||
return sub_ctx
|
return sub_ctx
|
||||||
|
|
||||||
if daemon_name not in self.service_ctxs:
|
if daemon_name not in self.service_ctxs:
|
||||||
portal: Portal = await self.an.start_actor(
|
portal: Portal = await self.actor_n.start_actor(
|
||||||
daemon_name,
|
daemon_name,
|
||||||
debug_mode=( # maybe set globally during allocate
|
debug_mode=( # maybe set globally during allocate
|
||||||
debug_mode
|
debug_mode
|
||||||
|
|
|
@ -36,7 +36,6 @@ import tractor
|
||||||
from tractor._exceptions import AsyncioCancelled
|
from tractor._exceptions import AsyncioCancelled
|
||||||
from tractor._state import (
|
from tractor._state import (
|
||||||
debug_mode,
|
debug_mode,
|
||||||
_runtime_vars,
|
|
||||||
)
|
)
|
||||||
from tractor.devx import _debug
|
from tractor.devx import _debug
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
|
@ -768,16 +767,12 @@ def run_as_asyncio_guest(
|
||||||
'Infecting `asyncio`-process with a `trio` guest-run!\n'
|
'Infecting `asyncio`-process with a `trio` guest-run!\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO, somehow bootstrap this!
|
|
||||||
_runtime_vars['_is_infected_aio'] = True
|
|
||||||
|
|
||||||
trio.lowlevel.start_guest_run(
|
trio.lowlevel.start_guest_run(
|
||||||
trio_main,
|
trio_main,
|
||||||
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
|
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
|
||||||
done_callback=trio_done_callback,
|
done_callback=trio_done_callback,
|
||||||
)
|
)
|
||||||
fute_err: BaseException|None = None
|
fute_err: BaseException|None = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
out: Outcome = await asyncio.shield(trio_done_fute)
|
out: Outcome = await asyncio.shield(trio_done_fute)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue