forked from goodboy/tractor
				
			Quieter `Stop` handling on ctx result capture
In the `drain_to_final_msg()` impl, since a stream terminating gracefully requires this msg, there's really no reason to `log.cancel()` about it; go `.runtime()` level instead since we're trying de-noise under "normal operation". Also, - passthrough `hide_tb` to taskc-handler's `ctx.maybe_raise()` call. - raise `MessagingError` for the `MsgType` unmatched `case _:`. - detail the doc string motivation a little more.remotes/1757153874605917753/main
							parent
							
								
									950a2ec30f
								
							
						
					
					
						commit
						bac84a5e23
					
				|  | @ -374,7 +374,7 @@ class PldRx(Struct): | |||
| 
 | ||||
|             case _: | ||||
|                 src_err = InternalError( | ||||
|                     'Unknown IPC msg ??\n\n' | ||||
|                     'Invalid IPC msg ??\n\n' | ||||
|                     f'{msg}\n' | ||||
|                 ) | ||||
| 
 | ||||
|  | @ -499,7 +499,7 @@ async def maybe_limit_plds( | |||
|         yield None | ||||
|         return | ||||
| 
 | ||||
|     # sanity on scoping | ||||
|     # sanity check on IPC scoping | ||||
|     curr_ctx: Context = current_ipc_ctx() | ||||
|     assert ctx is curr_ctx | ||||
| 
 | ||||
|  | @ -510,6 +510,8 @@ async def maybe_limit_plds( | |||
|     ) as msgdec: | ||||
|         yield msgdec | ||||
| 
 | ||||
|     # when the applied spec is unwound/removed, the same IPC-ctx | ||||
|     # should still be in scope. | ||||
|     curr_ctx: Context = current_ipc_ctx() | ||||
|     assert ctx is curr_ctx | ||||
| 
 | ||||
|  | @ -525,16 +527,26 @@ async def drain_to_final_msg( | |||
|     list[MsgType] | ||||
| ]: | ||||
|     ''' | ||||
|     Drain IPC msgs delivered to the underlying IPC primitive's | ||||
|     rx-mem-chan (eg. `Context._rx_chan`) from the runtime in | ||||
|     search for a final result or error. | ||||
|     Drain IPC msgs delivered to the underlying IPC context's | ||||
|     rx-mem-chan (i.e. from `Context._rx_chan`) in search for a final | ||||
|     `Return` or `Error` msg. | ||||
| 
 | ||||
|     The motivation here is to ideally capture errors during ctxc | ||||
|     conditions where a canc-request/or local error is sent but the | ||||
|     local task also excepts and enters the | ||||
|     `Portal.open_context().__aexit__()` block wherein we prefer to | ||||
|     capture and raise any remote error or ctxc-ack as part of the | ||||
|     `ctx.result()` cleanup and teardown sequence. | ||||
|     Deliver the `Return` + preceding drained msgs (`list[MsgType]`) | ||||
|     as a pair unless an `Error` is found, in which unpack and raise | ||||
|     it. | ||||
| 
 | ||||
|     The motivation here is to always capture any remote error relayed | ||||
|     by the remote peer task during a ctxc condition. | ||||
| 
 | ||||
|     For eg. a ctxc-request may be sent to the peer as part of the | ||||
|     local task's (request for) cancellation but then that same task | ||||
|     **also errors** before executing the teardown in the | ||||
|     `Portal.open_context().__aexit__()` block. In such error-on-exit | ||||
|     cases we want to always capture and raise any delivered remote | ||||
|     error (like an expected ctxc-ACK) as part of the final | ||||
|     `ctx.wait_for_result()` teardown sequence such that the | ||||
|     `Context.outcome` related state always reflect what transpired | ||||
|     even after ctx closure and the `.open_context()` block exit. | ||||
| 
 | ||||
|     ''' | ||||
|     __tracebackhide__: bool = hide_tb | ||||
|  | @ -572,7 +584,6 @@ async def drain_to_final_msg( | |||
|             # |_from tractor.devx._debug import pause | ||||
|             #   await pause() | ||||
| 
 | ||||
| 
 | ||||
|         # NOTE: we get here if the far end was | ||||
|         # `ContextCancelled` in 2 cases: | ||||
|         # 1. we requested the cancellation and thus | ||||
|  | @ -580,13 +591,13 @@ async def drain_to_final_msg( | |||
|         # 2. WE DID NOT REQUEST that cancel and thus | ||||
|         #    SHOULD RAISE HERE! | ||||
|         except trio.Cancelled as taskc: | ||||
| 
 | ||||
|             # CASE 2: mask the local cancelled-error(s) | ||||
|             # only when we are sure the remote error is | ||||
|             # the source cause of this local task's | ||||
|             # cancellation. | ||||
|             ctx.maybe_raise( | ||||
|                 # TODO: when use this/ | ||||
|                 hide_tb=hide_tb, | ||||
|                 # TODO: when use this? | ||||
|                 # from_src_exc=taskc, | ||||
|             ) | ||||
| 
 | ||||
|  | @ -659,7 +670,7 @@ async def drain_to_final_msg( | |||
|             # Stop() | ||||
|             case Stop(): | ||||
|                 pre_result_drained.append(msg) | ||||
|                 log.cancel( | ||||
|                 log.runtime(  # normal/expected shutdown transaction | ||||
|                     'Remote stream terminated due to "stop" msg:\n\n' | ||||
|                     f'{pretty_struct.pformat(msg)}\n' | ||||
|                 ) | ||||
|  | @ -719,13 +730,19 @@ async def drain_to_final_msg( | |||
|                 pre_result_drained.append(msg) | ||||
|                 # It's definitely an internal error if any other | ||||
|                 # msg type without a`'cid'` field arrives here! | ||||
|                 report: str = ( | ||||
|                     f'Invalid or unknown msg type {type(msg)!r}!?\n' | ||||
|                 ) | ||||
|                 if not msg.cid: | ||||
|                     raise InternalError( | ||||
|                         'Unexpected cid-missing msg?\n\n' | ||||
|                         f'{msg}\n' | ||||
|                     report += ( | ||||
|                         '\nWhich also has no `.cid` field?\n' | ||||
|                     ) | ||||
| 
 | ||||
|                 raise RuntimeError('Unknown msg type: {msg}') | ||||
|                 raise MessagingError( | ||||
|                     report | ||||
|                     + | ||||
|                     f'\n{msg}\n' | ||||
|                 ) | ||||
| 
 | ||||
|     else: | ||||
|         log.cancel( | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue