forked from goodboy/tractor
More failed REPL-lock-request refinements
In `lock_stdio_for_peer()` better internal-error handling/reporting: - only `Lock._blocked.remove(ctx.cid)` if that same cid was added on entry to avoid needless key-errors. - drop all `Lock.release(force: bool)` usage remnants. - if `req_ctx.cancel()` fails mention it with `ctx_err.add_note()`. - add more explicit internal-failed-request log messaging via a new `fail_reason: str`. - use and use new `x)<=\n|_` annots in any failure logging. Other cleanups/niceties: - drop `force: bool` flag entirely from the `Lock.release()`. - use more supervisor-op-annots in `.pdb()` logging with both `_pause/crash_msg: str` instead of double '|' lines when `.pdb()`-reported from `._set_trace()`/`._post_mortem()`.aio_abandons
parent
b46400a86f
commit
9be821a5cf
|
@ -299,7 +299,6 @@ class Lock:
|
||||||
@pdbp.hideframe
|
@pdbp.hideframe
|
||||||
def release(
|
def release(
|
||||||
cls,
|
cls,
|
||||||
force: bool = False,
|
|
||||||
raise_on_thread: bool = True,
|
raise_on_thread: bool = True,
|
||||||
|
|
||||||
) -> bool:
|
) -> bool:
|
||||||
|
@ -347,12 +346,9 @@ class Lock:
|
||||||
lock: trio.StrictFIFOLock = cls._debug_lock
|
lock: trio.StrictFIFOLock = cls._debug_lock
|
||||||
owner: Task = lock.statistics().owner
|
owner: Task = lock.statistics().owner
|
||||||
if (
|
if (
|
||||||
(lock.locked() or force)
|
lock.locked()
|
||||||
# ^-TODO-NOTE-^ should we just remove this, since the
|
and
|
||||||
# RTE case above will always happen when you force
|
(owner is task)
|
||||||
# from the wrong task?
|
|
||||||
|
|
||||||
and (owner is task)
|
|
||||||
# ^-NOTE-^ if we do NOT ensure this, `trio` will
|
# ^-NOTE-^ if we do NOT ensure this, `trio` will
|
||||||
# raise a RTE when a non-owner tries to releasee the
|
# raise a RTE when a non-owner tries to releasee the
|
||||||
# lock.
|
# lock.
|
||||||
|
@ -553,6 +549,7 @@ async def lock_stdio_for_peer(
|
||||||
# can try to avoid clobbering any connection from a child
|
# can try to avoid clobbering any connection from a child
|
||||||
# that's currently relying on it.
|
# that's currently relying on it.
|
||||||
we_finished = Lock.req_handler_finished = trio.Event()
|
we_finished = Lock.req_handler_finished = trio.Event()
|
||||||
|
lock_blocked: bool = False
|
||||||
try:
|
try:
|
||||||
if ctx.cid in Lock._blocked:
|
if ctx.cid in Lock._blocked:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
|
@ -565,7 +562,8 @@ async def lock_stdio_for_peer(
|
||||||
'Consider that an internal bug exists given the TTY '
|
'Consider that an internal bug exists given the TTY '
|
||||||
'`Lock`ing IPC dialog..\n'
|
'`Lock`ing IPC dialog..\n'
|
||||||
)
|
)
|
||||||
|
Lock._blocked.add(ctx.cid)
|
||||||
|
lock_blocked = True
|
||||||
root_task_name: str = current_task().name
|
root_task_name: str = current_task().name
|
||||||
if tuple(subactor_uid) in Lock._blocked:
|
if tuple(subactor_uid) in Lock._blocked:
|
||||||
log.warning(
|
log.warning(
|
||||||
|
@ -575,7 +573,11 @@ async def lock_stdio_for_peer(
|
||||||
)
|
)
|
||||||
ctx._enter_debugger_on_cancel: bool = False
|
ctx._enter_debugger_on_cancel: bool = False
|
||||||
message: str = (
|
message: str = (
|
||||||
f'Debug lock blocked for {subactor_uid}\n'
|
f'Debug lock blocked for subactor\n\n'
|
||||||
|
f'x)<= {subactor_uid}\n\n'
|
||||||
|
|
||||||
|
f'Likely because the root actor already started shutdown and is '
|
||||||
|
'closing IPC connections for this child!\n\n'
|
||||||
'Cancelling debug request!\n'
|
'Cancelling debug request!\n'
|
||||||
)
|
)
|
||||||
log.cancel(message)
|
log.cancel(message)
|
||||||
|
@ -589,7 +591,6 @@ async def lock_stdio_for_peer(
|
||||||
f'remote task: {subactor_task_uid}\n'
|
f'remote task: {subactor_task_uid}\n'
|
||||||
)
|
)
|
||||||
DebugStatus.shield_sigint()
|
DebugStatus.shield_sigint()
|
||||||
Lock._blocked.add(ctx.cid)
|
|
||||||
|
|
||||||
# NOTE: we use the IPC ctx's cancel scope directly in order to
|
# NOTE: we use the IPC ctx's cancel scope directly in order to
|
||||||
# ensure that on any transport failure, or cancellation request
|
# ensure that on any transport failure, or cancellation request
|
||||||
|
@ -648,30 +649,33 @@ async def lock_stdio_for_peer(
|
||||||
)
|
)
|
||||||
|
|
||||||
except BaseException as req_err:
|
except BaseException as req_err:
|
||||||
message: str = (
|
fail_reason: str = (
|
||||||
f'On behalf of remote peer {subactor_task_uid!r}@{ctx.chan.uid!r}\n\n'
|
f'on behalf of peer\n\n'
|
||||||
'Forcing `Lock.release()` for req-ctx since likely an '
|
f'x)<=\n'
|
||||||
'internal error!\n\n'
|
f' |_{subactor_task_uid!r}@{ctx.chan.uid!r}\n\n'
|
||||||
f'{ctx}'
|
|
||||||
|
'Forcing `Lock.release()` due to acquire failure!\n\n'
|
||||||
|
f'x)=> {ctx}\n'
|
||||||
)
|
)
|
||||||
if isinstance(req_err, trio.Cancelled):
|
if isinstance(req_err, trio.Cancelled):
|
||||||
message = (
|
fail_reason = (
|
||||||
'Cancelled during root TTY-lock dialog\n'
|
'Cancelled during stdio-mutex request '
|
||||||
+
|
+
|
||||||
message
|
fail_reason
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
message = (
|
fail_reason = (
|
||||||
'Errored during root TTY-lock dialog\n'
|
'Failed to deliver stdio-mutex request '
|
||||||
+
|
+
|
||||||
message
|
fail_reason
|
||||||
)
|
)
|
||||||
|
|
||||||
log.exception(message)
|
log.exception(fail_reason)
|
||||||
Lock.release() #force=True)
|
Lock.release()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
if lock_blocked:
|
||||||
Lock._blocked.remove(ctx.cid)
|
Lock._blocked.remove(ctx.cid)
|
||||||
|
|
||||||
# wakeup any waiters since the lock was (presumably)
|
# wakeup any waiters since the lock was (presumably)
|
||||||
|
@ -1167,7 +1171,7 @@ async def request_root_stdio_lock(
|
||||||
):
|
):
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Debug lock request was CANCELLED?\n\n'
|
'Debug lock request was CANCELLED?\n\n'
|
||||||
f'{req_ctx}\n'
|
f'<=c) {req_ctx}\n'
|
||||||
# f'{pformat_cs(req_cs, var_name="req_cs")}\n\n'
|
# f'{pformat_cs(req_cs, var_name="req_cs")}\n\n'
|
||||||
# f'{pformat_cs(req_ctx._scope, var_name="req_ctx._scope")}\n\n'
|
# f'{pformat_cs(req_ctx._scope, var_name="req_ctx._scope")}\n\n'
|
||||||
)
|
)
|
||||||
|
@ -1179,22 +1183,26 @@ async def request_root_stdio_lock(
|
||||||
message: str = (
|
message: str = (
|
||||||
'Failed during debug request dialog with root actor?\n\n'
|
'Failed during debug request dialog with root actor?\n\n'
|
||||||
)
|
)
|
||||||
|
if (req_ctx := DebugStatus.req_ctx):
|
||||||
if req_ctx:
|
|
||||||
message += (
|
message += (
|
||||||
f'{req_ctx}\n'
|
f'<=x) {req_ctx}\n\n'
|
||||||
f'Cancelling IPC ctx!\n'
|
f'Cancelling IPC ctx!\n'
|
||||||
)
|
)
|
||||||
|
try:
|
||||||
await req_ctx.cancel()
|
await req_ctx.cancel()
|
||||||
|
except trio.ClosedResourceError as terr:
|
||||||
|
ctx_err.add_note(
|
||||||
|
# f'Failed with {type(terr)!r} x)> `req_ctx.cancel()` '
|
||||||
|
f'Failed with `req_ctx.cancel()` <x) {type(terr)!r} '
|
||||||
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
message += 'Failed during `Portal.open_context()` ?\n'
|
message += 'Failed in `Portal.open_context()` call ??\n'
|
||||||
|
|
||||||
log.exception(message)
|
log.exception(message)
|
||||||
ctx_err.add_note(message)
|
ctx_err.add_note(message)
|
||||||
raise ctx_err
|
raise ctx_err
|
||||||
|
|
||||||
|
|
||||||
except (
|
except (
|
||||||
tractor.ContextCancelled,
|
tractor.ContextCancelled,
|
||||||
trio.Cancelled,
|
trio.Cancelled,
|
||||||
|
@ -1218,9 +1226,10 @@ async def request_root_stdio_lock(
|
||||||
# -[ ]FURTHER, after we 'continue', we should be able to
|
# -[ ]FURTHER, after we 'continue', we should be able to
|
||||||
# ctl-c out of the currently hanging task!
|
# ctl-c out of the currently hanging task!
|
||||||
raise DebugRequestError(
|
raise DebugRequestError(
|
||||||
'Failed to lock stdio from subactor IPC ctx!\n\n'
|
'Failed during stdio-locking dialog from root actor\n\n'
|
||||||
|
|
||||||
f'req_ctx: {DebugStatus.req_ctx}\n'
|
f'<=x)\n'
|
||||||
|
f'|_{DebugStatus.req_ctx}\n'
|
||||||
) from req_err
|
) from req_err
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
@ -1998,10 +2007,10 @@ async def _pause(
|
||||||
# sanity, for when hackin on all this?
|
# sanity, for when hackin on all this?
|
||||||
if not isinstance(pause_err, trio.Cancelled):
|
if not isinstance(pause_err, trio.Cancelled):
|
||||||
req_ctx: Context = DebugStatus.req_ctx
|
req_ctx: Context = DebugStatus.req_ctx
|
||||||
if req_ctx:
|
# if req_ctx:
|
||||||
# XXX, bc the child-task in root might cancel it?
|
# # XXX, bc the child-task in root might cancel it?
|
||||||
# assert req_ctx._scope.cancel_called
|
# # assert req_ctx._scope.cancel_called
|
||||||
assert req_ctx.maybe_error
|
# assert req_ctx.maybe_error
|
||||||
|
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
@ -2041,11 +2050,12 @@ def _set_trace(
|
||||||
# root here? Bo
|
# root here? Bo
|
||||||
log.pdb(
|
log.pdb(
|
||||||
f'{_pause_msg}\n'
|
f'{_pause_msg}\n'
|
||||||
'|\n'
|
# '|\n'
|
||||||
# TODO: more compact pformating?
|
f'>(\n'
|
||||||
|
f' |_ {task} @ {actor.uid}\n'
|
||||||
|
# ^-TODO-^ more compact pformating?
|
||||||
# -[ ] make an `Actor.__repr()__`
|
# -[ ] make an `Actor.__repr()__`
|
||||||
# -[ ] should we use `log.pformat_task_uid()`?
|
# -[ ] should we use `log.pformat_task_uid()`?
|
||||||
f'|_ {task} @ {actor.uid}\n'
|
|
||||||
)
|
)
|
||||||
# presuming the caller passed in the "api frame"
|
# presuming the caller passed in the "api frame"
|
||||||
# (the last frame before user code - like `.pause()`)
|
# (the last frame before user code - like `.pause()`)
|
||||||
|
@ -2541,8 +2551,8 @@ def _post_mortem(
|
||||||
# here! Bo
|
# here! Bo
|
||||||
log.pdb(
|
log.pdb(
|
||||||
f'{_crash_msg}\n'
|
f'{_crash_msg}\n'
|
||||||
'|\n'
|
# '|\n'
|
||||||
# f'|_ {current_task()}\n'
|
f'x>(\n'
|
||||||
f' |_ {current_task()} @ {actor.uid}\n'
|
f' |_ {current_task()} @ {actor.uid}\n'
|
||||||
|
|
||||||
# f'|_ @{actor.uid}\n'
|
# f'|_ @{actor.uid}\n'
|
||||||
|
|
Loading…
Reference in New Issue