Handle egs on failed `request_root_stdio_lock()`
Namely when the subactor fails to lock the root, in which case we try to be very verbose about how/what failed in logging as well as ensure we cancel the employed IPC ctx. Implement the outer `BaseException` handler to handle both styles, - match on an eg (or the prior std cancel excs) only raising a lone sub-exc from for former. - always `as _req_err:` and assign to a new func-global `req_err` to enable the above matching. Other, - raise `DebugStateError` on `status.subactor_uid != actor_uid`. - fix a `_repl_fail_report` ref error due to making silly assumptions about the `_repl_fail_msg` global; now copy from global as default. - various log-fmt and logic expression styling tweaks. - ignore `trio.Cancelled` by default in `open_crash_handler()`.py313_support
parent
b90f3ce781
commit
0349d492ab
|
@ -317,8 +317,6 @@ class Lock:
|
||||||
we_released: bool = False
|
we_released: bool = False
|
||||||
ctx_in_debug: Context|None = cls.ctx_in_debug
|
ctx_in_debug: Context|None = cls.ctx_in_debug
|
||||||
repl_task: Task|Thread|None = DebugStatus.repl_task
|
repl_task: Task|Thread|None = DebugStatus.repl_task
|
||||||
message: str = ''
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if not DebugStatus.is_main_trio_thread():
|
if not DebugStatus.is_main_trio_thread():
|
||||||
thread: threading.Thread = threading.current_thread()
|
thread: threading.Thread = threading.current_thread()
|
||||||
|
@ -333,6 +331,10 @@ class Lock:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
task: Task = current_task()
|
task: Task = current_task()
|
||||||
|
message: str = (
|
||||||
|
'TTY NOT RELEASED on behalf of caller\n'
|
||||||
|
f'|_{task}\n'
|
||||||
|
)
|
||||||
|
|
||||||
# sanity check that if we're the root actor
|
# sanity check that if we're the root actor
|
||||||
# the lock is marked as such.
|
# the lock is marked as such.
|
||||||
|
@ -347,11 +349,6 @@ class Lock:
|
||||||
else:
|
else:
|
||||||
assert DebugStatus.repl_task is not task
|
assert DebugStatus.repl_task is not task
|
||||||
|
|
||||||
message: str = (
|
|
||||||
'TTY lock was NOT released on behalf of caller\n'
|
|
||||||
f'|_{task}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
lock: trio.StrictFIFOLock = cls._debug_lock
|
lock: trio.StrictFIFOLock = cls._debug_lock
|
||||||
owner: Task = lock.statistics().owner
|
owner: Task = lock.statistics().owner
|
||||||
if (
|
if (
|
||||||
|
@ -366,23 +363,21 @@ class Lock:
|
||||||
# correct task, greenback-spawned-task and/or thread
|
# correct task, greenback-spawned-task and/or thread
|
||||||
# being set to the `.repl_task` such that the above
|
# being set to the `.repl_task` such that the above
|
||||||
# condition matches and we actually release the lock.
|
# condition matches and we actually release the lock.
|
||||||
|
#
|
||||||
# This is particular of note from `.pause_from_sync()`!
|
# This is particular of note from `.pause_from_sync()`!
|
||||||
|
|
||||||
):
|
):
|
||||||
cls._debug_lock.release()
|
cls._debug_lock.release()
|
||||||
we_released: bool = True
|
we_released: bool = True
|
||||||
if repl_task:
|
if repl_task:
|
||||||
message: str = (
|
message: str = (
|
||||||
'Lock released on behalf of root-actor-local REPL owner\n'
|
'TTY released on behalf of root-actor-local REPL owner\n'
|
||||||
f'|_{repl_task}\n'
|
f'|_{repl_task}\n'
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
message: str = (
|
message: str = (
|
||||||
'TTY lock released by us on behalf of remote peer?\n'
|
'TTY released by us on behalf of remote peer?\n'
|
||||||
f'|_ctx_in_debug: {ctx_in_debug}\n\n'
|
f'{ctx_in_debug}\n'
|
||||||
)
|
)
|
||||||
# mk_pdb().set_trace()
|
|
||||||
# elif owner:
|
|
||||||
|
|
||||||
except RuntimeError as rte:
|
except RuntimeError as rte:
|
||||||
log.exception(
|
log.exception(
|
||||||
|
@ -400,7 +395,8 @@ class Lock:
|
||||||
req_handler_finished: trio.Event|None = Lock.req_handler_finished
|
req_handler_finished: trio.Event|None = Lock.req_handler_finished
|
||||||
if (
|
if (
|
||||||
not lock_stats.owner
|
not lock_stats.owner
|
||||||
and req_handler_finished is None
|
and
|
||||||
|
req_handler_finished is None
|
||||||
):
|
):
|
||||||
message += (
|
message += (
|
||||||
'-> No new task holds the TTY lock!\n\n'
|
'-> No new task holds the TTY lock!\n\n'
|
||||||
|
@ -418,8 +414,8 @@ class Lock:
|
||||||
repl_task
|
repl_task
|
||||||
)
|
)
|
||||||
message += (
|
message += (
|
||||||
f'A non-caller task still owns this lock on behalf of '
|
f'A non-caller task still owns this lock on behalf of\n'
|
||||||
f'`{behalf_of_task}`\n'
|
f'{behalf_of_task}\n'
|
||||||
f'lock owner task: {lock_stats.owner}\n'
|
f'lock owner task: {lock_stats.owner}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -447,8 +443,6 @@ class Lock:
|
||||||
|
|
||||||
if message:
|
if message:
|
||||||
log.devx(message)
|
log.devx(message)
|
||||||
else:
|
|
||||||
import pdbp; pdbp.set_trace()
|
|
||||||
|
|
||||||
return we_released
|
return we_released
|
||||||
|
|
||||||
|
@ -668,10 +662,11 @@ async def lock_stdio_for_peer(
|
||||||
fail_reason: str = (
|
fail_reason: str = (
|
||||||
f'on behalf of peer\n\n'
|
f'on behalf of peer\n\n'
|
||||||
f'x)<=\n'
|
f'x)<=\n'
|
||||||
f' |_{subactor_task_uid!r}@{ctx.chan.uid!r}\n\n'
|
f' |_{subactor_task_uid!r}@{ctx.chan.uid!r}\n'
|
||||||
|
f'\n'
|
||||||
'Forcing `Lock.release()` due to acquire failure!\n\n'
|
'Forcing `Lock.release()` due to acquire failure!\n\n'
|
||||||
f'x)=> {ctx}\n'
|
f'x)=>\n'
|
||||||
|
f' {ctx}'
|
||||||
)
|
)
|
||||||
if isinstance(req_err, trio.Cancelled):
|
if isinstance(req_err, trio.Cancelled):
|
||||||
fail_reason = (
|
fail_reason = (
|
||||||
|
@ -1179,7 +1174,7 @@ async def request_root_stdio_lock(
|
||||||
log.devx(
|
log.devx(
|
||||||
'Initing stdio-lock request task with root actor'
|
'Initing stdio-lock request task with root actor'
|
||||||
)
|
)
|
||||||
# TODO: likely we can implement this mutex more generally as
|
# TODO: can we implement this mutex more generally as
|
||||||
# a `._sync.Lock`?
|
# a `._sync.Lock`?
|
||||||
# -[ ] simply add the wrapping needed for the debugger specifics?
|
# -[ ] simply add the wrapping needed for the debugger specifics?
|
||||||
# - the `__pld_spec__` impl and maybe better APIs for the client
|
# - the `__pld_spec__` impl and maybe better APIs for the client
|
||||||
|
@ -1190,6 +1185,7 @@ async def request_root_stdio_lock(
|
||||||
# - https://docs.python.org/3.8/library/multiprocessing.html#multiprocessing.RLock
|
# - https://docs.python.org/3.8/library/multiprocessing.html#multiprocessing.RLock
|
||||||
DebugStatus.req_finished = trio.Event()
|
DebugStatus.req_finished = trio.Event()
|
||||||
DebugStatus.req_task = current_task()
|
DebugStatus.req_task = current_task()
|
||||||
|
req_err: BaseException|None = None
|
||||||
try:
|
try:
|
||||||
from tractor._discovery import get_root
|
from tractor._discovery import get_root
|
||||||
# NOTE: we need this to ensure that this task exits
|
# NOTE: we need this to ensure that this task exits
|
||||||
|
@ -1212,6 +1208,7 @@ async def request_root_stdio_lock(
|
||||||
# )
|
# )
|
||||||
DebugStatus.req_cs = req_cs
|
DebugStatus.req_cs = req_cs
|
||||||
req_ctx: Context|None = None
|
req_ctx: Context|None = None
|
||||||
|
ctx_eg: BaseExceptionGroup|None = None
|
||||||
try:
|
try:
|
||||||
# TODO: merge into single async with ?
|
# TODO: merge into single async with ?
|
||||||
async with get_root() as portal:
|
async with get_root() as portal:
|
||||||
|
@ -1242,7 +1239,12 @@ async def request_root_stdio_lock(
|
||||||
)
|
)
|
||||||
|
|
||||||
# try:
|
# try:
|
||||||
assert status.subactor_uid == actor_uid
|
if (locker := status.subactor_uid) != actor_uid:
|
||||||
|
raise DebugStateError(
|
||||||
|
f'Root actor locked by another peer !?\n'
|
||||||
|
f'locker: {locker!r}\n'
|
||||||
|
f'actor_uid: {actor_uid}\n'
|
||||||
|
)
|
||||||
assert status.cid
|
assert status.cid
|
||||||
# except AttributeError:
|
# except AttributeError:
|
||||||
# log.exception('failed pldspec asserts!')
|
# log.exception('failed pldspec asserts!')
|
||||||
|
@ -1279,10 +1281,11 @@ async def request_root_stdio_lock(
|
||||||
f'Exitting {req_ctx.side!r}-side of locking req_ctx\n'
|
f'Exitting {req_ctx.side!r}-side of locking req_ctx\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
except (
|
except* (
|
||||||
tractor.ContextCancelled,
|
tractor.ContextCancelled,
|
||||||
trio.Cancelled,
|
trio.Cancelled,
|
||||||
):
|
) as _taskc_eg:
|
||||||
|
ctx_eg = _taskc_eg
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Debug lock request was CANCELLED?\n\n'
|
'Debug lock request was CANCELLED?\n\n'
|
||||||
f'<=c) {req_ctx}\n'
|
f'<=c) {req_ctx}\n'
|
||||||
|
@ -1291,21 +1294,23 @@ async def request_root_stdio_lock(
|
||||||
)
|
)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
except (
|
except* (
|
||||||
BaseException,
|
BaseException,
|
||||||
) as ctx_err:
|
) as _ctx_eg:
|
||||||
|
ctx_eg = _ctx_eg
|
||||||
message: str = (
|
message: str = (
|
||||||
'Failed during debug request dialog with root actor?\n\n'
|
'Failed during debug request dialog with root actor?\n'
|
||||||
)
|
)
|
||||||
if (req_ctx := DebugStatus.req_ctx):
|
if (req_ctx := DebugStatus.req_ctx):
|
||||||
message += (
|
message += (
|
||||||
f'<=x) {req_ctx}\n\n'
|
f'<=x)\n'
|
||||||
|
f' |_{req_ctx}\n'
|
||||||
f'Cancelling IPC ctx!\n'
|
f'Cancelling IPC ctx!\n'
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
await req_ctx.cancel()
|
await req_ctx.cancel()
|
||||||
except trio.ClosedResourceError as terr:
|
except trio.ClosedResourceError as terr:
|
||||||
ctx_err.add_note(
|
ctx_eg.add_note(
|
||||||
# f'Failed with {type(terr)!r} x)> `req_ctx.cancel()` '
|
# f'Failed with {type(terr)!r} x)> `req_ctx.cancel()` '
|
||||||
f'Failed with `req_ctx.cancel()` <x) {type(terr)!r} '
|
f'Failed with `req_ctx.cancel()` <x) {type(terr)!r} '
|
||||||
)
|
)
|
||||||
|
@ -1314,21 +1319,45 @@ async def request_root_stdio_lock(
|
||||||
message += 'Failed in `Portal.open_context()` call ??\n'
|
message += 'Failed in `Portal.open_context()` call ??\n'
|
||||||
|
|
||||||
log.exception(message)
|
log.exception(message)
|
||||||
ctx_err.add_note(message)
|
ctx_eg.add_note(message)
|
||||||
raise ctx_err
|
raise ctx_eg
|
||||||
|
|
||||||
except (
|
except BaseException as _req_err:
|
||||||
tractor.ContextCancelled,
|
req_err = _req_err
|
||||||
trio.Cancelled,
|
|
||||||
):
|
# XXX NOTE, since new `trio` enforces strict egs by default
|
||||||
log.cancel(
|
# we have to always handle the eg explicitly given the
|
||||||
'Debug lock request CANCELLED?\n'
|
# `Portal.open_context()` call above (which implicitly opens
|
||||||
f'{req_ctx}\n'
|
# a nursery).
|
||||||
)
|
match req_err:
|
||||||
raise
|
case BaseExceptionGroup():
|
||||||
|
# for an eg of just one taskc, just unpack and raise
|
||||||
|
# since we want to propagate a plane ol' `Cancelled`
|
||||||
|
# up from the `.pause()` call.
|
||||||
|
excs: list[BaseException] = req_err.exceptions
|
||||||
|
if (
|
||||||
|
len(excs) == 1
|
||||||
|
and
|
||||||
|
type(exc := excs[0]) in (
|
||||||
|
tractor.ContextCancelled,
|
||||||
|
trio.Cancelled,
|
||||||
|
)
|
||||||
|
):
|
||||||
|
log.cancel(
|
||||||
|
'Debug lock request CANCELLED?\n'
|
||||||
|
f'{req_ctx}\n'
|
||||||
|
)
|
||||||
|
raise exc
|
||||||
|
case (
|
||||||
|
tractor.ContextCancelled(),
|
||||||
|
trio.Cancelled(),
|
||||||
|
):
|
||||||
|
log.cancel(
|
||||||
|
'Debug lock request CANCELLED?\n'
|
||||||
|
f'{req_ctx}\n'
|
||||||
|
)
|
||||||
|
raise exc
|
||||||
|
|
||||||
except BaseException as req_err:
|
|
||||||
# log.error('Failed to request root stdio-lock?')
|
|
||||||
DebugStatus.req_err = req_err
|
DebugStatus.req_err = req_err
|
||||||
DebugStatus.release()
|
DebugStatus.release()
|
||||||
|
|
||||||
|
@ -1343,7 +1372,7 @@ async def request_root_stdio_lock(
|
||||||
'Failed during stdio-locking dialog from root actor\n\n'
|
'Failed during stdio-locking dialog from root actor\n\n'
|
||||||
|
|
||||||
f'<=x)\n'
|
f'<=x)\n'
|
||||||
f'|_{DebugStatus.req_ctx}\n'
|
f' |_{DebugStatus.req_ctx}\n'
|
||||||
) from req_err
|
) from req_err
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
@ -1406,7 +1435,7 @@ def any_connected_locker_child() -> bool:
|
||||||
actor: Actor = current_actor()
|
actor: Actor = current_actor()
|
||||||
|
|
||||||
if not is_root_process():
|
if not is_root_process():
|
||||||
raise RuntimeError('This is a root-actor only API!')
|
raise InternalError('This is a root-actor only API!')
|
||||||
|
|
||||||
if (
|
if (
|
||||||
(ctx := Lock.ctx_in_debug)
|
(ctx := Lock.ctx_in_debug)
|
||||||
|
@ -2143,11 +2172,12 @@ async def _pause(
|
||||||
# `_enter_repl_sync()` into a common @cm?
|
# `_enter_repl_sync()` into a common @cm?
|
||||||
except BaseException as _pause_err:
|
except BaseException as _pause_err:
|
||||||
pause_err: BaseException = _pause_err
|
pause_err: BaseException = _pause_err
|
||||||
|
_repl_fail_report: str|None = _repl_fail_msg
|
||||||
if isinstance(pause_err, bdb.BdbQuit):
|
if isinstance(pause_err, bdb.BdbQuit):
|
||||||
log.devx(
|
log.devx(
|
||||||
'REPL for pdb was explicitly quit!\n'
|
'REPL for pdb was explicitly quit!\n'
|
||||||
)
|
)
|
||||||
_repl_fail_msg = None
|
_repl_fail_report = None
|
||||||
|
|
||||||
# when the actor is mid-runtime cancellation the
|
# when the actor is mid-runtime cancellation the
|
||||||
# `Actor._service_n` might get closed before we can spawn
|
# `Actor._service_n` might get closed before we can spawn
|
||||||
|
@ -2167,16 +2197,16 @@ async def _pause(
|
||||||
return
|
return
|
||||||
|
|
||||||
elif isinstance(pause_err, trio.Cancelled):
|
elif isinstance(pause_err, trio.Cancelled):
|
||||||
_repl_fail_msg = (
|
_repl_fail_report += (
|
||||||
'You called `tractor.pause()` from an already cancelled scope!\n\n'
|
'You called `tractor.pause()` from an already cancelled scope!\n\n'
|
||||||
'Consider `await tractor.pause(shield=True)` to make it work B)\n'
|
'Consider `await tractor.pause(shield=True)` to make it work B)\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
_repl_fail_msg += f'on behalf of {repl_task} ??\n'
|
_repl_fail_report += f'on behalf of {repl_task} ??\n'
|
||||||
|
|
||||||
if _repl_fail_msg:
|
if _repl_fail_report:
|
||||||
log.exception(_repl_fail_msg)
|
log.exception(_repl_fail_report)
|
||||||
|
|
||||||
if not actor.is_infected_aio():
|
if not actor.is_infected_aio():
|
||||||
DebugStatus.release(cancel_req_task=True)
|
DebugStatus.release(cancel_req_task=True)
|
||||||
|
@ -3051,7 +3081,8 @@ async def maybe_wait_for_debugger(
|
||||||
|
|
||||||
if (
|
if (
|
||||||
not debug_mode()
|
not debug_mode()
|
||||||
and not child_in_debug
|
and
|
||||||
|
not child_in_debug
|
||||||
):
|
):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -3109,7 +3140,7 @@ async def maybe_wait_for_debugger(
|
||||||
logmeth(
|
logmeth(
|
||||||
msg
|
msg
|
||||||
+
|
+
|
||||||
'\nRoot is waiting on tty lock to release from\n\n'
|
'\n^^ Root is waiting on tty lock release.. ^^\n'
|
||||||
# f'{caller_frame_info}\n'
|
# f'{caller_frame_info}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -3172,11 +3203,11 @@ async def maybe_wait_for_debugger(
|
||||||
@cm
|
@cm
|
||||||
def open_crash_handler(
|
def open_crash_handler(
|
||||||
catch: set[BaseException] = {
|
catch: set[BaseException] = {
|
||||||
# Exception,
|
|
||||||
BaseException,
|
BaseException,
|
||||||
},
|
},
|
||||||
ignore: set[BaseException] = {
|
ignore: set[BaseException] = {
|
||||||
KeyboardInterrupt,
|
KeyboardInterrupt,
|
||||||
|
trio.Cancelled,
|
||||||
},
|
},
|
||||||
tb_hide: bool = True,
|
tb_hide: bool = True,
|
||||||
):
|
):
|
||||||
|
|
Loading…
Reference in New Issue