Compare commits
	
		
			1 Commits 
		
	
	
		
			a69bc00593
			...
			71cf9e7bd3
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 71cf9e7bd3 | 
|  | @ -2,7 +2,10 @@ import asyncio | |||
| 
 | ||||
| import trio | ||||
| import tractor | ||||
| from tractor import to_asyncio | ||||
| from tractor import ( | ||||
|     to_asyncio, | ||||
|     Portal, | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| async def aio_sleep_forever(): | ||||
|  | @ -43,7 +46,7 @@ async def bp_then_error( | |||
| @tractor.context | ||||
| async def trio_ctx( | ||||
|     ctx: tractor.Context, | ||||
|     bp_before_started: bool = False, | ||||
|     bp_before_started: bool = True, | ||||
| ): | ||||
| 
 | ||||
|     # this will block until the ``asyncio`` task sends a "first" | ||||
|  | @ -57,7 +60,6 @@ async def trio_ctx( | |||
| 
 | ||||
|         trio.open_nursery() as n, | ||||
|     ): | ||||
| 
 | ||||
|         assert first == 'start' | ||||
| 
 | ||||
|         if bp_before_started: | ||||
|  | @ -73,15 +75,18 @@ async def trio_ctx( | |||
| 
 | ||||
| 
 | ||||
| async def main( | ||||
|     bps_all_over: bool = False, | ||||
|     bps_all_over: bool = True, | ||||
| 
 | ||||
| ) -> None: | ||||
| 
 | ||||
|     async with tractor.open_nursery( | ||||
|         # debug_mode=True, | ||||
|         debug_mode=True, | ||||
|         maybe_enable_greenback=True, | ||||
|         # loglevel='devx', | ||||
|         # loglevel='runtime', | ||||
|     ) as n: | ||||
| 
 | ||||
|         p = await n.start_actor( | ||||
|         ptl: Portal = await n.start_actor( | ||||
|             'aio_daemon', | ||||
|             enable_modules=[__name__], | ||||
|             infect_asyncio=True, | ||||
|  | @ -89,7 +94,7 @@ async def main( | |||
|             loglevel='cancel', | ||||
|         ) | ||||
| 
 | ||||
|         async with p.open_context( | ||||
|         async with ptl.open_context( | ||||
|             trio_ctx, | ||||
|             bp_before_started=bps_all_over, | ||||
|         ) as (ctx, first): | ||||
|  | @ -105,7 +110,7 @@ async def main( | |||
| 
 | ||||
|         # TODO: case where we cancel from trio-side while asyncio task | ||||
|         # has debugger lock? | ||||
|         # await p.cancel_actor() | ||||
|         # await ptl.cancel_actor() | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|  |  | |||
|  | @ -20,6 +20,7 @@ Multi-core debugging for da peeps! | |||
| 
 | ||||
| """ | ||||
| from __future__ import annotations | ||||
| import asyncio | ||||
| import bdb | ||||
| from contextlib import ( | ||||
|     asynccontextmanager as acm, | ||||
|  | @ -67,6 +68,7 @@ from trio import ( | |||
|     TaskStatus, | ||||
| ) | ||||
| import tractor | ||||
| from tractor.to_asyncio import run_trio_task_in_future | ||||
| from tractor.log import get_logger | ||||
| from tractor._context import Context | ||||
| from tractor import _state | ||||
|  | @ -296,7 +298,7 @@ class Lock: | |||
|         ) | ||||
| 
 | ||||
|     @classmethod | ||||
|     @pdbp.hideframe | ||||
|     # @pdbp.hideframe | ||||
|     def release( | ||||
|         cls, | ||||
|         raise_on_thread: bool = True, | ||||
|  | @ -310,6 +312,8 @@ class Lock: | |||
|         we_released: bool = False | ||||
|         ctx_in_debug: Context|None = cls.ctx_in_debug | ||||
|         repl_task: Task|Thread|None = DebugStatus.repl_task | ||||
| 
 | ||||
|         try: | ||||
|             if not DebugStatus.is_main_trio_thread(): | ||||
|                 thread: threading.Thread = threading.current_thread() | ||||
|                 message: str = ( | ||||
|  | @ -342,7 +346,6 @@ class Lock: | |||
|                 f'|_{task}\n' | ||||
|             ) | ||||
| 
 | ||||
|         try: | ||||
|             lock: trio.StrictFIFOLock = cls._debug_lock | ||||
|             owner: Task = lock.statistics().owner | ||||
|             if ( | ||||
|  | @ -788,7 +791,14 @@ class DebugStatus: | |||
|         # in which case schedule the SIGINT shielding override | ||||
|         # to in the main thread. | ||||
|         # https://docs.python.org/3/library/signal.html#signals-and-threads | ||||
|         if not cls.is_main_trio_thread(): | ||||
|         if ( | ||||
|             not cls.is_main_trio_thread() | ||||
|             and | ||||
|             not _state._runtime_vars.get( | ||||
|                 '_is_infected_aio', | ||||
|                 False, | ||||
|             ) | ||||
|         ): | ||||
|             cls._orig_sigint_handler: Callable = trio.from_thread.run_sync( | ||||
|                 signal.signal, | ||||
|                 signal.SIGINT, | ||||
|  | @ -813,7 +823,16 @@ class DebugStatus: | |||
|         # always restore ``trio``'s sigint handler. see notes below in | ||||
|         # the pdb factory about the nightmare that is that code swapping | ||||
|         # out the handler when the repl activates... | ||||
|         if not cls.is_main_trio_thread(): | ||||
|         # if not cls.is_main_trio_thread(): | ||||
|         if ( | ||||
|             not cls.is_main_trio_thread() | ||||
|             and | ||||
|             # not _state._runtime_vars.get( | ||||
|             #     '_is_infected_aio', | ||||
|             #     False, | ||||
|             # ) | ||||
|             not current_actor().is_infected_aio() | ||||
|         ): | ||||
|             trio.from_thread.run_sync( | ||||
|                 signal.signal, | ||||
|                 signal.SIGINT, | ||||
|  | @ -871,7 +890,7 @@ class DebugStatus: | |||
|         return False | ||||
| 
 | ||||
|     @classmethod | ||||
|     @pdbp.hideframe | ||||
|     # @pdbp.hideframe | ||||
|     def release( | ||||
|         cls, | ||||
|         cancel_req_task: bool = False, | ||||
|  | @ -880,11 +899,21 @@ class DebugStatus: | |||
|         try: | ||||
|             # sometimes the task might already be terminated in | ||||
|             # which case this call will raise an RTE? | ||||
|             if ( | ||||
|                 repl_release is not None | ||||
|             ): | ||||
|             if repl_release is not None: | ||||
|                 if cls.is_main_trio_thread(): | ||||
|                     repl_release.set() | ||||
| 
 | ||||
|                 elif current_actor().is_infected_aio(): | ||||
| 
 | ||||
|                     async def _set_repl_release(): | ||||
|                         repl_release.set() | ||||
| 
 | ||||
|                     fute: asyncio.Future = run_trio_task_in_future( | ||||
|                         _set_repl_release | ||||
|                     ) | ||||
|                     if not fute.done(): | ||||
|                         log.warning('REPL release state unknown..?') | ||||
| 
 | ||||
|                 else: | ||||
|                     # XXX NOTE ONLY used for bg root-actor sync | ||||
|                     # threads, see `.pause_from_sync()`. | ||||
|  | @ -1658,18 +1687,24 @@ async def _pause( | |||
|     try: | ||||
|         task: Task = current_task() | ||||
|     except RuntimeError as rte: | ||||
|         # NOTE, 2 cases we might get here: | ||||
|         # | ||||
|         # - ACTUALLY not a `trio.lowlevel.Task` nor runtime caller, | ||||
|         #  |_ error out as normal | ||||
|         # | ||||
|         # - an infected `asycio` actor calls it from an actual | ||||
|         #   `asyncio.Task` | ||||
|         #  |_ in this case we DO NOT want to RTE! | ||||
|         __tracebackhide__: bool = False | ||||
|         if actor.is_infected_aio(): | ||||
|             log.exception( | ||||
|                 'Failed to get current `trio`-task?' | ||||
|             ) | ||||
|         # if actor.is_infected_aio(): | ||||
|             # mk_pdb().set_trace() | ||||
|             # raise RuntimeError( | ||||
|             #     '`tractor.pause[_from_sync]()` not yet supported ' | ||||
|             #     'directly (infected) `asyncio` tasks!' | ||||
|             # ) from rte | ||||
| 
 | ||||
|         raise rte | ||||
|             raise RuntimeError( | ||||
|                 'An `asyncio` task should not be calling this!?' | ||||
|             ) from rte | ||||
|         else: | ||||
|             task = asyncio.current_task() | ||||
| 
 | ||||
|     if debug_func is not None: | ||||
|         debug_func = partial(debug_func) | ||||
|  | @ -2060,6 +2095,7 @@ async def _pause( | |||
|                 f'on behalf of {repl_task} ??\n' | ||||
|             ) | ||||
| 
 | ||||
|         if not actor.is_infected_aio(): | ||||
|             DebugStatus.release(cancel_req_task=True) | ||||
| 
 | ||||
|         # sanity checks for ^ on request/status teardown | ||||
|  | @ -2113,7 +2149,9 @@ def _set_trace( | |||
|     log.pdb( | ||||
|         f'{_pause_msg}\n' | ||||
|         f'>(\n' | ||||
|         f'|_ {task} @ {actor.uid}\n' | ||||
|         f'|_{actor.uid}\n' | ||||
|         f'  |_{task}\n' #  @ {actor.uid}\n' | ||||
|         # f'|_{task}\n' | ||||
|         # ^-TODO-^ more compact pformating? | ||||
|         # -[ ] make an `Actor.__repr()__` | ||||
|         # -[ ] should we use `log.pformat_task_uid()`? | ||||
|  | @ -2390,9 +2428,6 @@ def pause_from_sync( | |||
|         actor: tractor.Actor = current_actor( | ||||
|             err_on_no_runtime=False, | ||||
|         ) | ||||
|         message: str = ( | ||||
|             f'{actor.uid} task called `tractor.pause_from_sync()`\n' | ||||
|         ) | ||||
|         if not actor: | ||||
|             raise RuntimeError( | ||||
|                 'Not inside the `tractor`-runtime?\n' | ||||
|  | @ -2400,6 +2435,9 @@ def pause_from_sync( | |||
|                 '- `async with tractor.open_nursery()` or,\n' | ||||
|                 '- `async with tractor.open_root_actor()`\n' | ||||
|             ) | ||||
|         message: str = ( | ||||
|             f'{actor.uid} task called `tractor.pause_from_sync()`\n' | ||||
|         ) | ||||
| 
 | ||||
|         # TODO: once supported, remove this AND the one | ||||
|         # inside `._pause()`! | ||||
|  | @ -2409,16 +2447,17 @@ def pause_from_sync( | |||
|         #     injection? | ||||
|         # -[ ] should `breakpoint()` work and what does it normally | ||||
|         #     do in `asyncio` ctxs? | ||||
|         if actor.is_infected_aio(): | ||||
|             raise RuntimeError( | ||||
|                 '`tractor.pause[_from_sync]()` not yet supported ' | ||||
|                 'for infected `asyncio` mode!' | ||||
|             ) | ||||
|         # if actor.is_infected_aio(): | ||||
|         #     raise RuntimeError( | ||||
|         #         '`tractor.pause[_from_sync]()` not yet supported ' | ||||
|         #         'for infected `asyncio` mode!' | ||||
|         #     ) | ||||
| 
 | ||||
|         repl: PdbREPL = mk_pdb() | ||||
| 
 | ||||
|         # message += f'-> created local REPL {repl}\n' | ||||
|         is_root: bool = is_root_process() | ||||
|         is_aio: bool = actor.is_infected_aio() | ||||
| 
 | ||||
|         # TODO: we could also check for a non-`.to_thread` context | ||||
|         # using `trio.from_thread.check_cancelled()` (says | ||||
|  | @ -2431,8 +2470,11 @@ def pause_from_sync( | |||
|         # when called from a (bg) thread, run an async task in a new | ||||
|         # thread which will call `._pause()` manually with special | ||||
|         # handling for root-actor caller usage. | ||||
|         if not DebugStatus.is_main_trio_thread(): | ||||
| 
 | ||||
|         if ( | ||||
|             not DebugStatus.is_main_trio_thread() | ||||
|             and | ||||
|             not is_aio  # see below for this usage | ||||
|         ): | ||||
|             # TODO: `threading.Lock()` this so we don't get races in | ||||
|             # multi-thr cases where they're acquiring/releasing the | ||||
|             # REPL and setting request/`Lock` state, etc.. | ||||
|  | @ -2440,10 +2482,21 @@ def pause_from_sync( | |||
|             repl_owner = thread | ||||
| 
 | ||||
|             # TODO: make root-actor bg thread usage work! | ||||
|             if ( | ||||
|                 is_root | ||||
|                 # or | ||||
|                 # is_aio | ||||
|             ): | ||||
|                 if is_root: | ||||
|                     message += ( | ||||
|                         f'-> called from a root-actor bg {thread}\n' | ||||
|                     f'-> scheduling `._pause_from_bg_root_thread()`..\n' | ||||
|                     ) | ||||
|                 elif is_aio: | ||||
|                     message += ( | ||||
|                         f'-> called from a `asyncio`-task bg {thread}\n' | ||||
|                     ) | ||||
|                 message += ( | ||||
|                     '-> scheduling `._pause_from_bg_root_thread()`..\n' | ||||
|                 ) | ||||
|                 # XXX SUBTLE BADNESS XXX that should really change! | ||||
|                 # don't over-write the `repl` here since when | ||||
|  | @ -2462,7 +2515,8 @@ def pause_from_sync( | |||
|                             hide_tb=hide_tb, | ||||
|                             **_pause_kwargs, | ||||
|                         ), | ||||
|                     ) | ||||
|                     ), | ||||
|                     trio_token=trio.lowlevel.current_trio_token(), | ||||
|                 ) | ||||
|                 DebugStatus.shield_sigint() | ||||
|                 message += ( | ||||
|  | @ -2495,6 +2549,29 @@ def pause_from_sync( | |||
|                 DebugStatus.shield_sigint() | ||||
|                 assert bg_task is not DebugStatus.repl_task | ||||
| 
 | ||||
|         elif is_aio: | ||||
|             greenback: ModuleType = maybe_import_greenback() | ||||
|             repl_owner: Task = asyncio.current_task() | ||||
|             fute: asyncio.Future = run_trio_task_in_future( | ||||
|                 partial( | ||||
|                     _pause, | ||||
|                     debug_func=None, | ||||
|                     repl=repl, | ||||
|                     hide_tb=hide_tb, | ||||
| 
 | ||||
|                     # XXX to prevent `._pause()` for setting | ||||
|                     # `DebugStatus.repl_task` to the gb task! | ||||
|                     called_from_sync=True, | ||||
|                     called_from_bg_thread=True, | ||||
| 
 | ||||
|                     **_pause_kwargs | ||||
|                 ) | ||||
|             ) | ||||
|             # TODO: for async version -> `.pause_from_aio()`? | ||||
|             # bg_task, _ = await fute | ||||
|             bg_task, _ = greenback.await_(fute) | ||||
|             bg_task: asyncio.Task = asyncio.current_task() | ||||
| 
 | ||||
|         else:  # we are presumably the `trio.run()` + main thread | ||||
|             # raises on not-found by default | ||||
|             greenback: ModuleType = maybe_import_greenback() | ||||
|  | @ -2509,8 +2586,8 @@ def pause_from_sync( | |||
|             # NOTE XXX seems to need to be set BEFORE the `_pause()` | ||||
|             # invoke using gb below? | ||||
|             DebugStatus.shield_sigint() | ||||
| 
 | ||||
|             repl_owner: Task = current_task() | ||||
| 
 | ||||
|             message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n' | ||||
|             try: | ||||
|                 out = greenback.await_( | ||||
|  | @ -2572,6 +2649,10 @@ def pause_from_sync( | |||
|         # -[ ] tried to use `@pdbp.hideframe` decoration but | ||||
|         #   still doesn't work | ||||
|     except BaseException as err: | ||||
|         log.exception( | ||||
|             'Failed to sync-pause from\n\n' | ||||
|             f'{repl_owner}\n' | ||||
|         ) | ||||
|         __tracebackhide__: bool = False | ||||
|         raise err | ||||
| 
 | ||||
|  |  | |||
|  | @ -562,6 +562,101 @@ class AsyncioRuntimeTranslationError(RuntimeError): | |||
|     ''' | ||||
| 
 | ||||
| 
 | ||||
| def run_trio_task_in_future( | ||||
|     async_fn, | ||||
|     *args, | ||||
| ) -> asyncio.Future: | ||||
|     ''' | ||||
|     Run an async-func as a `trio` task from an `asyncio.Task` wrapped | ||||
|     in a `asyncio.Future` which is returned to the caller. | ||||
| 
 | ||||
|     Another astounding feat by the great @oremanj !! | ||||
| 
 | ||||
|     Bo | ||||
| 
 | ||||
|     ''' | ||||
|     result_future = asyncio.Future() | ||||
|     cancel_scope = trio.CancelScope() | ||||
|     finished: bool = False | ||||
| 
 | ||||
|     # monkey-patch the future's `.cancel()` meth to | ||||
|     # allow cancellation relay to `trio`-task. | ||||
|     cancel_message: str|None = None | ||||
|     orig_cancel = result_future.cancel | ||||
| 
 | ||||
|     def wrapped_cancel( | ||||
|         msg: str|None = None, | ||||
|     ): | ||||
|         nonlocal cancel_message | ||||
|         if finished: | ||||
|             # We're being called back after the task completed | ||||
|             if msg is not None: | ||||
|                 return orig_cancel(msg) | ||||
|             elif cancel_message is not None: | ||||
|                 return orig_cancel(cancel_message) | ||||
|             else: | ||||
|                 return orig_cancel() | ||||
| 
 | ||||
|         if result_future.done(): | ||||
|             return False | ||||
| 
 | ||||
|         # Forward cancellation to the Trio task, don't mark | ||||
|         # future as cancelled until it completes | ||||
|         cancel_message = msg | ||||
|         cancel_scope.cancel() | ||||
|         return True | ||||
| 
 | ||||
|     result_future.cancel = wrapped_cancel | ||||
| 
 | ||||
|     async def trio_task() -> None: | ||||
|         nonlocal finished | ||||
|         try: | ||||
|             with cancel_scope: | ||||
|                 try: | ||||
|                     # TODO: type this with new tech in 3.13 | ||||
|                     result: Any = await async_fn(*args) | ||||
|                 finally: | ||||
|                     finished = True | ||||
| 
 | ||||
|             # Propagate result or cancellation to the Future | ||||
|             if cancel_scope.cancelled_caught: | ||||
|                 result_future.cancel() | ||||
| 
 | ||||
|             elif not result_future.cancelled(): | ||||
|                 result_future.set_result(result) | ||||
| 
 | ||||
|         except BaseException as exc: | ||||
|             # the result future gets all the non-Cancelled | ||||
|             # exceptions. Any Cancelled need to keep propagating | ||||
|             # out of this stack frame in order to reach the cancel | ||||
|             # scope for which they're intended. | ||||
|             cancelled: BaseException|None | ||||
|             rest: BaseException|None | ||||
|             if isinstance(exc, BaseExceptionGroup): | ||||
|                 cancelled, rest = exc.split(trio.Cancelled) | ||||
| 
 | ||||
|             elif isinstance(exc, trio.Cancelled): | ||||
|                 cancelled, rest = exc, None | ||||
| 
 | ||||
|             else: | ||||
|                 cancelled, rest = None, exc | ||||
| 
 | ||||
|             if not result_future.cancelled(): | ||||
|                 if rest: | ||||
|                     result_future.set_exception(rest) | ||||
|                 else: | ||||
|                     result_future.cancel() | ||||
| 
 | ||||
|             if cancelled: | ||||
|                 raise cancelled | ||||
| 
 | ||||
|     trio.lowlevel.spawn_system_task( | ||||
|         trio_task, | ||||
|         name=async_fn, | ||||
|     ) | ||||
|     return result_future | ||||
| 
 | ||||
| 
 | ||||
| def run_as_asyncio_guest( | ||||
|     trio_main: Callable, | ||||
|     # ^-NOTE-^ when spawned with `infected_aio=True` this func is | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue