forked from goodboy/tractor
				
			Set any `._eoc` to the err in `_raise_from_no_key_in_msg()`
Since that's what we're now doing in `MsgStream._eoc` internal assignments (coming in future patch), do the same in this exception re-raise-helper and include more extensive doc string detailing all the msg-type-to-raised-error cases. Also expose a `hide_tb: bool` like we have already in `unpack_error()`.remotes/1757153874605917753/main
							parent
							
								
									e403d63eb7
								
							
						
					
					
						commit
						ca1b8e0224
					
				|  | @ -237,8 +237,10 @@ def pack_error( | |||
| def unpack_error( | ||||
| 
 | ||||
|     msg: dict[str, Any], | ||||
| 
 | ||||
|     chan=None, | ||||
|     err_type=RemoteActorError, | ||||
| 
 | ||||
|     hide_tb: bool = True, | ||||
| 
 | ||||
| ) -> None|Exception: | ||||
|  | @ -314,37 +316,61 @@ def _raise_from_no_key_in_msg( | |||
|     msg: dict, | ||||
|     src_err: KeyError, | ||||
|     log: StackLevelAdapter,  # caller specific `log` obj | ||||
| 
 | ||||
|     expect_key: str = 'yield', | ||||
|     stream: MsgStream | None = None, | ||||
| 
 | ||||
|     # allow "deeper" tbs when debugging B^o | ||||
|     hide_tb: bool = True, | ||||
| 
 | ||||
| ) -> bool: | ||||
|     ''' | ||||
|     Raise an appopriate local error when a `MsgStream` msg arrives | ||||
|     which does not contain the expected (under normal operation) | ||||
|     `'yield'` field. | ||||
|     Raise an appopriate local error when a | ||||
|     `MsgStream` msg arrives which does not | ||||
|     contain the expected (at least under normal | ||||
|     operation) `'yield'` field. | ||||
| 
 | ||||
|     `Context` and any embedded `MsgStream` termination, | ||||
|     as well as remote task errors are handled in order | ||||
|     of priority as: | ||||
| 
 | ||||
|     - any 'error' msg is re-boxed and raised locally as | ||||
|       -> `RemoteActorError`|`ContextCancelled` | ||||
| 
 | ||||
|     - a `MsgStream` 'stop' msg is constructed, assigned | ||||
|       and raised locally as -> `trio.EndOfChannel` | ||||
| 
 | ||||
|     - All other mis-keyed msgss (like say a "final result" | ||||
|       'return' msg, normally delivered from `Context.result()`) | ||||
|       are re-boxed inside a `MessagingError` with an explicit | ||||
|       exc content describing the missing IPC-msg-key. | ||||
| 
 | ||||
|     ''' | ||||
|     __tracebackhide__: bool = True | ||||
|     __tracebackhide__: bool = hide_tb | ||||
| 
 | ||||
|     # internal error should never get here | ||||
|     # an internal error should never get here | ||||
|     try: | ||||
|         cid: str = msg['cid'] | ||||
|     except KeyError as src_err: | ||||
|         raise MessagingError( | ||||
|             f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n' | ||||
|             f'cid: {cid}\n' | ||||
|             'received msg:\n' | ||||
|             f'cid: {cid}\n\n' | ||||
| 
 | ||||
|             f'{pformat(msg)}\n' | ||||
|         ) from src_err | ||||
| 
 | ||||
|     # TODO: test that shows stream raising an expected error!!! | ||||
| 
 | ||||
|     # raise the error message in a boxed exception type! | ||||
|     if msg.get('error'): | ||||
|         # raise the error message | ||||
|         raise unpack_error( | ||||
|             msg, | ||||
|             ctx.chan, | ||||
|             hide_tb=hide_tb, | ||||
| 
 | ||||
|         ) from None | ||||
| 
 | ||||
|     # `MsgStream` termination msg. | ||||
|     elif ( | ||||
|         msg.get('stop') | ||||
|         or ( | ||||
|  | @ -357,29 +383,26 @@ def _raise_from_no_key_in_msg( | |||
|             f'cid: {cid}\n' | ||||
|         ) | ||||
| 
 | ||||
|         # XXX: important to set so that a new ``.receive()`` | ||||
|         # call (likely by another task using a broadcast receiver) | ||||
|         # doesn't accidentally pull the ``return`` message | ||||
|         # value out of the underlying feed mem chan! | ||||
|         stream._eoc: bool = True | ||||
| 
 | ||||
|         # TODO: if the a local task is already blocking on | ||||
|         # a `Context.result()` and thus a `.receive()` on the | ||||
|         # rx-chan, we close the chan and set state ensuring that | ||||
|         # an eoc is raised! | ||||
| 
 | ||||
|         # # when the send is closed we assume the stream has | ||||
|         # # terminated and signal this local iterator to stop | ||||
|         # await stream.aclose() | ||||
| 
 | ||||
|         # XXX: this causes ``ReceiveChannel.__anext__()`` to | ||||
|         # raise a ``StopAsyncIteration`` **and** in our catch | ||||
|         # block below it will trigger ``.aclose()``. | ||||
|         raise trio.EndOfChannel( | ||||
|         eoc = trio.EndOfChannel( | ||||
|             f'Context stream ended due to msg:\n\n' | ||||
|             f'{pformat(msg)}\n' | ||||
|         ) from src_err | ||||
|         ) | ||||
|         # XXX: important to set so that a new `.receive()` | ||||
|         # call (likely by another task using a broadcast receiver) | ||||
|         # doesn't accidentally pull the `return` message | ||||
|         # value out of the underlying feed mem chan which is | ||||
|         # destined for the `Context.result()` call during ctx-exit! | ||||
|         stream._eoc: Exception = eoc | ||||
| 
 | ||||
|         raise eoc from src_err | ||||
| 
 | ||||
|     if ( | ||||
|         stream | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue