Pass `tuple` from `._invoke()` unmasker usage
Since `maybe_raise_from_masking_exc()` now requires the general case
instead explicitly pass `unmask_from=(Cancelled,)` (yes i know it's the
current default).
Also add some extra `TransportClosed`-handling for some
IPC-disconnects-during-teardown edge cases,
- in `._invoke()` around the `await chan.send(return_msg)` where we
  suppress if the underlying chan already disconnected.
- add a disjoint handler in `_errors_relayed_via_ipc()` which just
  reports the exc but raises it through (as prior).
  * I originally thought it needed to be handled specially (to avoid
    being crash handled) but turns out that isn't necessary?
  * Hence the masked-out `debug_filter` / guard expression around the
    `await debug._maybe_enter_pm()` line.
			
			
				to_asyncio_eoc_signal
			
			
		
							parent
							
								
									02062c5dc0
								
							
						
					
					
						commit
						c05d08e426
					
				|  | @ -284,6 +284,10 @@ async def _errors_relayed_via_ipc( | |||
|     try: | ||||
|         yield  # run RPC invoke body | ||||
| 
 | ||||
|     except TransportClosed: | ||||
|         log.exception('Tpt disconnect during remote-exc relay?') | ||||
|         raise | ||||
| 
 | ||||
|     # box and ship RPC errors for wire-transit via | ||||
|     # the task's requesting parent IPC-channel. | ||||
|     except ( | ||||
|  | @ -319,6 +323,9 @@ async def _errors_relayed_via_ipc( | |||
|                         and debug_kbis | ||||
|                     ) | ||||
|                 ) | ||||
|                 # TODO? better then `debug_filter` below? | ||||
|                 # and | ||||
|                 # not isinstance(err, TransportClosed) | ||||
|             ): | ||||
|                 # XXX QUESTION XXX: is there any case where we'll | ||||
|                 # want to debug IPC disconnects as a default? | ||||
|  | @ -327,13 +334,25 @@ async def _errors_relayed_via_ipc( | |||
|                 # recovery logic - the only case is some kind of | ||||
|                 # strange bug in our transport layer itself? Going | ||||
|                 # to keep this open ended for now. | ||||
|                 log.debug( | ||||
|                     'RPC task crashed, attempting to enter debugger\n' | ||||
|                     f'|_{ctx}' | ||||
|                 ) | ||||
| 
 | ||||
|                 if _state.debug_mode(): | ||||
|                     log.exception( | ||||
|                         f'RPC task crashed!\n' | ||||
|                         f'Attempting to enter debugger\n' | ||||
|                         f'\n' | ||||
|                         f'{ctx}' | ||||
|                     ) | ||||
| 
 | ||||
|                 entered_debug = await debug._maybe_enter_pm( | ||||
|                     err, | ||||
|                     api_frame=inspect.currentframe(), | ||||
| 
 | ||||
|                     # don't REPL any psuedo-expected tpt-disconnect | ||||
|                     # debug_filter=lambda exc: ( | ||||
|                     #     type (exc) not in { | ||||
|                     #         TransportClosed, | ||||
|                     #     } | ||||
|                     # ), | ||||
|                 ) | ||||
|                 if not entered_debug: | ||||
|                     # if we prolly should have entered the REPL but | ||||
|  | @ -450,7 +469,7 @@ async def _invoke( | |||
|     kwargs: dict[str, Any], | ||||
| 
 | ||||
|     is_rpc: bool = True, | ||||
|     hide_tb: bool = True, | ||||
|     hide_tb: bool = False, | ||||
|     return_msg_type: Return|CancelAck = Return, | ||||
| 
 | ||||
|     task_status: TaskStatus[ | ||||
|  | @ -655,7 +674,7 @@ async def _invoke( | |||
|                 # *should* never be interfered with!! | ||||
|                 maybe_raise_from_masking_exc( | ||||
|                     tn=tn, | ||||
|                     unmask_from=Cancelled, | ||||
|                     unmask_from=(Cancelled,), | ||||
|                 ) as _mbme,  # maybe boxed masked exc | ||||
|             ): | ||||
|                 ctx._scope_nursery = tn | ||||
|  | @ -675,7 +694,20 @@ async def _invoke( | |||
|                     f'\n' | ||||
|                     f'{pretty_struct.pformat(return_msg)}\n' | ||||
|                 ) | ||||
|                 await chan.send(return_msg) | ||||
|                 try: | ||||
|                     await chan.send(return_msg) | ||||
|                 except TransportClosed: | ||||
|                     log.exception( | ||||
|                         f"Failed send final result to 'parent'-side of IPC-ctx!\n" | ||||
|                         f'\n' | ||||
|                         f'{chan}\n' | ||||
|                         f'Channel already disconnected ??\n' | ||||
|                         f'\n' | ||||
|                         f'{pretty_struct.pformat(return_msg)}' | ||||
|                     ) | ||||
|                     # ?TODO? will this ever be true though? | ||||
|                     if chan.connected(): | ||||
|                         raise | ||||
| 
 | ||||
|             # NOTE: this happens IFF `ctx._scope.cancel()` is | ||||
|             # called by any of, | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue