From 71cf9e7bd3404500acc799cd2f3da756a9a5ab96 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 13 Jul 2024 00:16:28 -0400 Subject: [PATCH] First draft, `asyncio`-task, sync-pausing Bo Mostly due to magic from @oremanj where we slap in a little bit of `.from_asyncio`-type stuff to run a `trio`-task from `asyncio.Task` code! I'm not gonna go into tooo too much detail but basically the primary thing needed was a way to (blocking-ly) invoke a `trio.lowlevel.Task` from an `asyncio` one (which we now have with a new `run_trio_task_in_future()` thanks to draft code from the aforementioned jefe) which we now invoke from a dedicated aio case-branch inside `.devx._debug.pause_from_sync()`. Further include a case inside `DebugStatus.release()` to handle using the same func to set the `repl_release: trio.Event` from the aio side when releasing the REPL on exit cmds. Prolly more refinements to come ;{o --- examples/debugging/asyncio_bp.py | 21 +-- tractor/devx/_debug.py | 213 +++++++++++++++++++++---------- tractor/to_asyncio.py | 95 ++++++++++++++ 3 files changed, 255 insertions(+), 74 deletions(-) diff --git a/examples/debugging/asyncio_bp.py b/examples/debugging/asyncio_bp.py index baddfe0..f2fabdd 100644 --- a/examples/debugging/asyncio_bp.py +++ b/examples/debugging/asyncio_bp.py @@ -2,7 +2,10 @@ import asyncio import trio import tractor -from tractor import to_asyncio +from tractor import ( + to_asyncio, + Portal, +) async def aio_sleep_forever(): @@ -43,7 +46,7 @@ async def bp_then_error( @tractor.context async def trio_ctx( ctx: tractor.Context, - bp_before_started: bool = False, + bp_before_started: bool = True, ): # this will block until the ``asyncio`` task sends a "first" @@ -57,7 +60,6 @@ async def trio_ctx( trio.open_nursery() as n, ): - assert first == 'start' if bp_before_started: @@ -73,15 +75,18 @@ async def trio_ctx( async def main( - bps_all_over: bool = False, + bps_all_over: bool = True, ) -> None: async with tractor.open_nursery( - # debug_mode=True, + debug_mode=True, + maybe_enable_greenback=True, + # loglevel='devx', + # loglevel='runtime', ) as n: - p = await n.start_actor( + ptl: Portal = await n.start_actor( 'aio_daemon', enable_modules=[__name__], infect_asyncio=True, @@ -89,7 +94,7 @@ async def main( loglevel='cancel', ) - async with p.open_context( + async with ptl.open_context( trio_ctx, bp_before_started=bps_all_over, ) as (ctx, first): @@ -105,7 +110,7 @@ async def main( # TODO: case where we cancel from trio-side while asyncio task # has debugger lock? - # await p.cancel_actor() + # await ptl.cancel_actor() if __name__ == '__main__': diff --git a/tractor/devx/_debug.py b/tractor/devx/_debug.py index 02551fa..35c1c44 100644 --- a/tractor/devx/_debug.py +++ b/tractor/devx/_debug.py @@ -20,6 +20,7 @@ Multi-core debugging for da peeps! """ from __future__ import annotations +import asyncio import bdb from contextlib import ( asynccontextmanager as acm, @@ -67,6 +68,7 @@ from trio import ( TaskStatus, ) import tractor +from tractor.to_asyncio import run_trio_task_in_future from tractor.log import get_logger from tractor._context import Context from tractor import _state @@ -296,7 +298,7 @@ class Lock: ) @classmethod - @pdbp.hideframe + # @pdbp.hideframe def release( cls, raise_on_thread: bool = True, @@ -310,39 +312,40 @@ class Lock: we_released: bool = False ctx_in_debug: Context|None = cls.ctx_in_debug repl_task: Task|Thread|None = DebugStatus.repl_task - if not DebugStatus.is_main_trio_thread(): - thread: threading.Thread = threading.current_thread() - message: str = ( - '`Lock.release()` can not be called from a non-main-`trio` thread!\n' - f'{thread}\n' - ) - if raise_on_thread: - raise RuntimeError(message) - - log.devx(message) - return False - - task: Task = current_task() - - # sanity check that if we're the root actor - # the lock is marked as such. - # note the pre-release value may be diff the the - # post-release task. - if repl_task is task: - assert cls._owned_by_root - message: str = ( - 'TTY lock held by root-actor on behalf of local task\n' - f'|_{repl_task}\n' - ) - else: - assert DebugStatus.repl_task is not task - - message: str = ( - 'TTY lock was NOT released on behalf of caller\n' - f'|_{task}\n' - ) try: + if not DebugStatus.is_main_trio_thread(): + thread: threading.Thread = threading.current_thread() + message: str = ( + '`Lock.release()` can not be called from a non-main-`trio` thread!\n' + f'{thread}\n' + ) + if raise_on_thread: + raise RuntimeError(message) + + log.devx(message) + return False + + task: Task = current_task() + + # sanity check that if we're the root actor + # the lock is marked as such. + # note the pre-release value may be diff the the + # post-release task. + if repl_task is task: + assert cls._owned_by_root + message: str = ( + 'TTY lock held by root-actor on behalf of local task\n' + f'|_{repl_task}\n' + ) + else: + assert DebugStatus.repl_task is not task + + message: str = ( + 'TTY lock was NOT released on behalf of caller\n' + f'|_{task}\n' + ) + lock: trio.StrictFIFOLock = cls._debug_lock owner: Task = lock.statistics().owner if ( @@ -788,7 +791,14 @@ class DebugStatus: # in which case schedule the SIGINT shielding override # to in the main thread. # https://docs.python.org/3/library/signal.html#signals-and-threads - if not cls.is_main_trio_thread(): + if ( + not cls.is_main_trio_thread() + and + not _state._runtime_vars.get( + '_is_infected_aio', + False, + ) + ): cls._orig_sigint_handler: Callable = trio.from_thread.run_sync( signal.signal, signal.SIGINT, @@ -813,7 +823,16 @@ class DebugStatus: # always restore ``trio``'s sigint handler. see notes below in # the pdb factory about the nightmare that is that code swapping # out the handler when the repl activates... - if not cls.is_main_trio_thread(): + # if not cls.is_main_trio_thread(): + if ( + not cls.is_main_trio_thread() + and + # not _state._runtime_vars.get( + # '_is_infected_aio', + # False, + # ) + not current_actor().is_infected_aio() + ): trio.from_thread.run_sync( signal.signal, signal.SIGINT, @@ -871,7 +890,7 @@ class DebugStatus: return False @classmethod - @pdbp.hideframe + # @pdbp.hideframe def release( cls, cancel_req_task: bool = False, @@ -880,11 +899,21 @@ class DebugStatus: try: # sometimes the task might already be terminated in # which case this call will raise an RTE? - if ( - repl_release is not None - ): + if repl_release is not None: if cls.is_main_trio_thread(): repl_release.set() + + elif current_actor().is_infected_aio(): + + async def _set_repl_release(): + repl_release.set() + + fute: asyncio.Future = run_trio_task_in_future( + _set_repl_release + ) + if not fute.done(): + log.warning('REPL release state unknown..?') + else: # XXX NOTE ONLY used for bg root-actor sync # threads, see `.pause_from_sync()`. @@ -1658,18 +1687,24 @@ async def _pause( try: task: Task = current_task() except RuntimeError as rte: + # NOTE, 2 cases we might get here: + # + # - ACTUALLY not a `trio.lowlevel.Task` nor runtime caller, + # |_ error out as normal + # + # - an infected `asycio` actor calls it from an actual + # `asyncio.Task` + # |_ in this case we DO NOT want to RTE! __tracebackhide__: bool = False - log.exception( - 'Failed to get current `trio`-task?' - ) - # if actor.is_infected_aio(): - # mk_pdb().set_trace() - # raise RuntimeError( - # '`tractor.pause[_from_sync]()` not yet supported ' - # 'directly (infected) `asyncio` tasks!' - # ) from rte - - raise rte + if actor.is_infected_aio(): + log.exception( + 'Failed to get current `trio`-task?' + ) + raise RuntimeError( + 'An `asyncio` task should not be calling this!?' + ) from rte + else: + task = asyncio.current_task() if debug_func is not None: debug_func = partial(debug_func) @@ -2060,7 +2095,8 @@ async def _pause( f'on behalf of {repl_task} ??\n' ) - DebugStatus.release(cancel_req_task=True) + if not actor.is_infected_aio(): + DebugStatus.release(cancel_req_task=True) # sanity checks for ^ on request/status teardown # assert DebugStatus.repl is None # XXX no more bc bg thread cases? @@ -2113,7 +2149,9 @@ def _set_trace( log.pdb( f'{_pause_msg}\n' f'>(\n' - f'|_ {task} @ {actor.uid}\n' + f'|_{actor.uid}\n' + f' |_{task}\n' # @ {actor.uid}\n' + # f'|_{task}\n' # ^-TODO-^ more compact pformating? # -[ ] make an `Actor.__repr()__` # -[ ] should we use `log.pformat_task_uid()`? @@ -2390,9 +2428,6 @@ def pause_from_sync( actor: tractor.Actor = current_actor( err_on_no_runtime=False, ) - message: str = ( - f'{actor.uid} task called `tractor.pause_from_sync()`\n' - ) if not actor: raise RuntimeError( 'Not inside the `tractor`-runtime?\n' @@ -2400,6 +2435,9 @@ def pause_from_sync( '- `async with tractor.open_nursery()` or,\n' '- `async with tractor.open_root_actor()`\n' ) + message: str = ( + f'{actor.uid} task called `tractor.pause_from_sync()`\n' + ) # TODO: once supported, remove this AND the one # inside `._pause()`! @@ -2409,16 +2447,17 @@ def pause_from_sync( # injection? # -[ ] should `breakpoint()` work and what does it normally # do in `asyncio` ctxs? - if actor.is_infected_aio(): - raise RuntimeError( - '`tractor.pause[_from_sync]()` not yet supported ' - 'for infected `asyncio` mode!' - ) + # if actor.is_infected_aio(): + # raise RuntimeError( + # '`tractor.pause[_from_sync]()` not yet supported ' + # 'for infected `asyncio` mode!' + # ) repl: PdbREPL = mk_pdb() # message += f'-> created local REPL {repl}\n' is_root: bool = is_root_process() + is_aio: bool = actor.is_infected_aio() # TODO: we could also check for a non-`.to_thread` context # using `trio.from_thread.check_cancelled()` (says @@ -2431,8 +2470,11 @@ def pause_from_sync( # when called from a (bg) thread, run an async task in a new # thread which will call `._pause()` manually with special # handling for root-actor caller usage. - if not DebugStatus.is_main_trio_thread(): - + if ( + not DebugStatus.is_main_trio_thread() + and + not is_aio # see below for this usage + ): # TODO: `threading.Lock()` this so we don't get races in # multi-thr cases where they're acquiring/releasing the # REPL and setting request/`Lock` state, etc.. @@ -2440,10 +2482,21 @@ def pause_from_sync( repl_owner = thread # TODO: make root-actor bg thread usage work! - if is_root: + if ( + is_root + # or + # is_aio + ): + if is_root: + message += ( + f'-> called from a root-actor bg {thread}\n' + ) + elif is_aio: + message += ( + f'-> called from a `asyncio`-task bg {thread}\n' + ) message += ( - f'-> called from a root-actor bg {thread}\n' - f'-> scheduling `._pause_from_bg_root_thread()`..\n' + '-> scheduling `._pause_from_bg_root_thread()`..\n' ) # XXX SUBTLE BADNESS XXX that should really change! # don't over-write the `repl` here since when @@ -2462,7 +2515,8 @@ def pause_from_sync( hide_tb=hide_tb, **_pause_kwargs, ), - ) + ), + trio_token=trio.lowlevel.current_trio_token(), ) DebugStatus.shield_sigint() message += ( @@ -2495,6 +2549,29 @@ def pause_from_sync( DebugStatus.shield_sigint() assert bg_task is not DebugStatus.repl_task + elif is_aio: + greenback: ModuleType = maybe_import_greenback() + repl_owner: Task = asyncio.current_task() + fute: asyncio.Future = run_trio_task_in_future( + partial( + _pause, + debug_func=None, + repl=repl, + hide_tb=hide_tb, + + # XXX to prevent `._pause()` for setting + # `DebugStatus.repl_task` to the gb task! + called_from_sync=True, + called_from_bg_thread=True, + + **_pause_kwargs + ) + ) + # TODO: for async version -> `.pause_from_aio()`? + # bg_task, _ = await fute + bg_task, _ = greenback.await_(fute) + bg_task: asyncio.Task = asyncio.current_task() + else: # we are presumably the `trio.run()` + main thread # raises on not-found by default greenback: ModuleType = maybe_import_greenback() @@ -2509,8 +2586,8 @@ def pause_from_sync( # NOTE XXX seems to need to be set BEFORE the `_pause()` # invoke using gb below? DebugStatus.shield_sigint() - repl_owner: Task = current_task() + message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n' try: out = greenback.await_( @@ -2572,6 +2649,10 @@ def pause_from_sync( # -[ ] tried to use `@pdbp.hideframe` decoration but # still doesn't work except BaseException as err: + log.exception( + 'Failed to sync-pause from\n\n' + f'{repl_owner}\n' + ) __tracebackhide__: bool = False raise err diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index c37c046..8feaf88 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -562,6 +562,101 @@ class AsyncioRuntimeTranslationError(RuntimeError): ''' +def run_trio_task_in_future( + async_fn, + *args, +) -> asyncio.Future: + ''' + Run an async-func as a `trio` task from an `asyncio.Task` wrapped + in a `asyncio.Future` which is returned to the caller. + + Another astounding feat by the great @oremanj !! + + Bo + + ''' + result_future = asyncio.Future() + cancel_scope = trio.CancelScope() + finished: bool = False + + # monkey-patch the future's `.cancel()` meth to + # allow cancellation relay to `trio`-task. + cancel_message: str|None = None + orig_cancel = result_future.cancel + + def wrapped_cancel( + msg: str|None = None, + ): + nonlocal cancel_message + if finished: + # We're being called back after the task completed + if msg is not None: + return orig_cancel(msg) + elif cancel_message is not None: + return orig_cancel(cancel_message) + else: + return orig_cancel() + + if result_future.done(): + return False + + # Forward cancellation to the Trio task, don't mark + # future as cancelled until it completes + cancel_message = msg + cancel_scope.cancel() + return True + + result_future.cancel = wrapped_cancel + + async def trio_task() -> None: + nonlocal finished + try: + with cancel_scope: + try: + # TODO: type this with new tech in 3.13 + result: Any = await async_fn(*args) + finally: + finished = True + + # Propagate result or cancellation to the Future + if cancel_scope.cancelled_caught: + result_future.cancel() + + elif not result_future.cancelled(): + result_future.set_result(result) + + except BaseException as exc: + # the result future gets all the non-Cancelled + # exceptions. Any Cancelled need to keep propagating + # out of this stack frame in order to reach the cancel + # scope for which they're intended. + cancelled: BaseException|None + rest: BaseException|None + if isinstance(exc, BaseExceptionGroup): + cancelled, rest = exc.split(trio.Cancelled) + + elif isinstance(exc, trio.Cancelled): + cancelled, rest = exc, None + + else: + cancelled, rest = None, exc + + if not result_future.cancelled(): + if rest: + result_future.set_exception(rest) + else: + result_future.cancel() + + if cancelled: + raise cancelled + + trio.lowlevel.spawn_system_task( + trio_task, + name=async_fn, + ) + return result_future + + def run_as_asyncio_guest( trio_main: Callable, # ^-NOTE-^ when spawned with `infected_aio=True` this func is