From 6534a363a591166658810b852db71f17e5228ca6 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Mon, 10 Jun 2024 08:54:03 -0400
Subject: [PATCH] First proto: multi-threaded synced `pdb`-REPLs

Functionally working for the multi-threaded case (via cpython threads
spawned from `trio.to_thread.run_sync()`) alongside subactors, tested
(for now) only with threads started inside the root actor (which seemed
to have the most issues in terms of the impl and special cases..) using
the new `tractor.pause_from_sync()` API! A usage sketch follows the
`._pause()` changes list below.

Main implementation changes to `.pause_from_sync()`
------ - ------
- from the root actor, we need to ensure the bg thread case is handled
  *specially* since no IPC is used to request the TTY stdio mutex and
  `Lock` (API) usage is conducted entirely from a local task or thread;
  dedicated `Lock` usage for the root-actor is already branched inside
  `._pause()` and needs similar handling from a root bg-thread:
 |_ for the special case of a root bg thread we need to
    `trio`-main-thread-schedule a bg task inside a new
    `_pause_from_bg_root_thread()`. The new task needs to implement
    most of what is handled inside `._pause()`, manually, mostly
    because in this root-actor-bg-thread case we have 2 constraints:
    1. to enter `PdbREPL.interaction()` **from the bg thread** directly,
    2. the task that `Lock._debug_lock.acquire()`s has to be the same
       one that calls `.release()` (a `trio.FIFOLock` constraint).
 |_ impl deats of this `_pause_from_bg_root_thread()` include:
    - (for now) calling `._pause()` to acquire the `Lock._debug_lock`.
    - setting its own `DebugStatus.repl_release`.
    - calling `DebugStatus.shield_sigint()` to ensure the root's main
      thread uses the right handler when the bg one is REPL-ing.
    - waiting manually on `.repl_release` to be set by the thread's
      dedicated `PdbREPL` exit.
    - manually calling `Lock.release()` from the **same task** that
      acquired it.
- expect calls to `._pause()` to deliver a `tuple[Task, PdbREPL]` such
  that we always get a handle both to any newly created REPL instance
  and (maybe) to the scheduled bg task within which it runs.
- use a single, branch-built `message: str` passed to `log.devx()` for
  logging.
- ensure both `DebugStatus.repl` and `.repl_task` are set **just
  before** calling `._set_trace()` so that the correct `Task|Thread`
  is set when the REPL is finally entered from sync code.
- add a wrapping caller `_sync_pause_from_builtin()` which passes in
  the new `called_from_builtin=True` to indicate `breakpoint()` caller
  usage and, obvi, passes in the `api_frame`.

Changes to `._pause()` in support of ^
------ - ------
- `task_status.started()` and return the `tuple[Task, PdbREPL]` to
  callers / starters.
- only call `DebugStatus.shield_sigint()` when no `repl` is passed bc
  some callers (like bg threads) may need to apply it at some specific
  point themselves.
- tweak some asserts for the `debug_func == None` / non-`trio`-thread
  case.
- add a mod-level `_repl_fail_msg: str` to be used when there's an
  internal `._pause()` failure; makes it easier to `pexpect` match in
  testing.
- more comprehensive logging for the root-actor branched case to
  (attempt to) indicate any of the 3 cases:
  - a remote ctx from a subactor has the `Lock`,
  - an already existing root task or thread has it, or
  - some kinda stale `.locked()` situation where the root has the lock
    but we don't know why.
- for root usage, revert to always `await Lock._debug_lock.acquire()`-ing
  despite `called_from_sync` since `.pause_from_sync()` was reworked to
  instead handle the special bg thread case in the new
  `_pause_from_bg_root_thread()` task.
- always do `return _enter_repl_sync(debug_func)`.
- try to report any `repl_task: Task|Thread` set by the caller
  (particularly for the bg thread cases) as being the thread or task
  `._pause()` was called "on behalf of".
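Usage sketch
------ - ------
(Not part of the diff; a minimal illustrative example only. Assumes a
`debug_mode=True` root actor and `greenback` installed for sync-pause
support; all names below are demo choices.)

    import trio
    import tractor

    def sync_work() -> None:
        # runs in a cpython bg thread via `trio.to_thread.run_sync()`;
        # with this patch the root actor can REPL-pause from here.
        tractor.pause_from_sync()

    async def main() -> None:
        async with tractor.open_root_actor(debug_mode=True):
            await trio.to_thread.run_sync(sync_work)

    if __name__ == '__main__':
        trio.run(main)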
Changes to `DebugStatus`/`Lock` in support of ^
------ - ------
- only call `Lock.release()` from `DebugStatus.set_[quit/continue]()`
  when called from the main `trio` thread and always call
  `DebugStatus.release()` **after** to ensure `.repl_release` is set
  **after** `._debug_lock.release()`.
- only call `.repl_release.set()` from the `trio` (main) thread,
  otherwise use `trio.from_thread.run_sync()`.
- many more refinements in `Lock.release()` for threading cases:
  - return a `bool` to indicate whether the lock was released by the
    caller.
  - mask (in prep to drop) `_pause()` usage of `Lock.release(force=True)`
    since forcing a release can't ever avoid the RTE from `trio`.. the
    same task **must** acquire/release.
  - don't allow usage from non-`trio`-main-threads, ever; there's no
    point given the same-task-must-manage-the-`FIFOLock` constraint.
  - much more detailed logging using a `message`-building style for
    all caller (edge) cases.
   |_ use a `we_released: bool` to determine failed-to-release edge
      cases which can happen if called from bg threads; ensure we
      `log.exception()` on any incorrect usage resulting in a release
      failure.
   |_ complain loudly if the release fails and some other task/thread
      still holds the lock.
   |_ be explicit about "who" (which task or thread) the release is
      "on behalf of" by reading `DebugStatus.repl_task` since the
      caller isn't the REPL operator in many sync cases.
  - more or less drop `force` support, as mentioned above.
  - ensure we unset `._owned_by_root` if the caller is a root task.

Other misc
------ - ------
- rename `lock_tty_for_child()` -> `lock_stdio_for_peer()`.
- rejig `Lock.repr()` to show lock and event stats.
- stage `Lock.stats` and `.owner` methods in prep for doing a
  singleton instance and `@property`s.
---
 tractor/devx/_debug.py | 732 +++++++++++++++++++++++++++++------------
 1 file changed, 525 insertions(+), 207 deletions(-)

diff --git a/tractor/devx/_debug.py b/tractor/devx/_debug.py
index 858133f..3218cff 100644
--- a/tractor/devx/_debug.py
+++ b/tractor/devx/_debug.py
@@ -62,7 +62,6 @@ import trio
 from trio import CancelScope
 from trio.lowlevel import (
     current_task,
-    Task,
 )
 from trio import (
     TaskStatus,
@@ -81,6 +80,8 @@ from tractor._state import (
 # )

 if TYPE_CHECKING:
+    from trio.lowlevel import Task
+    from threading import Thread
     from tractor._ipc import Channel
     from tractor._context import Context
     from tractor._runtime import (
@@ -92,6 +93,11 @@ if TYPE_CHECKING:

 log = get_logger(__name__)

+# TODO: refine the internal impl and APIs in this module!
+#
+# -[ ] separate `._pause()` branch-cases for calling from a root task
+#      vs. from subactors
+

 def hide_runtime_frames() -> dict[FunctionType, CodeType]:
     '''
@@ -203,6 +209,15 @@ class Lock:

         return None

+    # TODO: once we convert to singleton-per-actor-style
+    # @property
+    # def stats(cls) -> trio.LockStatistics:
+    #     return cls._debug_lock.statistics()
+
+    # @property
+    # def owner(cls) -> Task:
+    #     return cls._debug_lock.statistics().owner
+
     # ROOT ONLY
     # ------ - -------
     # the root-actor-ONLY singletons for,
@@ -229,18 +244,22 @@ class Lock:
     @classmethod
     def repr(cls) -> str:
         lock_stats: trio.LockStatistics = cls._debug_lock.statistics()
+        req: trio.Event|None = cls.req_handler_finished
         fields: str = (
-            f'req_handler_finished: {cls.req_handler_finished}\n'
-            f'_blocked: {cls._blocked}\n\n'
-            f'_debug_lock: {cls._debug_lock}\n'
-            f'lock_stats: {lock_stats}\n'
-            f'ctx_in_debug: {cls.ctx_in_debug}\n'
+            f'|_ ._blocked: {cls._blocked}\n'
+            f'|_ ._debug_lock: {cls._debug_lock}\n'
+            f'   {lock_stats}\n\n'
+            f'|_ .ctx_in_debug: {cls.ctx_in_debug}\n'
+            f'|_ .req_handler_finished: {req}\n'
         )
+        if req:
+            req_stats: trio.EventStatistics = req.statistics()
+            fields += f'   {req_stats}\n'
         body: str = textwrap.indent(
             fields,
-            prefix=' |_',
+            prefix=' ',
         )
         return (
             f'<{cls.__name__}(\n'
@@ -253,28 +272,59 @@ class Lock:
     def release(
         cls,
         force: bool = False,
-    ):
-        if not cls._owned_by_root:
-            message: str = 'TTY lock not held by any child\n'
-        else:
-            message: str = 'TTY lock held in root-actor task\n'
+        raise_on_thread: bool = True,

-        if not (is_trio_main := DebugStatus.is_main_trio_thread()):
-            task: threading.Thread = threading.current_thread()
+    ) -> bool:
+        '''
+        Release the actor-tree global TTY stdio lock (only) from the
+        `trio.run()`-main-thread.
+
+        '''
+        we_released: bool = False
+        ctx_in_debug: Context|None = cls.ctx_in_debug
+        repl_task: Task|Thread|None = DebugStatus.repl_task
+        if not DebugStatus.is_main_trio_thread():
+            thread: threading.Thread = threading.current_thread()
+            message: str = (
+                '`Lock.release()` can not be called from a non-main-`trio` thread!\n'
+                f'{thread}\n'
+            )
+            if raise_on_thread:
+                raise RuntimeError(message)
+
+            log.devx(message)
+            return False
+
+        task: Task = current_task()
+
+        # sanity check that if we're the root actor
+        # the lock is marked as such.
+        # note the pre-release value may differ from the
+        # post-release task.
+        if repl_task is task:
+            assert cls._owned_by_root
+            message: str = (
+                'TTY lock held by root-actor on behalf of local task\n'
+                f'|_{repl_task}\n'
+            )
         else:
-            task: trio.Task = current_task()
+            assert DebugStatus.repl_task is not task
+
+            message: str = (
+                'TTY lock was NOT released on behalf of caller\n'
+                f'|_{task}\n'
+            )

         try:
             lock: trio.StrictFIFOLock = cls._debug_lock
             owner: Task = lock.statistics().owner
             if (
-                lock.locked()
-                and
-                (
-                    owner is task
-                    # or
-                    # cls._owned_by_root
-                )
+                (lock.locked() or force)
+                # ^-TODO-NOTE-^ should we just remove this, since the
+                # RTE case above will always happen when you force
+                # from the wrong task?

+                and (owner is task)
                 # ^-NOTE-^ if we do NOT ensure this, `trio` will
                 # raise a RTE when a non-owner tries to release the
                 # lock.
                 # being set to the `.repl_task` such that the above
                 # condition matches and we actually release the lock.
                 # This is of particular note from `.pause_from_sync()`!
+            ):
-                if not is_trio_main:
-                    trio.from_thread.run_sync(
-                        cls._debug_lock.release
+                cls._debug_lock.release()
+                we_released: bool = True
+                if repl_task:
+                    message: str = (
+                        'Lock released on behalf of root-actor-local REPL owner\n'
+                        f'|_{repl_task}\n'
                     )
                 else:
-                    cls._debug_lock.release()
-                message: str = 'TTY lock released for child\n'
+                    message: str = (
+                        'TTY lock released by us on behalf of remote peer?\n'
+                        f'|_ctx_in_debug: {ctx_in_debug}\n\n'
+                    )
+                    # mk_pdb().set_trace()
+                    # elif owner:

         except RuntimeError as rte:
-            log.exception('Failed to release `Lock`?')
+            log.exception(
+                'Failed to release `Lock._debug_lock: trio.FIFOLock`?\n'
+            )
             raise rte

         finally:
@@ -303,40 +363,59 @@ class Lock:
             # we are now back in the "tty unlocked" state. This is basically
            # an edge-triggered signal around an empty queue of sub-actor
             # tasks that may have tried to acquire the lock.
-            lock_stats = cls._debug_lock.statistics()
+            lock_stats: trio.LockStatistics = cls._debug_lock.statistics()
             req_handler_finished: trio.Event|None = Lock.req_handler_finished
             if (
                 not lock_stats.owner
-                or force
                 and
                 req_handler_finished is None
             ):
-                message += '-> No more child ctx tasks hold the TTY lock!\n'
-
-            elif (
-                req_handler_finished
-                and
-                lock.locked()
-            ):
-                req_stats = req_handler_finished.statistics()
                 message += (
-                    f'-> A child ctx task still owns the `Lock` ??\n'
-                    f'  |_lock_stats: {lock_stats}\n'
-                    f'  |_req_stats: {req_stats}\n'
+                    '-> No new task holds the TTY lock!\n\n'
+                    f'{Lock.repr()}\n'
                 )
-                cls.ctx_in_debug = None

+            elif (
+                req_handler_finished  # new IPC ctx debug request active
+                and
+                lock.locked()  # someone has the lock
+            ):
+                behalf_of_task = (
+                    ctx_in_debug
+                    or
+                    repl_task
+                )
+                message += (
+                    f'\nA non-caller task still owns this lock on behalf of '
+                    f'{behalf_of_task}\n'
+                    f'|_{lock_stats.owner}\n'
+                )

             if (
-                cls._owned_by_root
+                we_released
+                and
+                ctx_in_debug
             ):
-                if not lock.locked():
-                    cls._owned_by_root = False
-                else:
-                    message += 'Lock still held by root actor task?!?\n'
-                    lock.release()
+                cls.ctx_in_debug = None  # unset
+
+            # post-release value (should differ from the value above!)
+            repl_task: Task|Thread|None = DebugStatus.repl_task
+            if (
+                cls._owned_by_root
+                and
+                we_released
+            ):
+                cls._owned_by_root = False
+
+                if task is not repl_task:
+                    message += (
+                        'Lock released by root actor on behalf of bg thread\n'
+                        f'|_{repl_task}\n'
                     )

             log.devx(message)

+        return we_released
+
     @classmethod
     @acm
     async def acquire_for_ctx(
@@ -380,7 +459,7 @@ class Lock:
             log.runtime(pre_msg)

             # NOTE: if the surrounding cancel scope from the
-            # `lock_tty_for_child()` caller is cancelled, this line should
+            # `lock_stdio_for_peer()` caller is cancelled, this line should
             # unblock and NOT leave us in some kind of
             # a "child-locked-TTY-but-child-is-uncontactable-over-IPC"
             # condition.
@@ -398,7 +477,7 @@ class Lock:
         # IF we received a cancel during the shielded lock entry of some
         # next-in-queue requesting task, then the resumption here will
         # result in that ``trio.Cancelled`` being raised to our caller
-        # (likely from ``lock_tty_for_child()`` below)! In
+        # (likely from `lock_stdio_for_peer()` below)! In
         # this case the ``finally:`` below should trigger and the
         # surrounding caller side context should cancel normally
         # relaying back to the caller.
@@ -408,8 +487,8 @@ class Lock:
         finally:
             message: str = 'Exiting `Lock.acquire_for_ctx()` on behalf of sub-actor\n'
             if we_acquired:
-                message += '-> TTY lock released by child\n'
                 cls.release()
+                message += '-> TTY lock released by child\n'
             else:
                 message += '-> TTY lock never acquired by child??\n'

@@ -421,7 +500,7 @@
 @tractor.context
-async def lock_tty_for_child(
+async def lock_stdio_for_peer(
     ctx: Context,
     subactor_task_uid: tuple[str, int],

@@ -545,25 +624,26 @@ async def lock_tty_for_child(
     except BaseException as req_err:
         message: str = (
+            f'On behalf of remote peer {subactor_task_uid!r}@{ctx.chan.uid!r}\n\n'
             'Forcing `Lock.release()` for req-ctx since likely an '
             'internal error!\n\n'
             f'{ctx}'
         )
         if isinstance(req_err, trio.Cancelled):
             message = (
-                'Cancelled during root TTY-lock dialog?\n'
+                'Cancelled during root TTY-lock dialog\n'
                 +
                 message
             )
         else:
             message = (
-                'Errored during root TTY-lock dialog?\n'
+                'Errored during root TTY-lock dialog\n'
                 +
                 message
             )

         log.exception(message)
-        Lock.release(force=True)
+        Lock.release() #force=True)
         raise

     finally:
@@ -645,7 +725,7 @@ class DebugStatus:
     def shield_sigint(cls):
         '''
         Shield out SIGINT handling (which by default triggers
-        `trio.Task` cancellation) in subactors when a `pdb` REPL
+        `Task` cancellation) in subactors when a `pdb` REPL
         is active.

         Avoids cancellation of the current actor (task) when the user
@@ -767,9 +847,17 @@
         try:
             # sometimes the task might already be terminated in
             # which case this call will raise an RTE?
-            if repl_release is not None:
-                repl_release.set()
-
+            if (
+                repl_release is not None
+            ):
+                if cls.is_main_trio_thread():
+                    repl_release.set()
+                else:
+                    # XXX NOTE ONLY used for bg root-actor sync
+                    # threads, see `.pause_from_sync()`.
+                    trio.from_thread.run_sync(
+                        repl_release.set
+                    )
         finally:
             # if req_ctx := cls.req_ctx:
             #     req_ctx._scope.cancel()
@@ -856,8 +944,6 @@ class PdbREPL(pdbp.Pdb):
         try:
             super().set_continue()
         finally:
-            DebugStatus.release()
-
             # NOTE: for subactors the stdio lock is released via the
             # allocated RPC locker task, so for root we have to do it
             # manually.
             if (
                 is_root_process()
                 and
                 Lock._debug_lock.locked()
+                and
+                DebugStatus.is_main_trio_thread()
             ):
+                # Lock.release(raise_on_thread=False)
                 Lock.release()

+            # XXX after `Lock.release()` for root local repl usage
+            DebugStatus.release()
+
     def set_quit(self):
         try:
             super().set_quit()
         finally:
-            DebugStatus.release()
             if (
                 is_root_process()
                 and
                 Lock._debug_lock.locked()
+                and
+                DebugStatus.is_main_trio_thread()
             ):
+                # Lock.release(raise_on_thread=False)
                 Lock.release()

+            # XXX after `Lock.release()` for root local repl usage
+            DebugStatus.release()
+
     # XXX NOTE: we only override this because apparently the stdlib pdb
     # bois like to touch the SIGINT handler as much as i like to touch
     # my d$%&.
@@ -960,20 +1057,24 @@ async def request_root_stdio_lock(
     task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
 ):
     '''
-    Connect to the root actor of this process tree and RPC-invoke
-    a task which acquires a std-streams global `Lock`: a actor tree
-    global mutex which prevents other subactors from entering
-    a `PdbREPL` at the same time as any other.
+    Connect to the root actor for this actor's process tree and
+    RPC-invoke a task which acquires the std-streams global `Lock`:
+    a process-tree-global mutex which prevents multiple actors from
+    entering `PdbREPL.interaction()` at the same time such that the
+    parent TTY's stdio is never "clobbered" by simultaneous
+    reads/writes.

-    The actual `Lock` singleton exists ONLY in the root actor's
-    memory and does nothing more then set process-tree global state.
-    The actual `PdbREPL` interaction is completely isolated to each
-    sub-actor and with the `Lock` merely providing the multi-process
-    syncing mechanism to avoid any subactor (or the root itself) from
-    entering the REPL at the same time.
+    The actual `Lock` singleton instance exists ONLY in the root
+    actor's memory space and does nothing more than manage
+    process-tree global state,
+    namely a `._debug_lock: trio.FIFOLock`.
+
+    The actual `PdbREPL` interaction/operation is completely isolated
+    to each sub-actor (process) with the root's `Lock` providing the
+    multi-process mutex-syncing mechanism to avoid parallel REPL
+    usage within an actor tree.

     '''
-
     log.devx(
         'Initing stdio-lock request task with root actor'
     )
@@ -1004,7 +1105,7 @@ async def request_root_stdio_lock(
     # `.repl_release:
     # trio.Event`.
     with trio.CancelScope(shield=shield) as req_cs:
         # XXX: was orig for debugging cs stack corruption..
-        # log.info(
+        # log.devx(
         #     'Request cancel-scope is:\n\n'
        #     f'{pformat_cs(req_cs, var_name="req_cs")}\n\n'
         # )
@@ -1014,7 +1115,7 @@
             # TODO: merge into single async with ?
             async with get_root() as portal:
                 async with portal.open_context(
-                    lock_tty_for_child,
+                    lock_stdio_for_peer,
                     subactor_task_uid=task_uid,
                     # NOTE: set it here in the locker request task bc it's
                     # possible for multiple such requests for the lock in any
@@ -1468,6 +1569,11 @@ class DebugRequestError(RuntimeError):
     '''

+_repl_fail_msg: str = (
+    'Failed to REPL via `_pause()` '
+)
+
+
 async def _pause(

     debug_func: Callable|partial|None,
@@ -1487,10 +1593,13 @@
     hide_tb: bool = True,
     called_from_sync: bool = False,
     called_from_bg_thread: bool = False,
-    task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
+    task_status: TaskStatus[
+        tuple[Task, PdbREPL]
+        |trio.Event
+    ] = trio.TASK_STATUS_IGNORED,

     **debug_func_kwargs,

-) -> None:
+) -> tuple[Task, PdbREPL]|None:
     '''
     Inner impl for `pause()` to avoid the `trio.CancelScope.__exit__()`
     stack frame when not shielded (since apparently i can't figure out
     how to hide it using the normal mechanisms..)

     '''
     __tracebackhide__: bool = hide_tb
     actor: Actor = current_actor()
     try:
-        # TODO: use the `Task` instance instead for `is` checks
-        # below!
-        task: Task = trio.lowlevel.current_task()
+        task: Task = current_task()
     except RuntimeError as rte:
+        log.exception('Failed to get current task?')
         if actor.is_infected_aio():
             raise RuntimeError(
                 '`tractor.pause[_from_sync]()` not yet supported '
                 'for infected `asyncio` mode!'
             ) from rte

+        raise
+
     if debug_func is not None:
         debug_func = partial(debug_func)

-    repl: PdbREPL = repl or mk_pdb()
-
     # XXX NOTE XXX set it here to avoid ctl-c from cancelling a debug
     # request from a subactor BEFORE the REPL is entered by that
     # process.
-    DebugStatus.shield_sigint()
+    if not repl:
+        DebugStatus.shield_sigint()
+    repl: PdbREPL = repl or mk_pdb()

     # TODO: move this into a `open_debug_request()` @acm?
# -[ ] prolly makes the most sense to do the request @@ -1538,6 +1648,9 @@ async def _pause( debug_func.func.__name__ if debug_func else 'None' ) + # TODO: do we want to support using this **just** for the + # locking / common code (prolly to help address #320)? + task_status.started((task, repl)) try: if debug_func: # block here one (at the appropriate frame *up*) where @@ -1548,11 +1661,11 @@ async def _pause( f' |_{task}\n' ) - # set local actor task to avoid recurrent - # entries/requests from the same local task (to the root - # process). - DebugStatus.repl = repl + # set local task on process-global state to avoid + # recurrent entries/requests from the same + # actor-local task. DebugStatus.repl_task = task + DebugStatus.repl = repl # invoke the low-level REPL activation routine which itself # should call into a `Pdb.set_trace()` of some sort. @@ -1568,16 +1681,13 @@ async def _pause( else: if ( called_from_sync - # and - # is_root_process() and not DebugStatus.is_main_trio_thread() ): + assert called_from_bg_thread assert DebugStatus.repl_task is not task - # TODO: do we want to support using this **just** for the - # locking / common code (prolly to help address #320)? - task_status.started(DebugStatus) + return (task, repl) except trio.Cancelled: log.exception( @@ -1607,12 +1717,23 @@ async def _pause( # TODO: this should be created as part of `DebugRequest()` init # which should instead be a one-shot-use singleton much like # the `PdbREPL`. + repl_task: Thread|Task|None = DebugStatus.repl_task if ( not DebugStatus.repl_release or DebugStatus.repl_release.is_set() ): + log.devx( + 'Setting new `DebugStatus.repl_release: trio.Event` for requesting task\n' + f'|_{task}\n' + ) DebugStatus.repl_release = trio.Event() + else: + log.devx( + 'Already an existing actor-local REPL user task\n' + f'|_{repl_task}\n' + ) + # ^-NOTE-^ this must be created BEFORE scheduling any subactor # debug-req task since it needs to wait on it just after # `.started()`-ing back its wrapping `.req_cs: CancelScope`. @@ -1620,73 +1741,110 @@ async def _pause( repl_err: BaseException|None = None try: if is_root_process(): - # we also wait in the root-parent for any child that # may have the tty locked prior - # TODO: wait, what about multiple root tasks acquiring it though? + # TODO: wait, what about multiple root tasks (with bg + # threads) acquiring it though? ctx: Context|None = Lock.ctx_in_debug + repl_task: Task|None = DebugStatus.repl_task if ( ctx is None and - DebugStatus.repl - and - DebugStatus.repl_task is task + repl_task is task + # and + # DebugStatus.repl + # ^-NOTE-^ matches for multi-threaded case as well? ): # re-entrant root process already has it: noop. 
                log.warning(
                    f'This root actor task is already within an active REPL session\n'
-                   f'Ignoring this re-entered `tractor.pause()`\n'
-                   f'task: {task.name}\n'
+                   f'Ignoring this recurrent `tractor.pause()` entry\n\n'
+                   f'|_{task}\n'
                    # TODO: use `._frame_stack` scanner to find the @api_frame
                )
                with trio.CancelScope(shield=shield):
                    await trio.lowlevel.checkpoint()
-               return
+               return task, repl
+
+           # elif repl_task:
+           #     log.warning(
+           #         f'This root actor has another task already in REPL\n'
+           #         f'Waiting for the other task to complete..\n\n'
+           #         f'|_{task}\n'
+           #         # TODO: use `._frame_stack` scanner to find the @api_frame
+           #     )
+           #     with trio.CancelScope(shield=shield):
+           #         await DebugStatus.repl_release.wait()
+           #         await trio.sleep(0.1)

            # must shield here to avoid hitting a `Cancelled` and
            # a child getting stuck bc we clobbered the tty
            with trio.CancelScope(shield=shield):
-               if Lock._debug_lock.locked():
+               ctx_line = '`Lock` in this root actor task'
+               acq_prefix: str = 'shield-' if shield else ''
+               if (
+                   Lock._debug_lock.locked()
+               ):
+                   if ctx:
+                       ctx_line: str = (
+                           'active `Lock` owned by ctx\n\n'
+                           f'{ctx}'
+                       )
+                   elif Lock._owned_by_root:
+                       ctx_line: str = (
+                           'Already owned by root-task `Lock`\n\n'
+                           f'repl_task: {DebugStatus.repl_task}\n'
+                           f'repl: {DebugStatus.repl}\n'
+                       )
+                   else:
+                       ctx_line: str = (
+                           '**STALE `Lock`** held by unknown root/remote task '
+                           'with no request ctx !?!?'
+                       )

-                   acq_prefix: str = 'shield-' if shield else ''
-                   ctx_line: str = (
-                       'lock owned by ctx\n\n'
-                       f'{ctx}'
-                   ) if ctx else 'stale lock with no request ctx!?'
-                   log.devx(
-                       f'attempting to {acq_prefix}acquire active TTY '
-                       f'{ctx_line}'
-                   )
+               log.devx(
+                   f'attempting to {acq_prefix}acquire '
+                   f'{ctx_line}'
+               )
+               await Lock._debug_lock.acquire()
+               Lock._owned_by_root = True
+               # else:

-               # XXX: since we need to enter pdb synchronously below,
-               # and we don't want to block the thread that starts
-               # stepping through the application thread, we later
-               # must `Lock._debug_lock.release()` manually from
-               # some `PdbREPL` completion callback(`.set_[continue/exit]()`).
-               #
-               # So, when `._pause()` is called from a (bg/non-trio)
-               # thread, special provisions are needed and we need
-               # to do the `.acquire()`/`.release()` calls from
-               # a common `trio.task` (due to internal impl of
-               # `FIFOLock`). Thus we do not acquire here and
-               # instead expect `.pause_from_sync()` to take care of
-               # this detail depending on the caller's (threading)
-               # usage.
-               #
-               # NOTE that this special case is ONLY required when
-               # using `.pause_from_sync()` from the root actor
-               # since OW a subactor will instead make an IPC
-               # request (in the branch below) to acquire the
-               # `Lock`-mutex and a common root-actor RPC task will
-               # take care of `._debug_lock` mgmt!
-               if not called_from_sync:
-                   await Lock._debug_lock.acquire()
-                   Lock._owned_by_root = True

+               # if (
+               #     not called_from_bg_thread
+               #     and not called_from_sync
+               # ):
+               #     log.devx(
+               #         f'attempting to {acq_prefix}acquire '
+               #         f'{ctx_line}'
+               #     )

+               # XXX: since we need to enter pdb synchronously below,
+               # and we don't want to block the thread that starts
+               # stepping through the application thread, we later
+               # must `Lock._debug_lock.release()` manually from
+               # some `PdbREPL` completion callback(`.set_[continue/exit]()`).
+               #
+               # So, when `._pause()` is called from a (bg/non-trio)
+               # thread, special provisions are needed and we need
+               # to do the `.acquire()`/`.release()` calls from
+               # a common `trio.task` (due to internal impl of
+               # `FIFOLock`). Thus we do not acquire here and
+               # instead expect `.pause_from_sync()` to take care of
+               # this detail depending on the caller's (threading)
+               # usage.
+               #
+               # NOTE that this special case is ONLY required when
+               # using `.pause_from_sync()` from the root actor
+               # since OW a subactor will instead make an IPC
+               # request (in the branch below) to acquire the
+               # `Lock`-mutex and a common root-actor RPC task will
+               # take care of `._debug_lock` mgmt!

            # enter REPL from root, no TTY locking IPC ctx necessary
            # since we can acquire the `Lock._debug_lock` directly in
            # thread.
-           _enter_repl_sync(debug_func)
+           return _enter_repl_sync(debug_func)

        # TODO: need a more robust check for the "root" actor
        elif (
@@ -1809,7 +1967,7 @@
            )

            # enter REPL
-           _enter_repl_sync(debug_func)
+           return _enter_repl_sync(debug_func)

    # TODO: prolly factor this plus the similar block from
    # `_enter_repl_sync()` into a common @cm?
@@ -1838,7 +1996,9 @@
        else:
            log.exception(
-               'Failed to engage debugger via `_pause()` ??\n'
+               _repl_fail_msg
+               +
+               f'on behalf of {repl_task} ??\n'
            )
            DebugStatus.release(cancel_req_task=True)

@@ -1882,11 +2042,11 @@ def _set_trace(
    # optionally passed in to provide support for
    # `pause_from_sync()` where
    actor: tractor.Actor|None = None,
-   task: trio.Task|None = None,
+   task: Task|Thread|None = None,
):
    __tracebackhide__: bool = hide_tb
    actor: tractor.Actor = actor or current_actor()
-   task: trio.Task = task or current_task()
+   task: Task|Thread = task or current_task()

    # else:
    # TODO: maybe print the actor supervision tree up to the
@@ -2023,7 +2183,7 @@ async def maybe_init_greenback(

    if mod := maybe_import_greenback(**kwargs):
        await mod.ensure_portal()
-       log.info(
+       log.devx(
            '`greenback` portal opened!\n'
            'Sync debug support activated!\n'
        )
@@ -2032,12 +2192,116 @@
    return None

-# TODO: allow pausing from sync code.
-# normally by remapping python's builtin breakpoint() hook to this
-# runtime aware version which takes care of all .
-def pause_from_sync(
+async def _pause_from_bg_root_thread(
+   behalf_of_thread: Thread,
+   repl: PdbREPL,
+   hide_tb: bool,
+   task_status: TaskStatus[Task] = trio.TASK_STATUS_IGNORED,
+   **_pause_kwargs,
+):
+   '''
+   Acquire the `Lock._debug_lock` from a bg (only needed for the
+   root-actor) non-`trio` thread (started via a call to
+   `.to_thread.run_sync()` in some actor) by scheduling this func in
+   the actor's service (TODO eventually a special debug_mode)
+   nursery. This task acquires the lock, then `.started()`s the
+   `DebugStatus.repl_release: trio.Event`, waits for the `PdbREPL` to
+   set it, and then terminates; this mirrors how
+   `request_root_stdio_lock()` uses an IPC `Context` from a subactor
+   to do the same from a remote process.
+
+   This task is normally only required to be scheduled for the
+   special cases of a bg sync thread running in the root actor; see
+   the only usage inside `.pause_from_sync()`.
+
+   '''
+   global Lock
+   # TODO: unify this copied code with where it was
+   # from in `maybe_wait_for_debugger()`
+   # if (
+   #     Lock.req_handler_finished is not None
+   #     and not Lock.req_handler_finished.is_set()
+   #     and (in_debug := Lock.ctx_in_debug)
+   # ):
+   #     log.devx(
+   #         '\nRoot is waiting on tty lock to release from\n\n'
+   #         # f'{caller_frame_info}\n'
+   #     )
+   #     with trio.CancelScope(shield=True):
+   #         await Lock.req_handler_finished.wait()
+
+   #     log.pdb(
+   #         f'Subactor released debug lock\n'
+   #         f'|_{in_debug}\n'
+   #     )
+   task: Task = current_task()
+
+   # Manually acquire since otherwise on release we'll
+   # get a RTE raised by `trio` due to ownership..
+   log.devx(
+       'Trying to acquire `Lock` on behalf of bg thread\n'
+       f'|_{behalf_of_thread}\n'
+   )
+   # DebugStatus.repl_task = behalf_of_thread
+   out = await _pause(
+       debug_func=None,
+       repl=repl,
+       hide_tb=hide_tb,
+       called_from_sync=True,
+       called_from_bg_thread=True,
+       **_pause_kwargs
+   )
+   lock: trio.FIFOLock = Lock._debug_lock
+   stats: trio.LockStatistics = lock.statistics()
+   assert stats.owner is task
+   assert Lock._owned_by_root
+   assert DebugStatus.repl_release
+
+   # TODO: do we actually need this?
+   # originally i was trying to solve why this was
+   # unblocking too soon in a thread but it was actually
+   # that we weren't setting our own `repl_release` below..
+   while stats.owner is not task:
+       log.devx(
+           f'Trying to acquire `._debug_lock` from {stats.owner} for\n'
+           f'|_{behalf_of_thread}\n'
+       )
+       await lock.acquire()
+       break
+
+   # XXX NOTE XXX super important dawg..
+   # set our own event since the current one might
+   # have already been overridden and then set when the
+   # last REPL mutex holder exits their sesh!
+   # => we do NOT want to override any existing one
+   # and we want to ensure we set our own ONLY AFTER we have
+   # acquired the `._debug_lock`
+   repl_release = DebugStatus.repl_release = trio.Event()
+
+   # unblock caller thread delivering this bg task
+   log.devx(
+       'Unblocking root-bg-thread since we acquired lock via `._pause()`\n'
+       f'|_{behalf_of_thread}\n'
+   )
+   task_status.started(out)
+   DebugStatus.shield_sigint()
+
+   # wait for bg thread to exit REPL sesh.
+   try:
+       await repl_release.wait()
+   finally:
+       log.devx(
+           'releasing lock from bg root thread task!\n'
+           f'|_ {behalf_of_thread}\n'
+       )
+       Lock.release()
+
+
+def pause_from_sync(

    hide_tb: bool = True,
+   called_from_builtin: bool = False,
+   api_frame: FrameType|None = None,

    # proxy to `._pause()`, for ex:
    # shield: bool = False,

    **_pause_kwargs,

) -> None:
+   '''
+   Pause a `tractor` scheduled task or thread from sync (non-async
+   function) code.

+   When `greenback` is installed we remap python's builtin
+   `breakpoint()` hook to this runtime-aware version which takes
+   care of all bg-thread detection and appropriate synchronization
+   with the root actor's `Lock` to avoid multi-thread/process REPL
+   clobbering Bo
+
+   '''
    __tracebackhide__: bool = hide_tb
    try:
        actor: tractor.Actor = current_actor(
            err_on_no_runtime=False,
        )
-       log.debug(
-           f'{actor.uid}: JUST ENTERED `tractor.pause_from_sync()`'
-           f'|_{actor}\n'
+       message: str = (
+           f'{actor.uid} task called `tractor.pause_from_sync()`\n\n'
        )
        if not actor:
            raise RuntimeError(
                'Not inside the `tractor`-runtime?\n'
                '`tractor.pause_from_sync()` is not functional without a wrapping\n'
                '- `async with tractor.open_nursery()` or,\n'
                '- `async with tractor.open_root_actor()`\n'
            )

-       # NOTE: once supported, remove this AND the one
+       # TODO: once supported, remove this AND the one
        # inside `._pause()`!
        if actor.is_infected_aio():
            raise RuntimeError(
                '`tractor.pause[_from_sync]()` not yet supported '
                'for infected `asyncio` mode!'
            )

-       # raises on not-found by default
-       greenback: ModuleType = maybe_import_greenback()
-       mdb: PdbREPL = mk_pdb()
+       DebugStatus.shield_sigint()
+       repl: PdbREPL = mk_pdb()

-       # run async task which will lock out the root proc's TTY.
+       # message += f'-> created local REPL {repl}\n'
+       is_root: bool = is_root_process()
+
+       # TODO: we could also check for a non-`.to_thread` context
+       # using `trio.from_thread.check_cancelled()` (says
+       # oremanj) wherein we get the following outputs:
+       #
+       # `RuntimeError`: non-`.to_thread` spawned thread
+       # noop: non-cancelled `.to_thread`
+       # `trio.Cancelled`: cancelled `.to_thread`
+
+       # when called from a (bg) thread, run an async task in a new
+       # thread which will call `._pause()` manually with special
+       # handling for root-actor caller usage.
        if not DebugStatus.is_main_trio_thread():
-
-           # TODO: we could also check for a non-`.to_thread` context
-           # using `trio.from_thread.check_cancelled()` (says
-           # oremanj) wherein we get the following outputs:
-           #
-           # `RuntimeError`: non-`.to_thread` spawned thread
-           # noop: non-cancelled `.to_thread`
-           # `trio.Cancelled`: cancelled `.to_thread`
-           #
-           log.warning(
-               'Engaging `.pause_from_sync()` from ANOTHER THREAD!'
-           )
-           task: threading.Thread = threading.current_thread()
-           DebugStatus.repl_task: str = task
+           thread: threading.Thread = threading.current_thread()
+           repl_owner = thread

            # TODO: make root-actor bg thread usage work!
-           # if is_root_process():
-           #     async def _pause_from_sync_thread():
-           #         ...
-           # else:
-           #     .. the below ..
+           if is_root:
+               message += (
+                   f'-> called from a root-actor bg {thread}\n'
+                   f'-> scheduling `_pause_from_bg_root_thread()`..\n'
+               )
+               bg_task, repl = trio.from_thread.run(
+                   afn=partial(
+                       actor._service_n.start,
+                       partial(
+                           _pause_from_bg_root_thread,
+                           behalf_of_thread=thread,
+                           repl=repl,
+                           hide_tb=hide_tb,
+                           **_pause_kwargs,
+                       ),
+                   )
+               )
+               message += (
+                   f'-> `_pause_from_bg_root_thread()` started bg task {bg_task}\n'
+               )
+           else:
+               message += f'-> called from a bg {thread}\n'
+               # NOTE: since this is a subactor, `._pause()` will
+               # internally issue a debug request via
+               # `request_root_stdio_lock()` and we don't need to
+               # worry about all the special considerations as with
+               # the root-actor per above.
+               bg_task, repl = trio.from_thread.run(
+                   afn=partial(
+                       _pause,
+                       debug_func=None,
+                       repl=repl,
+                       hide_tb=hide_tb,

-           trio.from_thread.run(
-               partial(
-                   _pause,
-                   debug_func=None,
-                   repl=mdb,
-                   hide_tb=hide_tb,
+                       # XXX to prevent `._pause()` from setting
+                       # `DebugStatus.repl_task` to the gb task!
+                       called_from_sync=True,
+                       called_from_bg_thread=True,

-                   # XXX to prevent `._pause()` for setting
-                   # `DebugStatus.repl_task` to the gb task!
-                   called_from_sync=True,
-                   called_from_bg_thread=True,
-
-                   **_pause_kwargs
-               ),
-           )
+                       **_pause_kwargs
+                   ),
+               )
+               assert bg_task is not DebugStatus.repl_task

        else:  # we are presumably the `trio.run()` + main thread
-           task: trio.Task = current_task()
-           DebugStatus.repl_task: str = task
-           greenback.await_(
+           # raises on not-found by default
+           greenback: ModuleType = maybe_import_greenback()
+           message += f'-> imported {greenback}\n'
+           repl_owner: Task = current_task()
+           message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n'
+           out = greenback.await_(
                _pause(
                    debug_func=None,
-                   repl=mdb,
+                   repl=repl,
                    hide_tb=hide_tb,
                    called_from_sync=True,
                    **_pause_kwargs,
                )
            )
+           if out:
+               bg_task, out_repl = out
+               assert out_repl is repl
+               assert bg_task is repl_owner

-       if is_root_process():
-           # Manually acquire since otherwise on release we'll
-           # get a RTE raised by `trio` due to ownership..
-           Lock._debug_lock.acquire_nowait()
-           Lock._owned_by_root = True
+       # NOTE: normally set inside `_enter_repl_sync()`
+       DebugStatus.repl_task = repl_owner

        # TODO: ensure we aggressively make the user aware about
-       # entering the global ``breakpoint()`` built-in from sync
+       # entering the global `breakpoint()` built-in from sync
        # code?
+       message += (
+           f'-> successfully scheduled `._pause()` in `trio` thread on behalf of {bg_task}\n'
+           f'-> Entering REPL via `tractor._set_trace()` from caller {repl_owner}\n'
+       )
+       log.devx(message)
+
+       DebugStatus.repl = repl
        _set_trace(
-           api_frame=inspect.currentframe(),
-           repl=mdb,
+           api_frame=api_frame or inspect.currentframe(),
+           repl=repl,
            hide_tb=hide_tb,
            actor=actor,
-           task=task,
+           task=repl_owner,
        )
        # LEGACY NOTE on next LOC's frame showing weirdness..
        #
@@ -2155,6 +2461,26 @@
        raise err

+def _sync_pause_from_builtin(
+   *args,
+   called_from_builtin=True,
+   **kwargs,
+) -> None:
+   '''
+   Proxy call `.pause_from_sync()` but indicate the caller is the
+   `breakpoint()` built-in.
+
+   Note: this is assigned to `os.environ['PYTHONBREAKPOINT']` inside `._root`
+
+   '''
+   pause_from_sync(
+       *args,
+       called_from_builtin=True,
+       api_frame=inspect.currentframe(),
+       **kwargs,
+   )
+
+
 # NOTE prefer a new "pause" semantic since it better describes
 # "pausing the actor's runtime" for this particular
 # parallel task to do debugging in a REPL.
@@ -2406,7 +2732,6 @@ async def maybe_wait_for_debugger(
            and not Lock.req_handler_finished.is_set()
            and in_debug is not None
        ):
-
            # caller_frame_info: str = pformat_caller_frame()
            logmeth(
                msg
@@ -2421,7 +2746,7 @@
            with trio.CancelScope(shield=True):
                await Lock.req_handler_finished.wait()

-           log.pdb(
+           log.devx(
                f'Subactor released debug lock\n'
                f'|_{in_debug}\n'
            )
@@ -2453,13 +2778,6 @@
                await trio.sleep(poll_delay)
                continue

-           # fallthrough on failure to acquire..
-           # else:
-           #     raise RuntimeError(
-           #         msg
-           #         +
-           #         'Root actor failed to acquire debug lock?'
-           #     )
            return True

    # else:
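Aside (not part of the patch): the `trio` lock-ownership constraint
noted throughout, that the task which `.acquire()`s a lock must be the
very same task that `.release()`s it, is exactly why
`_pause_from_bg_root_thread()` does both from one `trio`-main-thread
task. A standalone demo, with all names local to this sketch:

    import trio

    async def main() -> None:
        lock = trio.StrictFIFOLock()
        release_it = trio.Event()

        async def holder(task_status=trio.TASK_STATUS_IGNORED) -> None:
            await lock.acquire()  # this task becomes the owner
            task_status.started()
            await release_it.wait()
            lock.release()  # ok: released by the owning task

        async with trio.open_nursery() as tn:
            await tn.start(holder)
            try:
                lock.release()  # not the owner -> `trio` raises
            except RuntimeError as rte:
                print(f'non-owner release rejected: {rte}')
            release_it.set()

    trio.run(main)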