Pass `tuple` from `._invoke()` unmasker usage

Since `maybe_raise_from_masking_exc()` now requires the general case
instead explicitly pass `unmask_from=(Cancelled,)` (yes i know it's the
current default).

Also add some extra `TransportClosed`-handling for some
IPC-disconnects-during-teardown edge cases,
- in `._invoke()` around the `await chan.send(return_msg)` where we
  suppress if the underlying chan already disconnected.
- add a disjoint handler in `_errors_relayed_via_ipc()` which just
  reports the exc but raises it through (as prior).
  * I originally thought it needed to be handled specially (to avoid
    being crash handled) but turns out that isn't necessary?
  * Hence the masked-out `debug_filter` / guard expression around the
    `await debug._maybe_enter_pm()` line.
to_asyncio_eoc_signal
Tyler Goodlet 2025-07-25 10:52:06 -04:00
parent 02062c5dc0
commit c05d08e426
1 changed files with 39 additions and 7 deletions

View File

@ -284,6 +284,10 @@ async def _errors_relayed_via_ipc(
try:
yield # run RPC invoke body
except TransportClosed:
log.exception('Tpt disconnect during remote-exc relay?')
raise
# box and ship RPC errors for wire-transit via
# the task's requesting parent IPC-channel.
except (
@ -319,6 +323,9 @@ async def _errors_relayed_via_ipc(
and debug_kbis
)
)
# TODO? better then `debug_filter` below?
# and
# not isinstance(err, TransportClosed)
):
# XXX QUESTION XXX: is there any case where we'll
# want to debug IPC disconnects as a default?
@ -327,13 +334,25 @@ async def _errors_relayed_via_ipc(
# recovery logic - the only case is some kind of
# strange bug in our transport layer itself? Going
# to keep this open ended for now.
log.debug(
'RPC task crashed, attempting to enter debugger\n'
f'|_{ctx}'
)
if _state.debug_mode():
log.exception(
f'RPC task crashed!\n'
f'Attempting to enter debugger\n'
f'\n'
f'{ctx}'
)
entered_debug = await debug._maybe_enter_pm(
err,
api_frame=inspect.currentframe(),
# don't REPL any psuedo-expected tpt-disconnect
# debug_filter=lambda exc: (
# type (exc) not in {
# TransportClosed,
# }
# ),
)
if not entered_debug:
# if we prolly should have entered the REPL but
@ -450,7 +469,7 @@ async def _invoke(
kwargs: dict[str, Any],
is_rpc: bool = True,
hide_tb: bool = True,
hide_tb: bool = False,
return_msg_type: Return|CancelAck = Return,
task_status: TaskStatus[
@ -655,7 +674,7 @@ async def _invoke(
# *should* never be interfered with!!
maybe_raise_from_masking_exc(
tn=tn,
unmask_from=Cancelled,
unmask_from=(Cancelled,),
) as _mbme, # maybe boxed masked exc
):
ctx._scope_nursery = tn
@ -675,7 +694,20 @@ async def _invoke(
f'\n'
f'{pretty_struct.pformat(return_msg)}\n'
)
await chan.send(return_msg)
try:
await chan.send(return_msg)
except TransportClosed:
log.exception(
f"Failed send final result to 'parent'-side of IPC-ctx!\n"
f'\n'
f'{chan}\n'
f'Channel already disconnected ??\n'
f'\n'
f'{pretty_struct.pformat(return_msg)}'
)
# ?TODO? will this ever be true though?
if chan.connected():
raise
# NOTE: this happens IFF `ctx._scope.cancel()` is
# called by any of,