forked from goodboy/tractor
				
			Pass `infect_asyncio` setting via runtime-vars
The reason for this "duplication" with the `--asyncio` CLI flag (passed to the child during spawn) is 2-fold: - allows verifying inside `Actor._from_parent()` that the `trio` runtime was started via `.start_guest_run()` as well as if the `Actor._infected_aio` spawn-entrypoint value has been set (by the `._entry.<spawn-backend>_main()` whenever `--asyncio` is passed) such that any mismatch can be signaled via an `InternalError`. - enables checking the `._state._runtime_vars['_is_infected_aio']` value directly (say from a non-actor/`trio`-thread) instead of calling `._state.current_actor(err_on_no_runtime=False)` in certain edge cases. Impl/testing deats: - add `._state._runtime_vars['_is_infected_aio'] = False` default. - raise `InternalError` on any `--asyncio`-flag-passed vs. `_runtime_vars`-value-relayed-from-parent inside `Actor._from_parent()` and include a `Runner.is_guest` assert for good measure B) - set and relay `infect_asyncio: bool` via runtime-vars to child in `ActorNursery.start_actor()`. - verify `actor.is_infected_aio()`, `actor._infected_aio` and `_state._runtime_vars['_is_infected_aio']` are all set in test suite's `asyncio_actor()` endpoint.remotes/1757153874605917753/main
							parent
							
								
									64d506970a
								
							
						
					
					
						commit
						5cdfee3bcf
					
				|  | @ -21,9 +21,11 @@ import trio | |||
| import tractor | ||||
| from tractor import ( | ||||
|     current_actor, | ||||
|     Actor, | ||||
|     to_asyncio, | ||||
|     RemoteActorError, | ||||
|     ContextCancelled, | ||||
|     _state, | ||||
| ) | ||||
| from tractor.trionics import BroadcastReceiver | ||||
| from tractor._testing import expect_ctxc | ||||
|  | @ -80,7 +82,16 @@ async def asyncio_actor( | |||
| 
 | ||||
| ) -> None: | ||||
| 
 | ||||
|     assert tractor.current_actor().is_infected_aio() | ||||
|     # ensure internal runtime state is consistent | ||||
|     actor: Actor = tractor.current_actor() | ||||
|     assert ( | ||||
|         actor.is_infected_aio() | ||||
|         and | ||||
|         actor._infected_aio | ||||
|         and | ||||
|         _state._runtime_vars['_is_infected_aio'] | ||||
|     ) | ||||
| 
 | ||||
|     target: Callable = globals()[target] | ||||
| 
 | ||||
|     if '.' in expect_err: | ||||
|  | @ -136,7 +147,7 @@ def test_aio_simple_error(reg_addr): | |||
|         assert err | ||||
| 
 | ||||
|     assert isinstance(err, RemoteActorError) | ||||
|     assert err.boxed_type == AssertionError | ||||
|     assert err.boxed_type is AssertionError | ||||
| 
 | ||||
| 
 | ||||
| def test_tractor_cancels_aio(reg_addr): | ||||
|  |  | |||
|  | @ -20,6 +20,7 @@ Sub-process entry points. | |||
| """ | ||||
| from __future__ import annotations | ||||
| from functools import partial | ||||
| import multiprocessing as mp | ||||
| import os | ||||
| import textwrap | ||||
| from typing import ( | ||||
|  | @ -64,20 +65,22 @@ def _mp_main( | |||
|     ''' | ||||
|     actor._forkserver_info = forkserver_info | ||||
|     from ._spawn import try_set_start_method | ||||
|     spawn_ctx = try_set_start_method(start_method) | ||||
|     spawn_ctx: mp.context.BaseContext = try_set_start_method(start_method) | ||||
|     assert spawn_ctx | ||||
| 
 | ||||
|     if actor.loglevel is not None: | ||||
|         log.info( | ||||
|             f"Setting loglevel for {actor.uid} to {actor.loglevel}") | ||||
|             f'Setting loglevel for {actor.uid} to {actor.loglevel}' | ||||
|         ) | ||||
|         get_console_log(actor.loglevel) | ||||
| 
 | ||||
|     assert spawn_ctx | ||||
|     # TODO: use scops headers like for `trio` below! | ||||
|     # (well after we libify it maybe..) | ||||
|     log.info( | ||||
|         f"Started new {spawn_ctx.current_process()} for {actor.uid}") | ||||
| 
 | ||||
|     _state._current_actor = actor | ||||
| 
 | ||||
|     log.debug(f"parent_addr is {parent_addr}") | ||||
|         f'Started new {spawn_ctx.current_process()} for {actor.uid}' | ||||
|     #     f"parent_addr is {parent_addr}" | ||||
|     ) | ||||
|     _state._current_actor: Actor = actor | ||||
|     trio_main = partial( | ||||
|         async_main, | ||||
|         actor=actor, | ||||
|  | @ -94,7 +97,9 @@ def _mp_main( | |||
|         pass  # handle it the same way trio does? | ||||
| 
 | ||||
|     finally: | ||||
|         log.info(f"Subactor {actor.uid} terminated") | ||||
|         log.info( | ||||
|             f'`mp`-subactor {actor.uid} exited' | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| # TODO: move this func to some kinda `.devx._conc_lang.py` eventually | ||||
|  |  | |||
|  | @ -59,6 +59,7 @@ from types import ModuleType | |||
| import warnings | ||||
| 
 | ||||
| import trio | ||||
| from trio._core import _run as trio_runtime | ||||
| from trio import ( | ||||
|     CancelScope, | ||||
|     Nursery, | ||||
|  | @ -80,6 +81,7 @@ from ._context import ( | |||
| from .log import get_logger | ||||
| from ._exceptions import ( | ||||
|     ContextCancelled, | ||||
|     InternalError, | ||||
|     ModuleNotExposed, | ||||
|     MsgTypeError, | ||||
|     unpack_error, | ||||
|  | @ -98,6 +100,7 @@ from ._rpc import ( | |||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from ._supervise import ActorNursery | ||||
|     from trio._channel import MemoryChannelState | ||||
| 
 | ||||
| 
 | ||||
| log = get_logger('tractor') | ||||
|  | @ -896,11 +899,15 @@ class Actor: | |||
|                 f'peer: {chan.uid}\n' | ||||
|                 f'cid:{cid}\n' | ||||
|             ) | ||||
|             ctx._allow_overruns = allow_overruns | ||||
|             ctx._allow_overruns: bool = allow_overruns | ||||
| 
 | ||||
|             # adjust buffer size if specified | ||||
|             state = ctx._send_chan._state  # type: ignore | ||||
|             if msg_buffer_size and state.max_buffer_size != msg_buffer_size: | ||||
|             state: MemoryChannelState  = ctx._send_chan._state  # type: ignore | ||||
|             if ( | ||||
|                 msg_buffer_size | ||||
|                 and | ||||
|                 state.max_buffer_size != msg_buffer_size | ||||
|             ): | ||||
|                 state.max_buffer_size = msg_buffer_size | ||||
| 
 | ||||
|         except KeyError: | ||||
|  | @ -1094,7 +1101,36 @@ class Actor: | |||
|                                 '`tractor.pause_from_sync()` not available!' | ||||
|                             ) | ||||
| 
 | ||||
|                 rvs['_is_root'] = False | ||||
|                 # XXX ensure the "infected `asyncio` mode" setting | ||||
|                 # passed down from our spawning parent is consistent | ||||
|                 # with `trio`-runtime initialization: | ||||
|                 # - during sub-proc boot, the entrypoint func | ||||
|                 #   (`._entry.<spawn_backend>_main()`) should set | ||||
|                 #   `._infected_aio = True` before calling | ||||
|                 #   `run_as_asyncio_guest()`, | ||||
|                 # - the value of `infect_asyncio: bool = True` as | ||||
|                 #   passed to `ActorNursery.start_actor()` must be | ||||
|                 #   the same as `_runtime_vars['_is_infected_aio']` | ||||
|                 if ( | ||||
|                     (aio_rtv := rvs['_is_infected_aio']) | ||||
|                     != | ||||
|                     (aio_attr := self._infected_aio) | ||||
|                 ): | ||||
|                     raise InternalError( | ||||
|                         'Parent sent runtime-vars that mismatch for the ' | ||||
|                         '"infected `asyncio` mode" settings ?!?\n\n' | ||||
| 
 | ||||
|                         f'rvs["_is_infected_aio"] = {aio_rtv}\n' | ||||
|                         f'self._infected_aio = {aio_attr}\n' | ||||
|                     ) | ||||
|                 if aio_rtv: | ||||
|                     assert trio_runtime.GLOBAL_RUN_CONTEXT.runner.is_guest | ||||
|                     # ^TODO^ possibly add a `sniffio` or | ||||
|                     # `trio` pub-API for `is_guest_mode()`? | ||||
| 
 | ||||
|                 rvs['_is_root'] = False  # obvi XD | ||||
| 
 | ||||
|                 # update process-wide globals | ||||
|                 _state._runtime_vars.update(rvs) | ||||
| 
 | ||||
|                 # XXX: ``msgspec`` doesn't support serializing tuples | ||||
|  |  | |||
|  | @ -44,6 +44,8 @@ _runtime_vars: dict[str, Any] = { | |||
|     '_root_mailbox': (None, None), | ||||
|     '_registry_addrs': [], | ||||
| 
 | ||||
|     '_is_infected_aio': False, | ||||
| 
 | ||||
|     # for `tractor.pause_from_sync()` & `breakpoint()` support | ||||
|     'use_greenback': False, | ||||
| } | ||||
|  | @ -70,7 +72,8 @@ def current_actor( | |||
|     ''' | ||||
|     if ( | ||||
|         err_on_no_runtime | ||||
|         and _current_actor is None | ||||
|         and | ||||
|         _current_actor is None | ||||
|     ): | ||||
|         msg: str = 'No local actor has been initialized yet?\n' | ||||
|         from ._exceptions import NoRuntime | ||||
|  |  | |||
|  | @ -158,6 +158,7 @@ class ActorNursery: | |||
|         # configure and pass runtime state | ||||
|         _rtv = _state._runtime_vars.copy() | ||||
|         _rtv['_is_root'] = False | ||||
|         _rtv['_is_infected_aio'] = infect_asyncio | ||||
| 
 | ||||
|         # allow setting debug policy per actor | ||||
|         if debug_mode is not None: | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue