Compare commits
	
		
			1 Commits 
		
	
	
		
			f7469442e3
			...
			547b957bbf
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 547b957bbf | 
|  | @ -1,81 +0,0 @@ | |||
| ''' | ||||
| Verify we can dump a `stackscope` tree on a hang. | ||||
| 
 | ||||
| ''' | ||||
| import os | ||||
| import signal | ||||
| 
 | ||||
| import trio | ||||
| import tractor | ||||
| 
 | ||||
| @tractor.context | ||||
| async def start_n_shield_hang( | ||||
|     ctx: tractor.Context, | ||||
| ): | ||||
|     # actor: tractor.Actor = tractor.current_actor() | ||||
| 
 | ||||
|     # sync to parent-side task | ||||
|     await ctx.started(os.getpid()) | ||||
| 
 | ||||
|     print('Entering shield sleep..') | ||||
|     with trio.CancelScope(shield=True): | ||||
|         await trio.sleep_forever()  # in subactor | ||||
| 
 | ||||
|     # XXX NOTE ^^^ since this shields, we expect | ||||
|     # the zombie reaper (aka T800) to engage on | ||||
|     # SIGINT from the user and eventually hard-kill | ||||
|     # this subprocess! | ||||
| 
 | ||||
| 
 | ||||
| async def main( | ||||
|     from_test: bool = False, | ||||
| ) -> None: | ||||
| 
 | ||||
|     async with ( | ||||
|         tractor.open_nursery( | ||||
|             debug_mode=True, | ||||
|             enable_stack_on_sig=True, | ||||
|             # maybe_enable_greenback=False, | ||||
|             loglevel='devx', | ||||
|         ) as an, | ||||
|     ): | ||||
| 
 | ||||
|         ptl: tractor.Portal  = await an.start_actor( | ||||
|             'hanger', | ||||
|             enable_modules=[__name__], | ||||
|             debug_mode=True, | ||||
|         ) | ||||
|         async with ptl.open_context( | ||||
|             start_n_shield_hang, | ||||
|         ) as (ctx, cpid): | ||||
| 
 | ||||
|             _, proc, _ = an._children[ptl.chan.uid] | ||||
|             assert cpid == proc.pid | ||||
| 
 | ||||
|             print( | ||||
|                 'Yo my child hanging..?\n' | ||||
|                 'Sending SIGUSR1 to see a tree-trace!\n' | ||||
|             ) | ||||
| 
 | ||||
|             # XXX simulate the wrapping test's "user actions" | ||||
|             # (i.e. if a human didn't run this manually but wants to | ||||
|             # know what they should do to reproduce test behaviour) | ||||
|             if from_test: | ||||
|                 os.kill( | ||||
|                     cpid, | ||||
|                     signal.SIGUSR1, | ||||
|                 ) | ||||
| 
 | ||||
|                 # simulate user cancelling program | ||||
|                 await trio.sleep(0.5) | ||||
|                 os.kill( | ||||
|                     os.getpid(), | ||||
|                     signal.SIGINT, | ||||
|                 ) | ||||
|             else: | ||||
|                 # actually let user send the ctl-c | ||||
|                 await trio.sleep_forever()  # in root | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|     trio.run(main) | ||||
|  | @ -21,11 +21,9 @@ import trio | |||
| import tractor | ||||
| from tractor import ( | ||||
|     current_actor, | ||||
|     Actor, | ||||
|     to_asyncio, | ||||
|     RemoteActorError, | ||||
|     ContextCancelled, | ||||
|     _state, | ||||
| ) | ||||
| from tractor.trionics import BroadcastReceiver | ||||
| from tractor._testing import expect_ctxc | ||||
|  | @ -82,16 +80,7 @@ async def asyncio_actor( | |||
| 
 | ||||
| ) -> None: | ||||
| 
 | ||||
|     # ensure internal runtime state is consistent | ||||
|     actor: Actor = tractor.current_actor() | ||||
|     assert ( | ||||
|         actor.is_infected_aio() | ||||
|         and | ||||
|         actor._infected_aio | ||||
|         and | ||||
|         _state._runtime_vars['_is_infected_aio'] | ||||
|     ) | ||||
| 
 | ||||
|     assert tractor.current_actor().is_infected_aio() | ||||
|     target: Callable = globals()[target] | ||||
| 
 | ||||
|     if '.' in expect_err: | ||||
|  | @ -147,7 +136,7 @@ def test_aio_simple_error(reg_addr): | |||
|         assert err | ||||
| 
 | ||||
|     assert isinstance(err, RemoteActorError) | ||||
|     assert err.boxed_type is AssertionError | ||||
|     assert err.boxed_type == AssertionError | ||||
| 
 | ||||
| 
 | ||||
| def test_tractor_cancels_aio(reg_addr): | ||||
|  |  | |||
|  | @ -20,7 +20,6 @@ Sub-process entry points. | |||
| """ | ||||
| from __future__ import annotations | ||||
| from functools import partial | ||||
| import multiprocessing as mp | ||||
| import os | ||||
| import textwrap | ||||
| from typing import ( | ||||
|  | @ -65,22 +64,20 @@ def _mp_main( | |||
|     ''' | ||||
|     actor._forkserver_info = forkserver_info | ||||
|     from ._spawn import try_set_start_method | ||||
|     spawn_ctx: mp.context.BaseContext = try_set_start_method(start_method) | ||||
|     assert spawn_ctx | ||||
|     spawn_ctx = try_set_start_method(start_method) | ||||
| 
 | ||||
|     if actor.loglevel is not None: | ||||
|         log.info( | ||||
|             f'Setting loglevel for {actor.uid} to {actor.loglevel}' | ||||
|         ) | ||||
|             f"Setting loglevel for {actor.uid} to {actor.loglevel}") | ||||
|         get_console_log(actor.loglevel) | ||||
| 
 | ||||
|     # TODO: use scops headers like for `trio` below! | ||||
|     # (well after we libify it maybe..) | ||||
|     assert spawn_ctx | ||||
|     log.info( | ||||
|         f'Started new {spawn_ctx.current_process()} for {actor.uid}' | ||||
|     #     f"parent_addr is {parent_addr}" | ||||
|     ) | ||||
|     _state._current_actor: Actor = actor | ||||
|         f"Started new {spawn_ctx.current_process()} for {actor.uid}") | ||||
| 
 | ||||
|     _state._current_actor = actor | ||||
| 
 | ||||
|     log.debug(f"parent_addr is {parent_addr}") | ||||
|     trio_main = partial( | ||||
|         async_main, | ||||
|         actor=actor, | ||||
|  | @ -97,9 +94,7 @@ def _mp_main( | |||
|         pass  # handle it the same way trio does? | ||||
| 
 | ||||
|     finally: | ||||
|         log.info( | ||||
|             f'`mp`-subactor {actor.uid} exited' | ||||
|         ) | ||||
|         log.info(f"Subactor {actor.uid} terminated") | ||||
| 
 | ||||
| 
 | ||||
| # TODO: move this func to some kinda `.devx._conc_lang.py` eventually | ||||
|  |  | |||
|  | @ -59,7 +59,6 @@ import os | |||
| import warnings | ||||
| 
 | ||||
| import trio | ||||
| from trio._core import _run as trio_runtime | ||||
| from trio import ( | ||||
|     CancelScope, | ||||
|     Nursery, | ||||
|  | @ -81,7 +80,6 @@ from ._context import ( | |||
| from .log import get_logger | ||||
| from ._exceptions import ( | ||||
|     ContextCancelled, | ||||
|     InternalError, | ||||
|     ModuleNotExposed, | ||||
|     MsgTypeError, | ||||
|     unpack_error, | ||||
|  | @ -104,7 +102,6 @@ from ._rpc import ( | |||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from ._supervise import ActorNursery | ||||
|     from trio._channel import MemoryChannelState | ||||
| 
 | ||||
| 
 | ||||
| log = get_logger('tractor') | ||||
|  | @ -900,15 +897,11 @@ class Actor: | |||
|                 f'peer: {chan.uid}\n' | ||||
|                 f'cid:{cid}\n' | ||||
|             ) | ||||
|             ctx._allow_overruns: bool = allow_overruns | ||||
|             ctx._allow_overruns = allow_overruns | ||||
| 
 | ||||
|             # adjust buffer size if specified | ||||
|             state: MemoryChannelState  = ctx._send_chan._state  # type: ignore | ||||
|             if ( | ||||
|                 msg_buffer_size | ||||
|                 and | ||||
|                 state.max_buffer_size != msg_buffer_size | ||||
|             ): | ||||
|             state = ctx._send_chan._state  # type: ignore | ||||
|             if msg_buffer_size and state.max_buffer_size != msg_buffer_size: | ||||
|                 state.max_buffer_size = msg_buffer_size | ||||
| 
 | ||||
|         except KeyError: | ||||
|  | @ -1102,36 +1095,7 @@ class Actor: | |||
|                                 '`tractor.pause_from_sync()` not available!' | ||||
|                             ) | ||||
| 
 | ||||
|                 # XXX ensure the "infected `asyncio` mode" setting | ||||
|                 # passed down from our spawning parent is consistent | ||||
|                 # with `trio`-runtime initialization: | ||||
|                 # - during sub-proc boot, the entrypoint func | ||||
|                 #   (`._entry.<spawn_backend>_main()`) should set | ||||
|                 #   `._infected_aio = True` before calling | ||||
|                 #   `run_as_asyncio_guest()`, | ||||
|                 # - the value of `infect_asyncio: bool = True` as | ||||
|                 #   passed to `ActorNursery.start_actor()` must be | ||||
|                 #   the same as `_runtime_vars['_is_infected_aio']` | ||||
|                 if ( | ||||
|                     (aio_rtv := rvs['_is_infected_aio']) | ||||
|                     != | ||||
|                     (aio_attr := self._infected_aio) | ||||
|                 ): | ||||
|                     raise InternalError( | ||||
|                         'Parent sent runtime-vars that mismatch for the ' | ||||
|                         '"infected `asyncio` mode" settings ?!?\n\n' | ||||
| 
 | ||||
|                         f'rvs["_is_infected_aio"] = {aio_rtv}\n' | ||||
|                         f'self._infected_aio = {aio_attr}\n' | ||||
|                     ) | ||||
|                 if aio_rtv: | ||||
|                     assert trio_runtime.GLOBAL_RUN_CONTEXT.runner.is_guest | ||||
|                     # ^TODO^ possibly add a `sniffio` or | ||||
|                     # `trio` pub-API for `is_guest_mode()`? | ||||
| 
 | ||||
|                 rvs['_is_root'] = False  # obvi XD | ||||
| 
 | ||||
|                 # update process-wide globals | ||||
|                 rvs['_is_root'] = False | ||||
|                 _state._runtime_vars.update(rvs) | ||||
| 
 | ||||
|                 # XXX: ``msgspec`` doesn't support serializing tuples | ||||
|  |  | |||
|  | @ -44,8 +44,6 @@ _runtime_vars: dict[str, Any] = { | |||
|     '_root_mailbox': (None, None), | ||||
|     '_registry_addrs': [], | ||||
| 
 | ||||
|     '_is_infected_aio': False, | ||||
| 
 | ||||
|     # for `tractor.pause_from_sync()` & `breakpoint()` support | ||||
|     'use_greenback': False, | ||||
| } | ||||
|  | @ -72,8 +70,7 @@ def current_actor( | |||
|     ''' | ||||
|     if ( | ||||
|         err_on_no_runtime | ||||
|         and | ||||
|         _current_actor is None | ||||
|         and _current_actor is None | ||||
|     ): | ||||
|         msg: str = 'No local actor has been initialized yet?\n' | ||||
|         from ._exceptions import NoRuntime | ||||
|  |  | |||
|  | @ -158,7 +158,6 @@ class ActorNursery: | |||
|         # configure and pass runtime state | ||||
|         _rtv = _state._runtime_vars.copy() | ||||
|         _rtv['_is_root'] = False | ||||
|         _rtv['_is_infected_aio'] = infect_asyncio | ||||
| 
 | ||||
|         # allow setting debug policy per actor | ||||
|         if debug_mode is not None: | ||||
|  |  | |||
|  | @ -276,10 +276,7 @@ def _run_asyncio_task( | |||
|     chan._aio_task: asyncio.Task = task | ||||
| 
 | ||||
|     # XXX TODO XXX get this actually workin.. XD | ||||
|     # -[ ] we need logic to setup `greenback` for `asyncio`-side task | ||||
|     #     REPLing.. which should normally be nearly the same as for | ||||
|     #     `trio`? | ||||
|     # -[ ] add to a new `.devx._greenback.maybe_init_for_asyncio()`? | ||||
|     # maybe setup `greenback` for `asyncio`-side task REPLing | ||||
|     if ( | ||||
|         debug_mode() | ||||
|         and | ||||
|  | @ -308,22 +305,15 @@ def _run_asyncio_task( | |||
| 
 | ||||
|             msg: str = ( | ||||
|                 'Infected `asyncio` task {etype_str}\n' | ||||
|                 f'|_{task}\n' | ||||
|             ) | ||||
|             if isinstance(terr, CancelledError): | ||||
|                 msg += ( | ||||
|                     f'c)>\n' | ||||
|                     f' |_{task}\n' | ||||
|                 ) | ||||
|                 log.cancel( | ||||
|                     msg.format(etype_str='cancelled') | ||||
|                 ) | ||||
|             else: | ||||
|                 msg += ( | ||||
|                     f'x)>\n' | ||||
|                     f' |_{task}\n' | ||||
|                 ) | ||||
|                 log.exception( | ||||
|                     msg.format(etype_str='errored') | ||||
|                     msg.format(etype_str='cancelled') | ||||
|                 ) | ||||
| 
 | ||||
|             assert type(terr) is type(aio_err), ( | ||||
|  | @ -629,10 +619,9 @@ def run_as_asyncio_guest( | |||
|         #     ) | ||||
| 
 | ||||
|         def trio_done_callback(main_outcome): | ||||
|             log.runtime( | ||||
|                 f'`trio` guest-run finishing with outcome\n' | ||||
|                 f'>) {main_outcome}\n' | ||||
|                 f'|_{trio_done_fute}\n' | ||||
|             log.info( | ||||
|                 f'trio_main finished with\n' | ||||
|                 f'|_{main_outcome!r}' | ||||
|             ) | ||||
| 
 | ||||
|             if isinstance(main_outcome, Error): | ||||
|  | @ -654,12 +643,6 @@ def run_as_asyncio_guest( | |||
|             else: | ||||
|                 trio_done_fute.set_result(main_outcome) | ||||
| 
 | ||||
|             log.info( | ||||
|                 f'`trio` guest-run finished with outcome\n' | ||||
|                 f')>\n' | ||||
|                 f'|_{trio_done_fute}\n' | ||||
|             ) | ||||
| 
 | ||||
|         startup_msg += ( | ||||
|             f'-> created {trio_done_callback!r}\n' | ||||
|             f'-> scheduling `trio_main`: {trio_main!r}\n' | ||||
|  | @ -698,8 +681,7 @@ def run_as_asyncio_guest( | |||
|             # error path in `asyncio`'s runtime..? | ||||
|             asyncio.CancelledError, | ||||
| 
 | ||||
|         ) as _fute_err: | ||||
|             fute_err = _fute_err | ||||
|         ) as fute_err: | ||||
|             err_message: str = ( | ||||
|                 'main `asyncio` task ' | ||||
|             ) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue