More `TransportClosed`-handling around IPC-IO

For IPC-disconnects-during-teardown edge cases, augment some `._rpc`
machinery,
- in `._invoke()` around the `await chan.send(return_msg)` where we
  suppress if the underlying `Channel` already disconnected.
- add a disjoint handler in `_errors_relayed_via_ipc()` which just
  reports-n-reraises the exc (same as prior behaviour).
  * originally i thought it needed to be handled specially (to avoid
    being crash handled) but turns out that isn't necessary?
  * hence the also-added-bu-masked-out `debug_filter` / guard expression
    around the `await debug._maybe_enter_pm()` line.
- show the `._invoke()` frame for the moment.
final_eg_refinements
Tyler Goodlet 2025-08-19 12:58:31 -04:00
parent 5ab642bdf0
commit 25d6738d03
1 changed files with 38 additions and 6 deletions

View File

@ -284,6 +284,10 @@ async def _errors_relayed_via_ipc(
try: try:
yield # run RPC invoke body yield # run RPC invoke body
except TransportClosed:
log.exception('Tpt disconnect during remote-exc relay?')
raise
# box and ship RPC errors for wire-transit via # box and ship RPC errors for wire-transit via
# the task's requesting parent IPC-channel. # the task's requesting parent IPC-channel.
except ( except (
@ -319,6 +323,9 @@ async def _errors_relayed_via_ipc(
and debug_kbis and debug_kbis
) )
) )
# TODO? better then `debug_filter` below?
# and
# not isinstance(err, TransportClosed)
): ):
# XXX QUESTION XXX: is there any case where we'll # XXX QUESTION XXX: is there any case where we'll
# want to debug IPC disconnects as a default? # want to debug IPC disconnects as a default?
@ -327,13 +334,25 @@ async def _errors_relayed_via_ipc(
# recovery logic - the only case is some kind of # recovery logic - the only case is some kind of
# strange bug in our transport layer itself? Going # strange bug in our transport layer itself? Going
# to keep this open ended for now. # to keep this open ended for now.
log.debug(
'RPC task crashed, attempting to enter debugger\n' if _state.debug_mode():
f'|_{ctx}' log.exception(
) f'RPC task crashed!\n'
f'Attempting to enter debugger\n'
f'\n'
f'{ctx}'
)
entered_debug = await debug._maybe_enter_pm( entered_debug = await debug._maybe_enter_pm(
err, err,
api_frame=inspect.currentframe(), api_frame=inspect.currentframe(),
# don't REPL any psuedo-expected tpt-disconnect
# debug_filter=lambda exc: (
# type (exc) not in {
# TransportClosed,
# }
# ),
) )
if not entered_debug: if not entered_debug:
# if we prolly should have entered the REPL but # if we prolly should have entered the REPL but
@ -450,7 +469,7 @@ async def _invoke(
kwargs: dict[str, Any], kwargs: dict[str, Any],
is_rpc: bool = True, is_rpc: bool = True,
hide_tb: bool = True, hide_tb: bool = False,
return_msg_type: Return|CancelAck = Return, return_msg_type: Return|CancelAck = Return,
task_status: TaskStatus[ task_status: TaskStatus[
@ -674,7 +693,20 @@ async def _invoke(
f'\n' f'\n'
f'{pretty_struct.pformat(return_msg)}\n' f'{pretty_struct.pformat(return_msg)}\n'
) )
await chan.send(return_msg) try:
await chan.send(return_msg)
except TransportClosed:
log.exception(
f"Failed send final result to 'parent'-side of IPC-ctx!\n"
f'\n'
f'{chan}\n'
f'Channel already disconnected ??\n'
f'\n'
f'{pretty_struct.pformat(return_msg)}'
)
# ?TODO? will this ever be true though?
if chan.connected():
raise
# NOTE: this happens IFF `ctx._scope.cancel()` is # NOTE: this happens IFF `ctx._scope.cancel()` is
# called by any of, # called by any of,