diff --git a/tractor/devx/_debug.py b/tractor/devx/_debug.py
index 858133f..3218cff 100644
--- a/tractor/devx/_debug.py
+++ b/tractor/devx/_debug.py
@@ -62,7 +62,6 @@
 import trio
 from trio import CancelScope
 from trio.lowlevel import (
     current_task,
-    Task,
 )
 from trio import (
     TaskStatus,
@@ -81,6 +80,8 @@ from tractor._state import (
 # )

 if TYPE_CHECKING:
+    from trio.lowlevel import Task
+    from threading import Thread
     from tractor._ipc import Channel
     from tractor._context import Context
     from tractor._runtime import (
@@ -92,6 +93,11 @@ if TYPE_CHECKING:

 log = get_logger(__name__)

+# TODO: refine the internal impl and APIs in this module!
+#
+# -[ ] separate `._pause()` branch-cases for calling from a root task
+#      vs. from subactors
+

 def hide_runtime_frames() -> dict[FunctionType, CodeType]:
     '''
@@ -203,6 +209,15 @@ class Lock:

         return None

+    # TODO: once we convert to singleton-per-actor-style
+    # @property
+    # def stats(cls) -> trio.LockStatistics:
+    #     return cls._debug_lock.statistics()
+
+    # @property
+    # def owner(cls) -> Task:
+    #     return cls._debug_lock.statistics().owner
+
     # ROOT ONLY
     # ------ - -------
     # the root-actor-ONLY singletons for,
@@ -229,18 +244,22 @@ class Lock:
     @classmethod
     def repr(cls) -> str:
         lock_stats: trio.LockStatistics = cls._debug_lock.statistics()
+        req: trio.Event|None = cls.req_handler_finished
         fields: str = (
-            f'req_handler_finished: {cls.req_handler_finished}\n'
-            f'_blocked: {cls._blocked}\n\n'
-            f'_debug_lock: {cls._debug_lock}\n'
-            f'lock_stats: {lock_stats}\n'
-            f'ctx_in_debug: {cls.ctx_in_debug}\n'
+            f'|_ ._blocked: {cls._blocked}\n'
+            f'|_ ._debug_lock: {cls._debug_lock}\n'
+            f'   {lock_stats}\n\n'
+            f'|_ .ctx_in_debug: {cls.ctx_in_debug}\n'
+            f'|_ .req_handler_finished: {req}\n'
         )
+        if req:
+            req_stats: trio.EventStatistics = req.statistics()
+            fields += f'   {req_stats}\n'

         body: str = textwrap.indent(
             fields,
-            prefix=' |_',
+            prefix=' ',
         )
         return (
             f'<{cls.__name__}(\n'
@@ -253,28 +272,59 @@
     def release(
         cls,
         force: bool = False,
-    ):
-        if not cls._owned_by_root:
-            message: str = 'TTY lock not held by any child\n'
-        else:
-            message: str = 'TTY lock held in root-actor task\n'
+        raise_on_thread: bool = True,

-        if not (is_trio_main := DebugStatus.is_main_trio_thread()):
-            task: threading.Thread = threading.current_thread()
+    ) -> bool:
+        '''
+        Release the actor-tree global TTY stdio lock (only) from the
+        `trio.run()`-main-thread.
+
+        '''
+        we_released: bool = False
+        ctx_in_debug: Context|None = cls.ctx_in_debug
+        repl_task: Task|Thread|None = DebugStatus.repl_task
+        if not DebugStatus.is_main_trio_thread():
+            thread: threading.Thread = threading.current_thread()
+            message: str = (
+                '`Lock.release()` cannot be called from a non-main-`trio` thread!\n'
+                f'{thread}\n'
+            )
+            if raise_on_thread:
+                raise RuntimeError(message)
+
+            log.devx(message)
+            return False
+
+        task: Task = current_task()
+
+        # sanity check that if we're the root actor
+        # the lock is marked as such.
+        # note the pre-release value may differ from the
+        # post-release task.
+        if repl_task is task:
+            assert cls._owned_by_root
+            message: str = (
+                'TTY lock held by root-actor on behalf of local task\n'
+                f'|_{repl_task}\n'
+            )
         else:
-            task: trio.Task = current_task()
+            assert DebugStatus.repl_task is not task
+
+            message: str = (
+                'TTY lock was NOT released on behalf of caller\n'
+                f'|_{task}\n'
+            )

         try:
             lock: trio.StrictFIFOLock = cls._debug_lock
             owner: Task = lock.statistics().owner
             if (
-                lock.locked()
-                and
-                (
-                    owner is task
-                    # or
-                    # cls._owned_by_root
-                )
+                (lock.locked() or force)
+                # ^-TODO-NOTE-^ should we just remove this, since the
+                # RTE case above will always happen when you force
+                # from the wrong task?
+
+                and (owner is task)
                 # ^-NOTE-^ if we do NOT ensure this, `trio` will
                 # raise a RTE when a non-owner tries to releasee the
                 # lock.
@@ -284,17 +334,27 @@
                 # being set to the `.repl_task` such that the above
                 # condition matches and we actually release the lock.
                 # This is particular of note from `.pause_from_sync()`!
+
             ):
-                if not is_trio_main:
-                    trio.from_thread.run_sync(
-                        cls._debug_lock.release
+                cls._debug_lock.release()
+                we_released: bool = True
+                if repl_task:
+                    message: str = (
+                        'Lock released on behalf of root-actor-local REPL owner\n'
+                        f'|_{repl_task}\n'
                     )
                 else:
-                    cls._debug_lock.release()
-                message: str = 'TTY lock released for child\n'
+                    message: str = (
+                        'TTY lock released by us on behalf of remote peer?\n'
+                        f'|_ctx_in_debug: {ctx_in_debug}\n\n'
+                    )
+                    # mk_pdb().set_trace()
+                # elif owner:

         except RuntimeError as rte:
-            log.exception('Failed to release `Lock`?')
+            log.exception(
+                'Failed to release `Lock._debug_lock: trio.FIFOLock`?\n'
+            )
             raise rte

         finally:
@@ -303,40 +363,59 @@
             # we are now back in the "tty unlocked" state. This is basically
             # and edge triggered signal around an empty queue of sub-actor
             # tasks that may have tried to acquire the lock.
-            lock_stats = cls._debug_lock.statistics()
+            lock_stats: trio.LockStatistics = cls._debug_lock.statistics()
             req_handler_finished: trio.Event|None = Lock.req_handler_finished
             if (
                 not lock_stats.owner
-                or force
                 and
                 req_handler_finished is None
             ):
-                message += '-> No more child ctx tasks hold the TTY lock!\n'
-
-            elif (
-                req_handler_finished
-                and
-                lock.locked()
-            ):
-                req_stats = req_handler_finished.statistics()
                 message += (
-                    f'-> A child ctx task still owns the `Lock` ??\n'
-                    f'  |_lock_stats: {lock_stats}\n'
-                    f'  |_req_stats: {req_stats}\n'
+                    '-> No new task holds the TTY lock!\n\n'
+                    f'{Lock.repr()}\n'
                 )
-            cls.ctx_in_debug = None

+            elif (
+                req_handler_finished  # new IPC ctx debug request active
+                and
+                lock.locked()  # someone has the lock
+            ):
+                behalf_of_task = (
+                    ctx_in_debug
+                    or
+                    repl_task
+                )
+                message += (
+                    f'\nA non-caller task still owns this lock on behalf of '
+                    f'{behalf_of_task}\n'
+                    f'|_{lock_stats.owner}\n'
+                )
+
             if (
-                cls._owned_by_root
+                we_released
+                and
+                ctx_in_debug
             ):
-                if not lock.locked():
-                    cls._owned_by_root = False
-                else:
-                    message += 'Lock still held by root actor task?!?\n'
-                    lock.release()
+                cls.ctx_in_debug = None  # unset
+
+            # post-release value (should differ from the value above!)
+            repl_task: Task|Thread|None = DebugStatus.repl_task
+            if (
+                cls._owned_by_root
+                and
+                we_released
+            ):
+                cls._owned_by_root = False
+
+                if task is not repl_task:
+                    message += (
+                        'Lock released by root actor on behalf of bg thread\n'
+                        f'|_{repl_task}\n'
+                    )

             log.devx(message)

+        return we_released
+
     @classmethod
     @acm
     async def acquire_for_ctx(
@@ -380,7 +459,7 @@
             log.runtime(pre_msg)

             # NOTE: if the surrounding cancel scope from the
-            # `lock_tty_for_child()` caller is cancelled, this line should
+            # `lock_stdio_for_peer()` caller is cancelled, this line should
             # unblock and NOT leave us in some kind of
             # a "child-locked-TTY-but-child-is-uncontactable-over-IPC"
             # condition.
@@ -398,7 +477,7 @@
         # IF we received a cancel during the shielded lock entry of some
         # next-in-queue requesting task, then the resumption here will
         # result in that ``trio.Cancelled`` being raised to our caller
-        # (likely from ``lock_tty_for_child()`` below)! In
+        # (likely from `lock_stdio_for_peer()` below)! In
         # this case the ``finally:`` below should trigger and the
         # surrounding caller side context should cancel normally
         # relaying back to the caller.
@@ -408,8 +487,8 @@
         finally:
             message :str = 'Exiting `Lock.acquire_for_ctx()` on behalf of sub-actor\n'
             if we_acquired:
-                message += '-> TTY lock released by child\n'
                 cls.release()
+                message += '-> TTY lock released by child\n'

             else:
                 message += '-> TTY lock never acquired by child??\n'

@@ -421,7 +500,7 @@


 @tractor.context
-async def lock_tty_for_child(
+async def lock_stdio_for_peer(
     ctx: Context,
     subactor_task_uid: tuple[str, int],

@@ -545,25 +624,26 @@ async def lock_tty_for_child(
     except BaseException as req_err:
         message: str = (
+            f'On behalf of remote peer {subactor_task_uid!r}@{ctx.chan.uid!r}\n\n'
            'Forcing `Lock.release()` for req-ctx since likely an '
            'internal error!\n\n'
            f'{ctx}'
         )
         if isinstance(req_err, trio.Cancelled):
             message = (
-                'Cancelled during root TTY-lock dialog?\n'
+                'Cancelled during root TTY-lock dialog\n'
                 +
                 message
             )
         else:
             message = (
-                'Errored during root TTY-lock dialog?\n'
+                'Errored during root TTY-lock dialog\n'
                 +
                 message
             )

         log.exception(message)
-        Lock.release(force=True)
+        Lock.release() #force=True)
         raise

     finally:
@@ -645,7 +725,7 @@ class DebugStatus:
     def shield_sigint(cls):
         '''
         Shield out SIGINT handling (which by default triggers
-        `trio.Task` cancellation) in subactors when a `pdb` REPL
+        `Task` cancellation) in subactors when a `pdb` REPL
         is active.

         Avoids cancellation of the current actor (task) when the user
@@ -767,9 +847,17 @@ class DebugStatus:
         try:
             # sometimes the task might already be terminated in
             # which case this call will raise an RTE?
-            if repl_release is not None:
-                repl_release.set()
-
+            if (
+                repl_release is not None
+            ):
+                if cls.is_main_trio_thread():
+                    repl_release.set()
+                else:
+                    # XXX NOTE ONLY used for bg root-actor sync
+                    # threads, see `.pause_from_sync()`.
+                    trio.from_thread.run_sync(
+                        repl_release.set
+                    )
         finally:
             # if req_ctx := cls.req_ctx:
             #     req_ctx._scope.cancel()
@@ -856,8 +944,6 @@ class PdbREPL(pdbp.Pdb):
         try:
             super().set_continue()
         finally:
-            DebugStatus.release()
-
             # NOTE: for subactors the stdio lock is released via the
             # allocated RPC locker task, so for root we have to do it
             # manually.
@@ -865,21 +951,32 @@ class PdbREPL(pdbp.Pdb):
             if (
                 is_root_process()
                 and
                 Lock._debug_lock.locked()
+                and
+                DebugStatus.is_main_trio_thread()
             ):
+                # Lock.release(raise_on_thread=False)
                 Lock.release()

+            # XXX after `Lock.release()` for root local repl usage
+            DebugStatus.release()
+
     def set_quit(self):
         try:
             super().set_quit()
         finally:
-            DebugStatus.release()
             if (
                 is_root_process()
                 and
                 Lock._debug_lock.locked()
+                and
+                DebugStatus.is_main_trio_thread()
             ):
+                # Lock.release(raise_on_thread=False)
                 Lock.release()

+            # XXX after `Lock.release()` for root local repl usage
+            DebugStatus.release()
+
     # XXX NOTE: we only override this because apparently the stdlib pdb
     # bois likes to touch the SIGINT handler as much as i like to touch
     # my d$%&.
@@ -960,20 +1057,24 @@ async def request_root_stdio_lock(
     task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
 ):
     '''
-    Connect to the root actor of this process tree and RPC-invoke
-    a task which acquires a std-streams global `Lock`: a actor tree
-    global mutex which prevents other subactors from entering
-    a `PdbREPL` at the same time as any other.
+    Connect to the root actor for this actor's process tree and
+    RPC-invoke a task which acquires the std-streams global `Lock`:
+    a process-tree-global mutex which prevents multiple actors from
+    entering `PdbREPL.interaction()` at the same time such that the
+    parent TTY's stdio is never "clobbered" by simultaneous
+    reads/writes.

-    The actual `Lock` singleton exists ONLY in the root actor's
-    memory and does nothing more then set process-tree global state.
-    The actual `PdbREPL` interaction is completely isolated to each
-    sub-actor and with the `Lock` merely providing the multi-process
-    syncing mechanism to avoid any subactor (or the root itself) from
-    entering the REPL at the same time.
+    The actual `Lock` singleton instance exists ONLY in the root
+    actor's memory space and does nothing more than manage
+    process-tree global state,
+    namely a `._debug_lock: trio.FIFOLock`.
+
+    The actual `PdbREPL` interaction/operation is completely isolated
+    to each sub-actor (process) with the root's `Lock` providing the
+    multi-process mutex-syncing mechanism to avoid parallel REPL
+    usage within an actor tree.

     '''
-
     log.devx(
         'Initing stdio-lock request task with root actor'
     )
@@ -1004,7 +1105,7 @@ async def request_root_stdio_lock(
        # `.repl_release:
        # trio.Event`.
        with trio.CancelScope(shield=shield) as req_cs:
            # XXX: was orig for debugging cs stack corruption..
-            # log.info(
+            # log.devx(
            #     'Request cancel-scope is:\n\n'
            #     f'{pformat_cs(req_cs, var_name="req_cs")}\n\n'
            # )
@@ -1014,7 +1115,7 @@

            # TODO: merge into single async with ?
            async with get_root() as portal:
                async with portal.open_context(
-                    lock_tty_for_child,
+                    lock_stdio_for_peer,
                    subactor_task_uid=task_uid,
                    # NOTE: set it here in the locker request task bc it's
                    # possible for multiple such requests for the lock in any
@@ -1468,6 +1569,11 @@ class DebugRequestError(RuntimeError):
     '''

+_repl_fail_msg: str = (
+    'Failed to REPL via `_pause()` '
+)
+
+
 async def _pause(
     debug_func: Callable|partial|None,

@@ -1487,10 +1593,13 @@ async def _pause(
     hide_tb: bool = True,
     called_from_sync: bool = False,
     called_from_bg_thread: bool = False,
-    task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
+    task_status: TaskStatus[
+        tuple[Task, PdbREPL],
+        trio.Event
+    ] = trio.TASK_STATUS_IGNORED,

     **debug_func_kwargs,

-) -> None:
+) -> tuple[PdbREPL, Task]|None:
     '''
     Inner impl for `pause()` to avoid the `trio.CancelScope.__exit__()`
     stack frame when not shielded (since apparently i can't figure out
@@ -1502,25 +1611,26 @@ async def _pause(
     __tracebackhide__: bool = hide_tb
     actor: Actor = current_actor()
     try:
-        # TODO: use the `Task` instance instead for `is` checks
-        # below!
-        task: Task = trio.lowlevel.current_task()
+        task: Task = current_task()
     except RuntimeError as rte:
+        log.exception('Failed to get current task?')
         if actor.is_infected_aio():
             raise RuntimeError(
                 '`tractor.pause[_from_sync]()` not yet supported '
                 'for infected `asyncio` mode!'
             ) from rte

+        raise
+
     if debug_func is not None:
         debug_func = partial(debug_func)

-    repl: PdbREPL = repl or mk_pdb()
-
     # XXX NOTE XXX set it here to avoid ctl-c from cancelling a debug
     # request from a subactor BEFORE the REPL is entered by that
     # process.
-    DebugStatus.shield_sigint()
+    if not repl:
+        DebugStatus.shield_sigint()
+
+    repl: PdbREPL = repl or mk_pdb()

     # TODO: move this into a `open_debug_request()` @acm?
     # -[ ] prolly makes the most sense to do the request
@@ -1538,6 +1648,9 @@ async def _pause(
             debug_func.func.__name__ if debug_func else 'None'
         )

+        # TODO: do we want to support using this **just** for the
+        # locking / common code (prolly to help address #320)?
+        task_status.started((task, repl))
         try:
             if debug_func:
                 # block here one (at the appropriate frame *up*) where
@@ -1548,11 +1661,11 @@
                     f' |_{task}\n'
                 )

-                # set local actor task to avoid recurrent
-                # entries/requests from the same local task (to the root
-                # process).
-                DebugStatus.repl = repl
+                # set local task on process-global state to avoid
+                # recurrent entries/requests from the same
+                # actor-local task.
                 DebugStatus.repl_task = task
+                DebugStatus.repl = repl

                 # invoke the low-level REPL activation routine which itself
                 # should call into a `Pdb.set_trace()` of some sort.
@@ -1568,16 +1681,13 @@
             else:
                 if (
                     called_from_sync
-                    # and
-                    # is_root_process()
                     and
                     not DebugStatus.is_main_trio_thread()
                 ):
+                    assert called_from_bg_thread
                     assert DebugStatus.repl_task is not task

-                # TODO: do we want to support using this **just** for the
-                # locking / common code (prolly to help address #320)?
-                task_status.started(DebugStatus)
+                return (task, repl)

         except trio.Cancelled:
             log.exception(
@@ -1607,12 +1717,23 @@
    # TODO: this should be created as part of `DebugRequest()` init
    # which should instead be a one-shot-use singleton much like
    # the `PdbREPL`.
+    repl_task: Thread|Task|None = DebugStatus.repl_task
     if (
         not DebugStatus.repl_release
         or
         DebugStatus.repl_release.is_set()
     ):
+        log.devx(
+            'Setting new `DebugStatus.repl_release: trio.Event` for requesting task\n'
+            f'|_{task}\n'
+        )
         DebugStatus.repl_release = trio.Event()
+    else:
+        log.devx(
+            'Already an existing actor-local REPL user task\n'
+            f'|_{repl_task}\n'
+        )
+
     # ^-NOTE-^ this must be created BEFORE scheduling any subactor
     # debug-req task since it needs to wait on it just after
     # `.started()`-ing back its wrapping `.req_cs: CancelScope`.
@@ -1620,73 +1741,110 @@ async def _pause(
     repl_err: BaseException|None = None
     try:
         if is_root_process():
-
             # we also wait in the root-parent for any child that
             # may have the tty locked prior
-            # TODO: wait, what about multiple root tasks acquiring it though?
+            # TODO: wait, what about multiple root tasks (with bg
+            # threads) acquiring it though?
             ctx: Context|None = Lock.ctx_in_debug
+            repl_task: Task|None = DebugStatus.repl_task
             if (
                 ctx is None
                 and
-                DebugStatus.repl
-                and
-                DebugStatus.repl_task is task
+                repl_task is task
+                # and
+                # DebugStatus.repl
+                # ^-NOTE-^ matches for multi-threaded case as well?
             ):
                 # re-entrant root process already has it: noop.
                 log.warning(
                     f'This root actor task is already within an active REPL session\n'
-                    f'Ignoring this re-entered `tractor.pause()`\n'
-                    f'task: {task.name}\n'
+                    f'Ignoring this recurrent `tractor.pause()` entry\n\n'
+                    f'|_{task}\n'
                     # TODO: use `._frame_stack` scanner to find the @api_frame
                 )
                 with trio.CancelScope(shield=shield):
                     await trio.lowlevel.checkpoint()
-                return
+                return repl, task
+
+            # elif repl_task:
+            #     log.warning(
+            #         f'This root actor has another task already in REPL\n'
+            #         f'Waiting for the other task to complete..\n\n'
+            #         f'|_{task}\n'
+            #         # TODO: use `._frame_stack` scanner to find the @api_frame
+            #     )
+            #     with trio.CancelScope(shield=shield):
+            #         await DebugStatus.repl_release.wait()
+            #         await trio.sleep(0.1)

             # must shield here to avoid hitting a `Cancelled` and
             # a child getting stuck bc we clobbered the tty
             with trio.CancelScope(shield=shield):
-                if Lock._debug_lock.locked():
+                ctx_line = '`Lock` in this root actor task'
+                acq_prefix: str = 'shield-' if shield else ''
+                if (
+                    Lock._debug_lock.locked()
+                ):
+                    if ctx:
+                        ctx_line: str = (
+                            'active `Lock` owned by ctx\n\n'
+                            f'{ctx}'
+                        )
+                    elif Lock._owned_by_root:
+                        ctx_line: str = (
+                            'Already owned by root-task `Lock`\n\n'
+                            f'repl_task: {DebugStatus.repl_task}\n'
+                            f'repl: {DebugStatus.repl}\n'
+                        )
+                    else:
+                        ctx_line: str = (
+                            '**STALE `Lock`** held by unknown root/remote task '
+                            'with no request ctx !?!?'
+                        )

-                    acq_prefix: str = 'shield-' if shield else ''
-                    ctx_line: str = (
-                        'lock owned by ctx\n\n'
-                        f'{ctx}'
-                    ) if ctx else 'stale lock with no request ctx!?'
-                    log.devx(
-                        f'attempting to {acq_prefix}acquire active TTY '
-                        f'{ctx_line}'
-                    )
+                log.devx(
+                    f'attempting to {acq_prefix}acquire '
+                    f'{ctx_line}'
+                )
+                await Lock._debug_lock.acquire()
+                Lock._owned_by_root = True
+                # else:

-                # XXX: since we need to enter pdb synchronously below,
-                # and we don't want to block the thread that starts
-                # stepping through the application thread, we later
-                # must `Lock._debug_lock.release()` manually from
-                # some `PdbREPL` completion callback(`.set_[continue/exit]()`).
-                #
-                # So, when `._pause()` is called from a (bg/non-trio)
-                # thread, special provisions are needed and we need
-                # to do the `.acquire()`/`.release()` calls from
-                # a common `trio.task` (due to internal impl of
-                # `FIFOLock`). Thus we do not acquire here and
-                # instead expect `.pause_from_sync()` to take care of
-                # this detail depending on the caller's (threading)
-                # usage.
-                #
-                # NOTE that this special case is ONLY required when
-                # using `.pause_from_sync()` from the root actor
-                # since OW a subactor will instead make an IPC
-                # request (in the branch below) to acquire the
-                # `Lock`-mutex and a common root-actor RPC task will
-                # take care of `._debug_lock` mgmt!
-                if not called_from_sync:
-                    await Lock._debug_lock.acquire()
-                    Lock._owned_by_root = True
+                # if (
+                #     not called_from_bg_thread
+                #     and not called_from_sync
+                # ):
+                #     log.devx(
+                #         f'attempting to {acq_prefix}acquire '
+                #         f'{ctx_line}'
+                #     )
+
+                # XXX: since we need to enter pdb synchronously below,
+                # and we don't want to block the thread that starts
+                # stepping through the application thread, we later
+                # must `Lock._debug_lock.release()` manually from
+                # some `PdbREPL` completion callback(`.set_[continue/exit]()`).
+                #
+                # So, when `._pause()` is called from a (bg/non-trio)
+                # thread, special provisions are needed and we need
+                # to do the `.acquire()`/`.release()` calls from
+                # a common `trio.task` (due to internal impl of
+                # `FIFOLock`). Thus we do not acquire here and
+                # instead expect `.pause_from_sync()` to take care of
+                # this detail depending on the caller's (threading)
+                # usage.
+                #
+                # NOTE that this special case is ONLY required when
+                # using `.pause_from_sync()` from the root actor
+                # since OW a subactor will instead make an IPC
+                # request (in the branch below) to acquire the
+                # `Lock`-mutex and a common root-actor RPC task will
+                # take care of `._debug_lock` mgmt!

             # enter REPL from root, no TTY locking IPC ctx necessary
             # since we can acquire the `Lock._debug_lock` directly in
             # thread.
-            _enter_repl_sync(debug_func)
+            return _enter_repl_sync(debug_func)

         # TODO: need a more robust check for the "root" actor
         elif (
@@ -1809,7 +1967,7 @@ async def _pause(
             )

             # enter REPL
-            _enter_repl_sync(debug_func)
+            return _enter_repl_sync(debug_func)

     # TODO: prolly factor this plus the similar block from
     # `_enter_repl_sync()` into a common @cm?
@@ -1838,7 +1996,9 @@
             else:
                 log.exception(
-                    'Failed to engage debugger via `_pause()` ??\n'
+                    _repl_fail_msg
+                    +
+                    f'on behalf of {repl_task} ??\n'
                 )
                 DebugStatus.release(cancel_req_task=True)

@@ -1882,11 +2042,11 @@ def _set_trace(
     # optionally passed in to provide support for
     # `pause_from_sync()` where
     actor: tractor.Actor|None = None,
-    task: trio.Task|None = None,
+    task: Task|Thread|None = None,
 ):
     __tracebackhide__: bool = hide_tb
     actor: tractor.Actor = actor or current_actor()
-    task: trio.Task = task or current_task()
+    task: Task|Thread = task or current_task()

     # else:
     # TODO: maybe print the actor supervion tree up to the
@@ -2023,7 +2183,7 @@ async def maybe_init_greenback(
     if mod := maybe_import_greenback(**kwargs):
         await mod.ensure_portal()
-        log.info(
+        log.devx(
             '`greenback` portal opened!\n'
             'Sync debug support activated!\n'
         )
@@ -2032,12 +2192,116 @@ async def maybe_init_greenback(
     return None


-# TODO: allow pausing from sync code.
-# normally by remapping python's builtin breakpoint() hook to this
-# runtime aware version which takes care of all .
-def pause_from_sync(
+async def _pause_from_bg_root_thread(
+    behalf_of_thread: Thread,
+    repl: PdbREPL,
+    hide_tb: bool,
+    task_status: TaskStatus[Task] = trio.TASK_STATUS_IGNORED,
+    **_pause_kwargs,
+):
+    '''
+    Acquire the `Lock._debug_lock` from a bg (only needed for the
+    root-actor) non-`trio` thread (started via a call to
+    `.to_thread.run_sync()` in some actor) by scheduling this func in
+    the actor's service (TODO eventually a special debug_mode)
+    nursery. This task acquires the lock, `.started()`s back to the
+    caller, waits on the `DebugStatus.repl_release: trio.Event` for
+    the `PdbREPL` to set it, then terminates; very much the same way
+    `request_root_stdio_lock()` uses an IPC `Context` from a subactor
+    to do the same from a remote process.
+
+    This task is normally only required to be scheduled for the
+    special cases of a bg sync thread running in the root actor; see
+    the only usage inside `.pause_from_sync()`.
+
+    '''
+    global Lock
+    # TODO: unify this copied code with where it was
+    # from in `maybe_wait_for_debugger()`
+    # if (
+    #     Lock.req_handler_finished is not None
+    #     and not Lock.req_handler_finished.is_set()
+    #     and (in_debug := Lock.ctx_in_debug)
+    # ):
+    #     log.devx(
+    #         '\nRoot is waiting on tty lock to release from\n\n'
+    #         # f'{caller_frame_info}\n'
+    #     )
+    #     with trio.CancelScope(shield=True):
+    #         await Lock.req_handler_finished.wait()
+
+    #     log.pdb(
+    #         f'Subactor released debug lock\n'
+    #         f'|_{in_debug}\n'
+    #     )
+    task: Task = current_task()
+
+    # Manually acquire since otherwise on release we'll
+    # get a RTE raised by `trio` due to ownership..
+    log.devx(
+        'Trying to acquire `Lock` on behalf of bg thread\n'
+        f'|_{behalf_of_thread}\n'
+    )
+    # DebugStatus.repl_task = behalf_of_thread
+    out = await _pause(
+        debug_func=None,
+        repl=repl,
+        hide_tb=hide_tb,
+        called_from_sync=True,
+        called_from_bg_thread=True,
+        **_pause_kwargs
+    )
+    lock: trio.FIFOLock = Lock._debug_lock
+    stats: trio.LockStatistics = lock.statistics()
+    assert stats.owner is task
+    assert Lock._owned_by_root
+    assert DebugStatus.repl_release
+
+    # TODO: do we actually need this?
+    # originally i was trying to solve why this was
+    # unblocking too soon in a thread but it was actually
+    # that we weren't setting our own `repl_release` below..
+    while stats.owner is not task:
+        log.devx(
+            f'Trying to acquire `._debug_lock` from {stats.owner} for\n'
+            f'|_{behalf_of_thread}\n'
+        )
+        await lock.acquire()
+        break
+
+    # XXX NOTE XXX super important dawg..
+    # set our own event since the current one might
+    # have already been overridden and then set when the
+    # last REPL mutex holder exits their sesh!
+    # => we do NOT want to override any existing one
+    #    and we want to ensure we set our own ONLY AFTER we have
+    #    acquired the `._debug_lock`
+    repl_release = DebugStatus.repl_release = trio.Event()

+    # unblock caller thread delivering this bg task
+    log.devx(
+        'Unblocking root-bg-thread since we acquired lock via `._pause()`\n'
+        f'|_{behalf_of_thread}\n'
+    )
+    task_status.started(out)
+    DebugStatus.shield_sigint()
+
+    # wait for bg thread to exit REPL sesh.
+    try:
+        await repl_release.wait()
+    finally:
+        log.devx(
+            'releasing lock from bg root thread task!\n'
+            f'|_ {behalf_of_thread}\n'
+        )
+        Lock.release()
+
+
+def pause_from_sync(
     hide_tb: bool = True,
+    called_from_builtin: bool = False,
+    api_frame: FrameType|None = None,

     # proxy to `._pause()`, for ex:
     # shield: bool = False,
@@ -2045,15 +2309,24 @@ def pause_from_sync(
     **_pause_kwargs,

 ) -> None:
+    '''
+    Pause a `tractor` scheduled task or thread from sync (non-async
+    function) code.
+
+    When `greenback` is installed we remap python's builtin
+    `breakpoint()` hook to this runtime-aware version which takes
+    care of all bg-thread detection and appropriate synchronization
+    with the root actor's `Lock` to avoid multi-thread/process REPL
+    clobbering Bo
+
+    '''
     __tracebackhide__: bool = hide_tb
     try:
         actor: tractor.Actor = current_actor(
             err_on_no_runtime=False,
         )
-        log.debug(
-            f'{actor.uid}: JUST ENTERED `tractor.pause_from_sync()`'
-            f'|_{actor}\n'
+        message: str = (
+            f'{actor.uid} task called `tractor.pause_from_sync()`\n\n'
         )
         if not actor:
             raise RuntimeError(
@@ -2063,7 +2336,7 @@ def pause_from_sync(
                 '- `async with tractor.open_root_actor()`\n'
             )

-        # NOTE: once supported, remove this AND the one
+        # TODO: once supported, remove this AND the one
         # inside `._pause()`!
         if actor.is_infected_aio():
             raise RuntimeError(
@@ -2071,78 +2344,111 @@ def pause_from_sync(
                 'for infected `asyncio` mode!'
             )

-        # raises on not-found by default
-        greenback: ModuleType = maybe_import_greenback()
-        mdb: PdbREPL = mk_pdb()
+        DebugStatus.shield_sigint()
+        repl: PdbREPL = mk_pdb()

-        # run async task which will lock out the root proc's TTY.
+        # message += f'-> created local REPL {repl}\n'
+        is_root: bool = is_root_process()
+
+        # TODO: we could also check for a non-`.to_thread` context
+        # using `trio.from_thread.check_cancelled()` (says
+        # oremanj) wherein we get the following outputs:
+        #
+        # `RuntimeError`: non-`.to_thread` spawned thread
+        # noop: non-cancelled `.to_thread`
+        # `trio.Cancelled`: cancelled `.to_thread`
+
+        # when called from a (bg) thread, run an async task in a new
+        # thread which will call `._pause()` manually with special
+        # handling for root-actor caller usage.
         if not DebugStatus.is_main_trio_thread():
-
-            # TODO: we could also check for a non-`.to_thread` context
-            # using `trio.from_thread.check_cancelled()` (says
-            # oremanj) wherein we get the following outputs:
-            #
-            # `RuntimeError`: non-`.to_thread` spawned thread
-            # noop: non-cancelled `.to_thread`
-            # `trio.Cancelled`: cancelled `.to_thread`
-            #
-            log.warning(
-                'Engaging `.pause_from_sync()` from ANOTHER THREAD!'
-            )
-            task: threading.Thread = threading.current_thread()
-            DebugStatus.repl_task: str = task
+            thread: threading.Thread = threading.current_thread()
+            repl_owner = thread

             # TODO: make root-actor bg thread usage work!
-            # if is_root_process():
-            #     async def _pause_from_sync_thread():
-            #         ...
-            # else:
-            # .. the below ..
+            if is_root:
+                message += (
+                    f'-> called from a root-actor bg {thread}\n'
+                    f'-> scheduling `._pause_from_bg_root_thread()`..\n'
+                )
+                bg_task, repl = trio.from_thread.run(
+                    afn=partial(
+                        actor._service_n.start,
+                        partial(
+                            _pause_from_bg_root_thread,
+                            behalf_of_thread=thread,
+                            repl=repl,
+                            hide_tb=hide_tb,
+                            **_pause_kwargs,
+                        ),
+                    )
+                )
+                message += (
+                    f'-> `._pause_from_bg_root_thread()` started bg task {bg_task}\n'
+                )
+            else:
+                message += f'-> called from a bg {thread}\n'
+                # NOTE: since this is a subactor, `._pause()` will
+                # internally issue a debug request via
+                # `request_root_stdio_lock()` and we don't need to
+                # worry about all the special considerations as with
+                # the root-actor per above.
+                bg_task, repl = trio.from_thread.run(
+                    afn=partial(
+                        _pause,
+                        debug_func=None,
+                        repl=repl,
+                        hide_tb=hide_tb,

-            trio.from_thread.run(
-                partial(
-                    _pause,
-                    debug_func=None,
-                    repl=mdb,
-                    hide_tb=hide_tb,
+                        # XXX to prevent `._pause()` for setting
+                        # `DebugStatus.repl_task` to the gb task!
+                        called_from_sync=True,
+                        called_from_bg_thread=True,

-                    # XXX to prevent `._pause()` for setting
-                    # `DebugStatus.repl_task` to the gb task!
-                    called_from_sync=True,
-                    called_from_bg_thread=True,
-
-                    **_pause_kwargs
-                ),
-            )
+                        **_pause_kwargs
+                    ),
+                )
+                assert bg_task is not DebugStatus.repl_task

         else:  # we are presumably the `trio.run()` + main thread
-            task: trio.Task = current_task()
-            DebugStatus.repl_task: str = task
-            greenback.await_(
+            # raises on not-found by default
+            greenback: ModuleType = maybe_import_greenback()
+            message += f'-> imported {greenback}\n'
+            repl_owner: Task = current_task()
+            message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n'
+            out = greenback.await_(
                 _pause(
                     debug_func=None,
-                    repl=mdb,
+                    repl=repl,
                     hide_tb=hide_tb,
                     called_from_sync=True,
                     **_pause_kwargs,
                 )
             )
+            if out:
+                bg_task, repl_out = out
+                assert repl_out is repl
+                assert bg_task is repl_owner

-        if is_root_process():
-            # Manually acquire since otherwise on release we'll
-            # get a RTE raised by `trio` due to ownership..
-            Lock._debug_lock.acquire_nowait()
-            Lock._owned_by_root = True
+        # NOTE: normally set inside `_enter_repl_sync()`
+        DebugStatus.repl_task = repl_owner

         # TODO: ensure we aggressively make the user aware about
-        # entering the global ``breakpoint()`` built-in from sync
+        # entering the global `breakpoint()` built-in from sync
         # code?
+        message += (
+            f'-> successfully scheduled `._pause()` in `trio` thread on behalf of {bg_task}\n'
+            f'-> Entering REPL via `tractor._set_trace()` from caller {repl_owner}\n'
+        )
+        log.devx(message)
+
+        DebugStatus.repl = repl
         _set_trace(
-            api_frame=inspect.currentframe(),
-            repl=mdb,
+            api_frame=api_frame or inspect.currentframe(),
+            repl=repl,
             hide_tb=hide_tb,
             actor=actor,
-            task=task,
+            task=repl_owner,
         )
         # LEGACY NOTE on next LOC's frame showing weirdness..
         #
@@ -2155,6 +2461,26 @@ def pause_from_sync(
         raise err


+def _sync_pause_from_builtin(
+    *args,
+    called_from_builtin=True,
+    **kwargs,
+) -> None:
+    '''
+    Proxy call `.pause_from_sync()` but indicate the caller is the
+    `breakpoint()` built-in.
+
+    Note: this is assigned to `os.environ['PYTHONBREAKPOINT']` inside `._root`
+
+    '''
+    pause_from_sync(
+        *args,
+        called_from_builtin=True,
+        api_frame=inspect.currentframe(),
+        **kwargs,
+    )
+
+
 # NOTE prefer a new "pause" semantic since it better describes
 # "pausing the actor's runtime" for this particular
 # paralell task to do debugging in a REPL.
@@ -2406,7 +2732,6 @@ async def maybe_wait_for_debugger(
             and not Lock.req_handler_finished.is_set()
             and in_debug is not None
         ):
-
             # caller_frame_info: str = pformat_caller_frame()
             logmeth(
                 msg
@@ -2421,7 +2746,7 @@ async def maybe_wait_for_debugger(
             with trio.CancelScope(shield=True):
                 await Lock.req_handler_finished.wait()

-            log.pdb(
+            log.devx(
                 f'Subactor released debug lock\n'
                 f'|_{in_debug}\n'
             )
@@ -2453,13 +2778,6 @@ async def maybe_wait_for_debugger(
                 await trio.sleep(poll_delay)
                 continue

-            # fallthrough on failure to acquire..
-            # else:
-            #     raise RuntimeError(
-            #         msg
-            #         +
-            #         'Root actor failed to acquire debug lock?'
-            #     )
             return True

         # else:
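Reviewer aid (not part of the patch): a minimal sketch of the new use-case this diff enables, namely calling `tractor.pause_from_sync()` from a `trio.to_thread.run_sync()`-spawned bg thread inside a `debug_mode` root actor. The `sync_work()`/`main()` names are hypothetical; `tractor.open_root_actor(debug_mode=True)` and `tractor.pause_from_sync()` are the public APIs referenced in the diff above.

    import trio
    import tractor

    def sync_work() -> None:
        # runs in a non-`trio` (bg) thread: `pause_from_sync()` detects
        # this via `DebugStatus.is_main_trio_thread()`, schedules
        # `_pause_from_bg_root_thread()` in the root's service nursery
        # to acquire `Lock._debug_lock`, then enters the REPL here
        # without clobbering the parent TTY.
        tractor.pause_from_sync()

    async def main() -> None:
        async with tractor.open_root_actor(debug_mode=True):
            await trio.to_thread.run_sync(sync_work)

    if __name__ == '__main__':
        trio.run(main)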