diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 42eb35b..fca971d 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -21,9 +21,11 @@ import trio import tractor from tractor import ( current_actor, + Actor, to_asyncio, RemoteActorError, ContextCancelled, + _state, ) from tractor.trionics import BroadcastReceiver from tractor._testing import expect_ctxc @@ -80,7 +82,16 @@ async def asyncio_actor( ) -> None: - assert tractor.current_actor().is_infected_aio() + # ensure internal runtime state is consistent + actor: Actor = tractor.current_actor() + assert ( + actor.is_infected_aio() + and + actor._infected_aio + and + _state._runtime_vars['_is_infected_aio'] + ) + target: Callable = globals()[target] if '.' in expect_err: @@ -136,7 +147,7 @@ def test_aio_simple_error(reg_addr): assert err assert isinstance(err, RemoteActorError) - assert err.boxed_type == AssertionError + assert err.boxed_type is AssertionError def test_tractor_cancels_aio(reg_addr): diff --git a/tractor/_entry.py b/tractor/_entry.py index a072706..19dcb9f 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -20,6 +20,7 @@ Sub-process entry points. """ from __future__ import annotations from functools import partial +import multiprocessing as mp import os import textwrap from typing import ( @@ -64,20 +65,22 @@ def _mp_main( ''' actor._forkserver_info = forkserver_info from ._spawn import try_set_start_method - spawn_ctx = try_set_start_method(start_method) + spawn_ctx: mp.context.BaseContext = try_set_start_method(start_method) + assert spawn_ctx if actor.loglevel is not None: log.info( - f"Setting loglevel for {actor.uid} to {actor.loglevel}") + f'Setting loglevel for {actor.uid} to {actor.loglevel}' + ) get_console_log(actor.loglevel) - assert spawn_ctx + # TODO: use scops headers like for `trio` below! + # (well after we libify it maybe..) log.info( - f"Started new {spawn_ctx.current_process()} for {actor.uid}") - - _state._current_actor = actor - - log.debug(f"parent_addr is {parent_addr}") + f'Started new {spawn_ctx.current_process()} for {actor.uid}' + # f"parent_addr is {parent_addr}" + ) + _state._current_actor: Actor = actor trio_main = partial( async_main, actor=actor, @@ -94,7 +97,9 @@ def _mp_main( pass # handle it the same way trio does? finally: - log.info(f"Subactor {actor.uid} terminated") + log.info( + f'`mp`-subactor {actor.uid} exited' + ) # TODO: move this func to some kinda `.devx._conc_lang.py` eventually diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 5615373..8cbf855 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -59,6 +59,7 @@ import os import warnings import trio +from trio._core import _run as trio_runtime from trio import ( CancelScope, Nursery, @@ -80,6 +81,7 @@ from ._context import ( from .log import get_logger from ._exceptions import ( ContextCancelled, + InternalError, ModuleNotExposed, MsgTypeError, unpack_error, @@ -102,6 +104,7 @@ from ._rpc import ( if TYPE_CHECKING: from ._supervise import ActorNursery + from trio._channel import MemoryChannelState log = get_logger('tractor') @@ -897,11 +900,15 @@ class Actor: f'peer: {chan.uid}\n' f'cid:{cid}\n' ) - ctx._allow_overruns = allow_overruns + ctx._allow_overruns: bool = allow_overruns # adjust buffer size if specified - state = ctx._send_chan._state # type: ignore - if msg_buffer_size and state.max_buffer_size != msg_buffer_size: + state: MemoryChannelState = ctx._send_chan._state # type: ignore + if ( + msg_buffer_size + and + state.max_buffer_size != msg_buffer_size + ): state.max_buffer_size = msg_buffer_size except KeyError: @@ -1095,7 +1102,36 @@ class Actor: '`tractor.pause_from_sync()` not available!' ) - rvs['_is_root'] = False + # XXX ensure the "infected `asyncio` mode" setting + # passed down from our spawning parent is consistent + # with `trio`-runtime initialization: + # - during sub-proc boot, the entrypoint func + # (`._entry._main()`) should set + # `._infected_aio = True` before calling + # `run_as_asyncio_guest()`, + # - the value of `infect_asyncio: bool = True` as + # passed to `ActorNursery.start_actor()` must be + # the same as `_runtime_vars['_is_infected_aio']` + if ( + (aio_rtv := rvs['_is_infected_aio']) + != + (aio_attr := self._infected_aio) + ): + raise InternalError( + 'Parent sent runtime-vars that mismatch for the ' + '"infected `asyncio` mode" settings ?!?\n\n' + + f'rvs["_is_infected_aio"] = {aio_rtv}\n' + f'self._infected_aio = {aio_attr}\n' + ) + if aio_rtv: + assert trio_runtime.GLOBAL_RUN_CONTEXT.runner.is_guest + # ^TODO^ possibly add a `sniffio` or + # `trio` pub-API for `is_guest_mode()`? + + rvs['_is_root'] = False # obvi XD + + # update process-wide globals _state._runtime_vars.update(rvs) # XXX: ``msgspec`` doesn't support serializing tuples diff --git a/tractor/_state.py b/tractor/_state.py index 9f89600..a87ad36 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -44,6 +44,8 @@ _runtime_vars: dict[str, Any] = { '_root_mailbox': (None, None), '_registry_addrs': [], + '_is_infected_aio': False, + # for `tractor.pause_from_sync()` & `breakpoint()` support 'use_greenback': False, } @@ -70,7 +72,8 @@ def current_actor( ''' if ( err_on_no_runtime - and _current_actor is None + and + _current_actor is None ): msg: str = 'No local actor has been initialized yet?\n' from ._exceptions import NoRuntime diff --git a/tractor/_supervise.py b/tractor/_supervise.py index fb737c1..b644ef2 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -158,6 +158,7 @@ class ActorNursery: # configure and pass runtime state _rtv = _state._runtime_vars.copy() _rtv['_is_root'] = False + _rtv['_is_infected_aio'] = infect_asyncio # allow setting debug policy per actor if debug_mode is not None: