forked from goodboy/tractor
1
0
Fork 0

Make `request_root_stdio_lock()` post-mortem-able

Finally got this working so that if/when an internal bug is introduced
to this request task-func, we can actually REPL-debug the lock request
task itself B)

As in, if the subactor's lock request task internally errors we,
- ensure the task always terminates (by calling `DebugStatus.release()`)
  and explicitly reports (via a `log.exception()`) the internal error.
- capture the error instance and set as a new `DebugStatus.req_err` and
  always check for it on final teardown - in which case we also,
 - ensure it's reraised from a new `DebugRequestError`.
 - unhide the stack frames for `_pause()`, `_enter_repl_sync()` so that
   the dev can upward inspect the `_pause()` call stack sanely.

Supporting internal impl changes,
- add `DebugStatus.cancel()` and `.req_err`.
- don't ever cancel the request task from
  `PdbREPL.set_[continue/quit]()` only when there's some internal error
  that would likely result in a hang and stale lock state with the root.
- only release the root's lock when the current ask is also the owner
  (avoids bad release errors).
- also show internal `._pause()`-related frames on any `repl_err`.

Other temp-dev-tweaks,
- make pld-dec change log msgs info level again while solving this
  final context-vars race stuff..
- drop the debug pld-dec instance match asserts for now since
  the problem is already caught (and now debug-able B) by an attr-error
  on the decoded-as-`dict` started msg, and instead add in
  a `log.exception()` trace to see which task is triggering the case
  where the debug `MsgDec` isn't set correctly vs. when we think it's
  being applied.
runtime_to_msgspec
Tyler Goodlet 2024-05-14 15:22:13 -04:00
parent 31de5f6648
commit b23780c102
1 changed files with 207 additions and 134 deletions

View File

@ -234,49 +234,26 @@ class Lock:
cls, cls,
force: bool = False, force: bool = False,
): ):
lock: trio.StrictFIFOLock = cls._debug_lock
try: try:
if lock.locked(): lock: trio.StrictFIFOLock = cls._debug_lock
owner: Task = lock.statistics().owner
if (
lock.locked()
and
owner is current_task()
# ^-NOTE-^ if not will raise a RTE..
):
if not DebugStatus.is_main_trio_thread(): if not DebugStatus.is_main_trio_thread():
trio.from_thread.run_sync( trio.from_thread.run_sync(
cls._debug_lock.release cls._debug_lock.release
) )
else: else:
cls._debug_lock.release() cls._debug_lock.release()
message: str = 'TTY lock released for child\n' message: str = 'TTY lock released for child\n'
else: else:
message: str = 'TTY lock not held by any child\n' message: str = 'TTY lock not held by any child\n'
except RuntimeError as rte:
message: str = (
'TTY lock FAILED to release for child??\n'
f'{current_task()}\n'
)
log.exception(message)
# uhhh makes no sense but been seeing the non-owner
# release error even though this is definitely the task
# that locked?
owner = cls._debug_lock.statistics().owner
# if (
# owner
# and
# cls.remote_task_in_debug is None
# ):
# raise RuntimeError(
# 'Stale `Lock` detected, no remote task active!?\n'
# f'|_{owner}\n'
# # f'{cls}'
# ) from rte
if owner:
raise rte
# OW suppress, can't member why tho .. XD
# something somethin corrupts a cancel-scope
# somewhere..
finally: finally:
# IFF there are no more requesting tasks queued up fire, the # IFF there are no more requesting tasks queued up fire, the
# "tty-unlocked" event thereby alerting any monitors of the lock that # "tty-unlocked" event thereby alerting any monitors of the lock that
@ -518,11 +495,23 @@ async def lock_tty_for_child(
locked=False, locked=False,
) )
except BaseException: except BaseException as req_err:
message: str = (
'Forcing `Lock.release()` since likely an internal error!\n'
)
if isinstance(req_err, trio.Cancelled):
log.cancel(
'Cancelled during root TTY-lock dialog?\n'
+
message
)
else:
log.exception( log.exception(
'Errored during root TTY-lock dialog?\n' 'Errored during root TTY-lock dialog?\n'
'Forcing release since an internal error caused this!\n' +
message
) )
Lock.release(force=True) Lock.release(force=True)
raise raise
@ -555,6 +544,7 @@ class DebugStatus:
repl_release: trio.Event|None = None repl_release: trio.Event|None = None
req_finished: trio.Event|None = None req_finished: trio.Event|None = None
lock_status: LockStatus|None = None lock_status: LockStatus|None = None
req_err: BaseException|None = None
_orig_sigint_handler: Callable|None = None _orig_sigint_handler: Callable|None = None
_trio_handler: ( _trio_handler: (
@ -693,28 +683,37 @@ class DebugStatus:
# is not threading.main_thread() # is not threading.main_thread()
# ) # )
@classmethod
def cancel(cls) -> bool:
if (req_cs := cls.req_cs):
req_cs.cancel()
return True
return False
@classmethod @classmethod
@pdbp.hideframe @pdbp.hideframe
def release( def release(
cls, cls,
cancel_req_task: bool = True, cancel_req_task: bool = False,
): ):
repl_release: trio.Event = cls.repl_release
try: try:
# sometimes the task might already be terminated in # sometimes the task might already be terminated in
# which case this call will raise an RTE? # which case this call will raise an RTE?
if cls.repl_release is not None: if repl_release is not None:
cls.repl_release.set() repl_release.set()
finally: finally:
# if req_ctx := cls.req_ctx: # if req_ctx := cls.req_ctx:
# req_ctx._scope.cancel() # req_ctx._scope.cancel()
if cancel_req_task:
if ( cancelled: bool = cls.cancel()
cancel_req_task if not cancelled:
and log.warning(
(req_cs := cls.req_cs) 'Failed to cancel request task!?\n'
): f'{cls.repl_task}\n'
req_cs.cancel() )
# restore original sigint handler # restore original sigint handler
cls.unshield_sigint() cls.unshield_sigint()
@ -759,16 +758,19 @@ class PdbREPL(pdbp.Pdb):
status = DebugStatus status = DebugStatus
# NOTE: see details in stdlib's `bdb.py` # NOTE: see details in stdlib's `bdb.py`
def user_exception(self, frame, exc_info): # def user_exception(self, frame, exc_info):
''' # '''
Called when we stop on an exception. # Called when we stop on an exception.
''' # '''
log.warning( # log.warning(
'Exception during REPL sesh\n\n' # 'Exception during REPL sesh\n\n'
f'{frame}\n\n' # f'{frame}\n\n'
f'{exc_info}\n\n' # f'{exc_info}\n\n'
) # )
# NOTE: this actually hooks but i don't see anyway to detect
# if an error was caught.. this is why currently we just always
# call `DebugStatus.release` inside `_post_mortem()`.
# def preloop(self): # def preloop(self):
# print('IN PRELOOP') # print('IN PRELOOP')
# super().preloop() # super().preloop()
@ -804,10 +806,7 @@ class PdbREPL(pdbp.Pdb):
try: try:
super().set_quit() super().set_quit()
finally: finally:
DebugStatus.release( DebugStatus.release()
cancel_req_task=False,
)
if ( if (
is_root_process() is_root_process()
and and
@ -863,7 +862,6 @@ def apply_debug_pldec() -> _codec.MsgCodec:
(only in the current task). (only in the current task).
''' '''
from tractor.msg import ( from tractor.msg import (
_ops as msgops, _ops as msgops,
) )
@ -874,8 +872,12 @@ def apply_debug_pldec() -> _codec.MsgCodec:
with msgops.limit_plds( with msgops.limit_plds(
spec=__pld_spec__, spec=__pld_spec__,
) as debug_dec: ) as debug_dec:
assert debug_dec is msgops.current_pldrx().pld_dec assert (
log.runtime( debug_dec
is
msgops.current_pldrx().pld_dec
)
log.info(
'Applied `.devx._debug` pld-spec\n\n' 'Applied `.devx._debug` pld-spec\n\n'
f'{debug_dec}\n' f'{debug_dec}\n'
) )
@ -887,11 +889,12 @@ def apply_debug_pldec() -> _codec.MsgCodec:
and and
plrx.pld_dec is orig_pldec plrx.pld_dec is orig_pldec
) )
log.runtime( log.info(
'Reverted to previous pld-spec\n\n' 'Reverted to previous pld-spec\n\n'
f'{orig_pldec}\n' f'{orig_pldec}\n'
) )
async def request_root_stdio_lock( async def request_root_stdio_lock(
actor_uid: tuple[str, str], actor_uid: tuple[str, str],
task_uid: tuple[str, int], task_uid: tuple[str, int],
@ -911,6 +914,10 @@ async def request_root_stdio_lock(
entering the REPL at the same time. entering the REPL at the same time.
''' '''
log.pdb(
'Initing stdio-lock request task with root actor'
)
# TODO: likely we can implement this mutex more generally as # TODO: likely we can implement this mutex more generally as
# a `._sync.Lock`? # a `._sync.Lock`?
# -[ ] simply add the wrapping needed for the debugger specifics? # -[ ] simply add the wrapping needed for the debugger specifics?
@ -923,6 +930,8 @@ async def request_root_stdio_lock(
DebugStatus.req_finished = trio.Event() DebugStatus.req_finished = trio.Event()
try: try:
from tractor._discovery import get_root from tractor._discovery import get_root
from tractor.msg import _ops as msgops
debug_dec: msgops.MsgDec
with ( with (
# NOTE: we need this to ensure that this task exits # NOTE: we need this to ensure that this task exits
# BEFORE the REPl instance raises an error like # BEFORE the REPl instance raises an error like
@ -953,12 +962,13 @@ async def request_root_stdio_lock(
# #
apply_debug_pldec() as debug_dec, apply_debug_pldec() as debug_dec,
): ):
log.critical( # XXX: was orig for debugging cs stack corruption..
'Request cancel-scope is:\n\n' # log.info(
f'{pformat_cs(req_cs, var_name="req_cs")}\n\n' # 'Request cancel-scope is:\n\n'
# f'{pformat_cs(req_cs, var_name="req_cs")}\n\n'
) # )
DebugStatus.req_cs = req_cs DebugStatus.req_cs = req_cs
req_ctx: Context|None = None
try: try:
# TODO: merge into single async with ? # TODO: merge into single async with ?
async with get_root() as portal: async with get_root() as portal:
@ -966,31 +976,37 @@ async def request_root_stdio_lock(
async with portal.open_context( async with portal.open_context(
lock_tty_for_child, lock_tty_for_child,
subactor_task_uid=task_uid, subactor_task_uid=task_uid,
) as (ctx, status): ) as (req_ctx, status):
DebugStatus.req_ctx = ctx DebugStatus.req_ctx = req_ctx
# sanity checks on pld-spec limit state
assert debug_dec
# curr_pldrx: msgops.PldRx = msgops.current_pldrx()
# assert (
# curr_pldrx.pld_dec is debug_dec
# )
from tractor.msg import (
_ops as msgops,
)
assert (
msgops.current_pldrx().pld_dec is debug_dec
)
log.debug( log.debug(
'Subactor locked TTY with msg\n\n' 'Subactor locked TTY with msg\n\n'
f'{status}\n' f'{status}\n'
) )
# mk_pdb().set_trace() # mk_pdb().set_trace()
try:
assert status.subactor_uid == actor_uid assert status.subactor_uid == actor_uid
assert status.cid assert status.cid
except AttributeError:
log.exception('failed pldspec asserts!')
raise
# set last rxed lock dialog status. # set last rxed lock dialog status.
DebugStatus.lock_status = status DebugStatus.lock_status = status
async with ctx.open_stream() as stream: async with req_ctx.open_stream() as stream:
assert DebugStatus.repl_release assert DebugStatus.repl_release
task_status.started(ctx) task_status.started(req_ctx)
# wait for local task to exit its # wait for local task to exit its
# `PdbREPL.interaction()`, call # `PdbREPL.interaction()`, call
@ -1006,25 +1022,25 @@ async def request_root_stdio_lock(
# sync with child-side root locker task # sync with child-side root locker task
# completion # completion
status: LockStatus = await ctx.result() status: LockStatus = await req_ctx.result()
assert not status.locked assert not status.locked
DebugStatus.lock_status = status DebugStatus.lock_status = status
log.pdb( log.pdb(
'TTY lock was released for subactor with msg\n\n' 'TTY lock was released for subactor with msg\n\n'
f'{status}\n\n' f'{status}\n\n'
f'Exitting {ctx.side!r}-side of locking ctx' f'Exitting {req_ctx.side!r}-side of locking req_ctx'
) )
except ( except (
tractor.ContextCancelled, tractor.ContextCancelled,
trio.Cancelled, trio.Cancelled,
): ):
log.exception( log.cancel(
'Debug lock request CANCELLED?\n\n' 'Debug lock request was CANCELLED?\n\n'
f'{pformat_cs(req_cs, var_name="req_cs")}\n\n' f'{req_ctx}\n'
f'{pformat_cs(ctx._scope, var_name="ctx._scope")}\n\n' # f'{pformat_cs(req_cs, var_name="req_cs")}\n\n'
f'{ctx}' # f'{pformat_cs(req_ctx._scope, var_name="req_ctx._scope")}\n\n'
) )
raise raise
@ -1033,11 +1049,11 @@ async def request_root_stdio_lock(
): ):
log.exception( log.exception(
'Failed during root TTY-lock dialog?\n' 'Failed during root TTY-lock dialog?\n'
f'{ctx}\n' f'{req_ctx}\n'
f'Cancelling IPC ctx!\n' f'Cancelling IPC ctx!\n'
) )
await ctx.cancel() await req_ctx.cancel()
raise raise
@ -1047,13 +1063,26 @@ async def request_root_stdio_lock(
): ):
log.cancel( log.cancel(
'Debug lock request CANCELLED?\n' 'Debug lock request CANCELLED?\n'
f'{ctx}\n' f'{req_ctx}\n'
) )
raise raise
except BaseException: except BaseException as req_err:
log.exception('Errored during root TTY-lock dialog?') # log.error('Failed to request root stdio-lock?')
raise DebugStatus.req_err = req_err
DebugStatus.release()
# TODO: how to dev a test that ensures we actually drop
# into THIS internal frame on any internal error in the above
# code?
# -[ ] eg. on failed pld_dec assert above we should be able
# to REPL pm it.
# -[ ]FURTHER, after we 'continue', we should be able to
# ctl-c out of the currently hanging task!
raise DebugRequestError(
'Failed to lock stdio from subactor IPC ctx!\n\n'
f'req_ctx: {req_ctx}\n'
) from req_err
finally: finally:
log.debug('Exiting debugger TTY lock request func from child') log.debug('Exiting debugger TTY lock request func from child')
@ -1369,6 +1398,13 @@ def shield_sigint_handler(
_pause_msg: str = 'Attaching to pdb REPL in actor' _pause_msg: str = 'Attaching to pdb REPL in actor'
class DebugRequestError(RuntimeError):
'''
Failed to request stdio lock from root actor!
'''
async def _pause( async def _pause(
debug_func: Callable|None, debug_func: Callable|None,
@ -1480,15 +1516,18 @@ async def _pause(
raise raise
except BaseException: except BaseException:
__tracebackhide__: bool = False
log.exception( log.exception(
'Failed to invoke internal `debug_func = ' 'Failed to invoke internal `debug_func = '
f'{debug_func.func.__name__}`\n' f'{debug_func.func.__name__}`\n'
) )
# NOTE: OW this is ONLY called from the # NOTE: OW this is ONLY called from the
# `.set_continue/next` hooks! # `.set_continue/next` hooks!
DebugStatus.release() DebugStatus.release(cancel_req_task=True)
raise raise
repl_err: BaseException|None = None
try: try:
if is_root_process(): if is_root_process():
@ -1584,26 +1623,37 @@ async def _pause(
# actor._service_n.cancel_scope.shield = shield # actor._service_n.cancel_scope.shield = shield
# ``` # ```
# but not entirely sure if that's a sane way to implement it? # but not entirely sure if that's a sane way to implement it?
try:
# NOTE spawn the stdio locker request task inside the # NOTE currently we spawn the lock request task inside this
# current `Context._scope_nursery` to entsure that # subactor's global `Actor._service_n` so that the
# the request never can outlive the task's (parent) # lifetime of the lock-request can outlive the current
# lifetime. # `._pause()` scope while the user steps through their
curr_ctx: Context = current_ipc_ctx() # application code and when they finally exit the
# TODO: see `_errors_relayed_via_ipc()` where we # session, via 'continue' or 'quit' cmds, the `PdbREPL`
# should dynamically open a `debug_tn` for use here, # will manually call `DebugStatus.release()` to release
# BUT it needs to be outside the normal error # the lock session with the root actor.
# catching and `_maybe_enter_debugger()` call! #
# TODO: ideally we can add a tighter scope for this
# request task likely by conditionally opening a "debug
# nursery" inside `_errors_relayed_via_ipc()`, see the
# todo in tht module, but
# -[ ] it needs to be outside the normal crash handling
# `_maybe_enter_debugger()` block-call.
# -[ ] we probably only need to allocate the nursery when
# we detect the runtime is already in debug mode.
#
# ctx: Context = await curr_ctx._debug_tn.start( # ctx: Context = await curr_ctx._debug_tn.start(
ctx: Context = await actor._service_n.start( req_ctx: Context = await actor._service_n.start(
request_root_stdio_lock, request_root_stdio_lock,
actor.uid, actor.uid,
(task.name, id(task)), # task uuid (effectively) (task.name, id(task)), # task uuid (effectively)
) )
# our locker task should be the one in ctx # XXX sanity, our locker task should be the one which
# with the root actor # entered a new IPC ctx with the root actor, NOT the one
# that exists around the task calling into `._pause()`.
curr_ctx: Context = current_ipc_ctx()
assert ( assert (
ctx req_ctx
is is
DebugStatus.req_ctx DebugStatus.req_ctx
is not is not
@ -1613,15 +1663,6 @@ async def _pause(
# enter REPL # enter REPL
_enter_repl_sync(debug_func) _enter_repl_sync(debug_func)
except RuntimeError:
if actor._cancel_called:
# service nursery won't be usable and we
# don't want to lock up the root either way since
# we're in (the midst of) cancellation.
return
raise
# TODO: prolly factor this plus the similar block from # TODO: prolly factor this plus the similar block from
# `_enter_repl_sync()` into a common @cm? # `_enter_repl_sync()` into a common @cm?
except BaseException as repl_err: except BaseException as repl_err:
@ -1629,13 +1670,31 @@ async def _pause(
log.devx( log.devx(
'REPL for pdb was quit!\n' 'REPL for pdb was quit!\n'
) )
# when the actor is mid-runtime cancellation the
# `Actor._service_n` might get closed before we can spawn
# the request task, so just ignore expected RTE.
elif (
isinstance(repl_err, RuntimeError)
and
actor._cancel_called
):
# service nursery won't be usable and we
# don't want to lock up the root either way since
# we're in (the midst of) cancellation.
log.warning(
'Service nursery likely closed due to actor-runtime cancellation..\n'
'Ignoring failed debugger lock request task spawn..\n'
)
return
else: else:
log.exception( log.exception(
'Failed to engage debugger via `_pause()` ??\n' 'Failed to engage debugger via `_pause()` ??\n'
) )
mk_pdb().set_trace()
DebugStatus.release() DebugStatus.release(cancel_req_task=True)
# sanity checks for ^ on request/status teardown # sanity checks for ^ on request/status teardown
assert DebugStatus.repl is None assert DebugStatus.repl is None
assert DebugStatus.repl_task is None assert DebugStatus.repl_task is None
@ -1645,6 +1704,16 @@ async def _pause(
raise raise
finally:
# always show frame when request fails due to internal
# failure in the above code (including an `BdbQuit`).
if (
DebugStatus.req_err
or
repl_err
):
__tracebackhide__: bool = False
def _set_trace( def _set_trace(
repl: PdbREPL, # passed by `_pause()` repl: PdbREPL, # passed by `_pause()`
@ -1703,7 +1772,7 @@ async def pause(
https://en.wikipedia.org/wiki/Breakpoint https://en.wikipedia.org/wiki/Breakpoint
''' '''
__tracebackhide__: bool = True __tracebackhide__: bool = hide_tb
# always start 1 level up from THIS in user code since normally # always start 1 level up from THIS in user code since normally
# `tractor.pause()` is called explicitly by use-app code thus # `tractor.pause()` is called explicitly by use-app code thus
@ -1885,12 +1954,15 @@ def pause_from_sync(
# NOTE prefer a new "pause" semantic since it better describes # NOTE prefer a new "pause" semantic since it better describes
# "pausing the actor's runtime" for this particular # "pausing the actor's runtime" for this particular
# paralell task to do debugging in a REPL. # paralell task to do debugging in a REPL.
async def breakpoint(**kwargs): async def breakpoint(
hide_tb: bool = True,
**kwargs,
):
log.warning( log.warning(
'`tractor.breakpoint()` is deprecated!\n' '`tractor.breakpoint()` is deprecated!\n'
'Please use `tractor.pause()` instead!\n' 'Please use `tractor.pause()` instead!\n'
) )
__tracebackhide__: bool = True __tracebackhide__: bool = hide_tb
await pause( await pause(
api_frame=inspect.currentframe(), api_frame=inspect.currentframe(),
**kwargs, **kwargs,
@ -1951,6 +2023,7 @@ def _post_mortem(
# frame=None, # frame=None,
traceback=tb, traceback=tb,
) )
# XXX NOTE XXX: absolutely required to avoid hangs!
# Since we presume the post-mortem was enaged to a task-ending # Since we presume the post-mortem was enaged to a task-ending
# error, we MUST release the local REPL request so that not other # error, we MUST release the local REPL request so that not other
# local task nor the root remains blocked! # local task nor the root remains blocked!