Messy-teardown `DebugStatus` related fixes

Mostly fixing edge cases with `asyncio` and/or bg threads where the
`.repl_release: trio.Event` needs to be used from the main `trio`
thread OW confusing-but-valid teardown tracebacks can show under various
races.

Also improve,
- log reporting for such internal bugs to make them more obvious on
  console via `log.exception()`.
- only restore the SIGINT handler when runtime is (still) active.
- reporting when `tractor.pause(shield=True)` should be used and
  unhiding the internal frames from the tb in that case.
- for `pause_from_sync()` some deep fixes..
 |_add a `allow_no_runtime: bool = False` flag to allow
   **not** requiring the actor runtime to be active.
 |_fix the `greenback` case-branch to only trigger on `not
   is_trio_thread`.
 |_add a scope-global `repl_owner: Task|Thread|None = None` to
   avoid ref errors..
aio_abandons
Tyler Goodlet 2024-12-03 15:26:25 -05:00
parent 5c2e972315
commit 8af9b0201d
1 changed files with 86 additions and 26 deletions

View File

@ -730,6 +730,9 @@ class DebugStatus:
# -[ ] see if we can get our proto oco task-mngr to work for # -[ ] see if we can get our proto oco task-mngr to work for
# this? # this?
repl_task: Task|None = None repl_task: Task|None = None
# repl_thread: Thread|None = None
# ^TODO?
repl_release: trio.Event|None = None repl_release: trio.Event|None = None
req_task: Task|None = None req_task: Task|None = None
@ -839,11 +842,12 @@ class DebugStatus:
if ( if (
not cls.is_main_trio_thread() not cls.is_main_trio_thread()
and and
# not _state._runtime_vars.get( not _state._runtime_vars.get(
# '_is_infected_aio', '_is_infected_aio',
# False, False,
# ) )
not current_actor().is_infected_aio() # not current_actor().is_infected_aio()
# ^XXX, since for bg-thr case will always raise..
): ):
trio.from_thread.run_sync( trio.from_thread.run_sync(
signal.signal, signal.signal,
@ -928,12 +932,27 @@ class DebugStatus:
try: try:
# sometimes the task might already be terminated in # sometimes the task might already be terminated in
# which case this call will raise an RTE? # which case this call will raise an RTE?
if repl_release is not None: # See below for reporting on that..
if (
repl_release is not None
and
not repl_release.is_set()
):
if cls.is_main_trio_thread(): if cls.is_main_trio_thread():
repl_release.set() repl_release.set()
elif current_actor().is_infected_aio(): elif (
_state._runtime_vars.get(
'_is_infected_aio',
False,
)
# ^XXX, again bc we need to not except
# but for bg-thread case it will always raise..
#
# TODO, is there a better api then using
# `err_on_no_runtime=False` in the below?
# current_actor().is_infected_aio()
):
async def _set_repl_release(): async def _set_repl_release():
repl_release.set() repl_release.set()
@ -949,6 +968,15 @@ class DebugStatus:
trio.from_thread.run_sync( trio.from_thread.run_sync(
repl_release.set repl_release.set
) )
except RuntimeError as rte:
log.exception(
f'Failed to release debug-request ??\n\n'
f'{cls.repr()}\n'
)
# pdbp.set_trace()
raise rte
finally: finally:
# if req_ctx := cls.req_ctx: # if req_ctx := cls.req_ctx:
# req_ctx._scope.cancel() # req_ctx._scope.cancel()
@ -976,9 +1004,10 @@ class DebugStatus:
# logging when we don't need to? # logging when we don't need to?
cls.repl = None cls.repl = None
# restore original sigint handler # maybe restore original sigint handler
cls.unshield_sigint() # XXX requires runtime check to avoid crash!
if current_actor(err_on_no_runtime=False):
cls.unshield_sigint()
# TODO: use the new `@lowlevel.singleton` for this! # TODO: use the new `@lowlevel.singleton` for this!
@ -1066,7 +1095,7 @@ class PdbREPL(pdbp.Pdb):
# Lock.release(raise_on_thread=False) # Lock.release(raise_on_thread=False)
Lock.release() Lock.release()
# XXX after `Lock.release()` for root local repl usage # XXX AFTER `Lock.release()` for root local repl usage
DebugStatus.release() DebugStatus.release()
def set_quit(self): def set_quit(self):
@ -1672,7 +1701,7 @@ class DebugRequestError(RuntimeError):
''' '''
_repl_fail_msg: str = ( _repl_fail_msg: str|None = (
'Failed to REPl via `_pause()` ' 'Failed to REPl via `_pause()` '
) )
@ -1712,6 +1741,7 @@ async def _pause(
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
pause_err: BaseException|None = None
actor: Actor = current_actor() actor: Actor = current_actor()
try: try:
task: Task = current_task() task: Task = current_task()
@ -2094,11 +2124,13 @@ async def _pause(
# TODO: prolly factor this plus the similar block from # TODO: prolly factor this plus the similar block from
# `_enter_repl_sync()` into a common @cm? # `_enter_repl_sync()` into a common @cm?
except BaseException as pause_err: except BaseException as _pause_err:
pause_err: BaseException = _pause_err
if isinstance(pause_err, bdb.BdbQuit): if isinstance(pause_err, bdb.BdbQuit):
log.devx( log.devx(
'REPL for pdb was quit!\n' 'REPL for pdb was explicitly quit!\n'
) )
_repl_fail_msg = None
# when the actor is mid-runtime cancellation the # when the actor is mid-runtime cancellation the
# `Actor._service_n` might get closed before we can spawn # `Actor._service_n` might get closed before we can spawn
@ -2117,13 +2149,18 @@ async def _pause(
) )
return return
else: elif isinstance(pause_err, trio.Cancelled):
log.exception( _repl_fail_msg = (
_repl_fail_msg 'You called `tractor.pause()` from an already cancelled scope!\n\n'
+ 'Consider `await tractor.pause(shield=True)` to make it work B)\n'
f'on behalf of {repl_task} ??\n'
) )
else:
_repl_fail_msg += f'on behalf of {repl_task} ??\n'
if _repl_fail_msg:
log.exception(_repl_fail_msg)
if not actor.is_infected_aio(): if not actor.is_infected_aio():
DebugStatus.release(cancel_req_task=True) DebugStatus.release(cancel_req_task=True)
@ -2152,6 +2189,8 @@ async def _pause(
DebugStatus.req_err DebugStatus.req_err
or or
repl_err repl_err
or
pause_err
): ):
__tracebackhide__: bool = False __tracebackhide__: bool = False
@ -2435,6 +2474,8 @@ def pause_from_sync(
called_from_builtin: bool = False, called_from_builtin: bool = False,
api_frame: FrameType|None = None, api_frame: FrameType|None = None,
allow_no_runtime: bool = False,
# proxy to `._pause()`, for ex: # proxy to `._pause()`, for ex:
# shield: bool = False, # shield: bool = False,
# api_frame: FrameType|None = None, # api_frame: FrameType|None = None,
@ -2453,16 +2494,25 @@ def pause_from_sync(
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
repl_owner: Task|Thread|None = None
try: try:
actor: tractor.Actor = current_actor( actor: tractor.Actor = current_actor(
err_on_no_runtime=False, err_on_no_runtime=False,
) )
if not actor: if (
raise RuntimeError( not actor
'Not inside the `tractor`-runtime?\n' and
not allow_no_runtime
):
raise NoRuntime(
'The actor runtime has not been opened?\n\n'
'`tractor.pause_from_sync()` is not functional without a wrapping\n' '`tractor.pause_from_sync()` is not functional without a wrapping\n'
'- `async with tractor.open_nursery()` or,\n' '- `async with tractor.open_nursery()` or,\n'
'- `async with tractor.open_root_actor()`\n' '- `async with tractor.open_root_actor()`\n\n'
'If you are getting this from a builtin `breakpoint()` call\n'
'it might mean the runtime was started then '
'stopped prematurely?\n'
) )
message: str = ( message: str = (
f'{actor.uid} task called `tractor.pause_from_sync()`\n' f'{actor.uid} task called `tractor.pause_from_sync()`\n'
@ -2485,6 +2535,7 @@ def pause_from_sync(
repl: PdbREPL = mk_pdb() repl: PdbREPL = mk_pdb()
# message += f'-> created local REPL {repl}\n' # message += f'-> created local REPL {repl}\n'
is_trio_thread: bool = DebugStatus.is_main_trio_thread()
is_root: bool = is_root_process() is_root: bool = is_root_process()
is_aio: bool = actor.is_infected_aio() is_aio: bool = actor.is_infected_aio()
@ -2500,7 +2551,7 @@ def pause_from_sync(
# thread which will call `._pause()` manually with special # thread which will call `._pause()` manually with special
# handling for root-actor caller usage. # handling for root-actor caller usage.
if ( if (
not DebugStatus.is_main_trio_thread() not is_trio_thread
and and
not is_aio # see below for this usage not is_aio # see below for this usage
): ):
@ -2574,7 +2625,11 @@ def pause_from_sync(
DebugStatus.shield_sigint() DebugStatus.shield_sigint()
assert bg_task is not DebugStatus.repl_task assert bg_task is not DebugStatus.repl_task
elif is_aio: elif (
not is_trio_thread
and
is_aio
):
greenback: ModuleType = maybe_import_greenback() greenback: ModuleType = maybe_import_greenback()
repl_owner: Task = asyncio.current_task() repl_owner: Task = asyncio.current_task()
DebugStatus.shield_sigint() DebugStatus.shield_sigint()
@ -2758,9 +2813,11 @@ def _post_mortem(
# ^TODO, instead a nice runtime-info + maddr + uid? # ^TODO, instead a nice runtime-info + maddr + uid?
# -[ ] impl a `Actor.__repr()__`?? # -[ ] impl a `Actor.__repr()__`??
# |_ <task>:<thread> @ <actor> # |_ <task>:<thread> @ <actor>
# no_runtime: bool = False
except NoRuntime: except NoRuntime:
actor_repr: str = '<no-actor-runtime?>' actor_repr: str = '<no-actor-runtime?>'
# no_runtime: bool = True
try: try:
task_repr: Task = current_task() task_repr: Task = current_task()
@ -2796,6 +2853,8 @@ def _post_mortem(
# Since we presume the post-mortem was enaged to a task-ending # Since we presume the post-mortem was enaged to a task-ending
# error, we MUST release the local REPL request so that not other # error, we MUST release the local REPL request so that not other
# local task nor the root remains blocked! # local task nor the root remains blocked!
# if not no_runtime:
# DebugStatus.release()
DebugStatus.release() DebugStatus.release()
@ -3033,6 +3092,7 @@ async def maybe_wait_for_debugger(
# pass # pass
return False return False
# TODO: better naming and what additionals? # TODO: better naming and what additionals?
# - [ ] optional runtime plugging? # - [ ] optional runtime plugging?
# - [ ] detection for sync vs. async code? # - [ ] detection for sync vs. async code?