More `TransportClosed`-handling around IPC-IO
For IPC-disconnects-during-teardown edge cases, augment some `._rpc`
machinery,
- in `._invoke()` around the `await chan.send(return_msg)` where we
  suppress if the underlying `Channel` already disconnected.
- add a disjoint handler in `_errors_relayed_via_ipc()` which just
  reports-n-reraises the exc (same as prior behaviour).
  * originally i thought it needed to be handled specially (to avoid
    being crash handled) but turns out that isn't necessary?
  * hence the also-added-bu-masked-out `debug_filter` / guard expression
    around the `await debug._maybe_enter_pm()` line.
- show the `._invoke()` frame for the moment.
			
			
				final_eg_refinements
			
			
		
							parent
							
								
									8842b758d7
								
							
						
					
					
						commit
						c538cb3004
					
				|  | @ -284,6 +284,10 @@ async def _errors_relayed_via_ipc( | ||||||
|     try: |     try: | ||||||
|         yield  # run RPC invoke body |         yield  # run RPC invoke body | ||||||
| 
 | 
 | ||||||
|  |     except TransportClosed: | ||||||
|  |         log.exception('Tpt disconnect during remote-exc relay?') | ||||||
|  |         raise | ||||||
|  | 
 | ||||||
|     # box and ship RPC errors for wire-transit via |     # box and ship RPC errors for wire-transit via | ||||||
|     # the task's requesting parent IPC-channel. |     # the task's requesting parent IPC-channel. | ||||||
|     except ( |     except ( | ||||||
|  | @ -319,6 +323,9 @@ async def _errors_relayed_via_ipc( | ||||||
|                         and debug_kbis |                         and debug_kbis | ||||||
|                     ) |                     ) | ||||||
|                 ) |                 ) | ||||||
|  |                 # TODO? better then `debug_filter` below? | ||||||
|  |                 # and | ||||||
|  |                 # not isinstance(err, TransportClosed) | ||||||
|             ): |             ): | ||||||
|                 # XXX QUESTION XXX: is there any case where we'll |                 # XXX QUESTION XXX: is there any case where we'll | ||||||
|                 # want to debug IPC disconnects as a default? |                 # want to debug IPC disconnects as a default? | ||||||
|  | @ -327,13 +334,25 @@ async def _errors_relayed_via_ipc( | ||||||
|                 # recovery logic - the only case is some kind of |                 # recovery logic - the only case is some kind of | ||||||
|                 # strange bug in our transport layer itself? Going |                 # strange bug in our transport layer itself? Going | ||||||
|                 # to keep this open ended for now. |                 # to keep this open ended for now. | ||||||
|                 log.debug( | 
 | ||||||
|                     'RPC task crashed, attempting to enter debugger\n' |                 if _state.debug_mode(): | ||||||
|                     f'|_{ctx}' |                     log.exception( | ||||||
|  |                         f'RPC task crashed!\n' | ||||||
|  |                         f'Attempting to enter debugger\n' | ||||||
|  |                         f'\n' | ||||||
|  |                         f'{ctx}' | ||||||
|                     ) |                     ) | ||||||
|  | 
 | ||||||
|                 entered_debug = await debug._maybe_enter_pm( |                 entered_debug = await debug._maybe_enter_pm( | ||||||
|                     err, |                     err, | ||||||
|                     api_frame=inspect.currentframe(), |                     api_frame=inspect.currentframe(), | ||||||
|  | 
 | ||||||
|  |                     # don't REPL any psuedo-expected tpt-disconnect | ||||||
|  |                     # debug_filter=lambda exc: ( | ||||||
|  |                     #     type (exc) not in { | ||||||
|  |                     #         TransportClosed, | ||||||
|  |                     #     } | ||||||
|  |                     # ), | ||||||
|                 ) |                 ) | ||||||
|                 if not entered_debug: |                 if not entered_debug: | ||||||
|                     # if we prolly should have entered the REPL but |                     # if we prolly should have entered the REPL but | ||||||
|  | @ -450,7 +469,7 @@ async def _invoke( | ||||||
|     kwargs: dict[str, Any], |     kwargs: dict[str, Any], | ||||||
| 
 | 
 | ||||||
|     is_rpc: bool = True, |     is_rpc: bool = True, | ||||||
|     hide_tb: bool = True, |     hide_tb: bool = False, | ||||||
|     return_msg_type: Return|CancelAck = Return, |     return_msg_type: Return|CancelAck = Return, | ||||||
| 
 | 
 | ||||||
|     task_status: TaskStatus[ |     task_status: TaskStatus[ | ||||||
|  | @ -674,7 +693,20 @@ async def _invoke( | ||||||
|                     f'\n' |                     f'\n' | ||||||
|                     f'{pretty_struct.pformat(return_msg)}\n' |                     f'{pretty_struct.pformat(return_msg)}\n' | ||||||
|                 ) |                 ) | ||||||
|  |                 try: | ||||||
|                     await chan.send(return_msg) |                     await chan.send(return_msg) | ||||||
|  |                 except TransportClosed: | ||||||
|  |                     log.exception( | ||||||
|  |                         f"Failed send final result to 'parent'-side of IPC-ctx!\n" | ||||||
|  |                         f'\n' | ||||||
|  |                         f'{chan}\n' | ||||||
|  |                         f'Channel already disconnected ??\n' | ||||||
|  |                         f'\n' | ||||||
|  |                         f'{pretty_struct.pformat(return_msg)}' | ||||||
|  |                     ) | ||||||
|  |                     # ?TODO? will this ever be true though? | ||||||
|  |                     if chan.connected(): | ||||||
|  |                         raise | ||||||
| 
 | 
 | ||||||
|             # NOTE: this happens IFF `ctx._scope.cancel()` is |             # NOTE: this happens IFF `ctx._scope.cancel()` is | ||||||
|             # called by any of, |             # called by any of, | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue