forked from goodboy/tractor
				
			Factor non-yield stream msg processing into helper
Since both `MsgStream.receive()` and `.receive_nowait()` need the same raising logic when a non-stream msg arrives (so that maybe an appropriate IPC translated error can be raised) move the `KeyError` handler code into a new `._streaming._raise_from_no_yield_msg()` func and call it from both methods to make the error-interface-raising symmetrical across both methods.remotes/1757153874605917753/main
							parent
							
								
									6d10f0c516
								
							
						
					
					
						commit
						d4d09b6071
					
				|  | @ -54,6 +54,60 @@ log = get_logger(__name__) | |||
| #   messages? class ReceiveChannel(AsyncResource, Generic[ReceiveType]): | ||||
| # - use __slots__ on ``Context``? | ||||
| 
 | ||||
| def _raise_from_no_yield_msg( | ||||
|     stream: MsgStream, | ||||
|     msg: dict, | ||||
|     src_err: KeyError, | ||||
| 
 | ||||
| ) -> bool: | ||||
|     ''' | ||||
|     Raise an appopriate local error when a `MsgStream` msg arrives | ||||
|     which does not contain the expected (under normal operation) | ||||
|     `'yield'` field. | ||||
| 
 | ||||
|     ''' | ||||
|     # internal error should never get here | ||||
|     assert msg.get('cid'), ("Received internal error at portal?") | ||||
| 
 | ||||
|     # TODO: handle 2 cases with 3.10+ match syntax | ||||
|     # - 'stop' | ||||
|     # - 'error' | ||||
|     # possibly just handle msg['stop'] here! | ||||
| 
 | ||||
|     if stream._closed: | ||||
|         raise trio.ClosedResourceError('This stream was closed') | ||||
| 
 | ||||
|     if msg.get('stop') or stream._eoc: | ||||
|         log.debug(f"{stream} was stopped at remote end") | ||||
| 
 | ||||
|         # XXX: important to set so that a new ``.receive()`` | ||||
|         # call (likely by another task using a broadcast receiver) | ||||
|         # doesn't accidentally pull the ``return`` message | ||||
|         # value out of the underlying feed mem chan! | ||||
|         stream._eoc = True | ||||
| 
 | ||||
|         # # when the send is closed we assume the stream has | ||||
|         # # terminated and signal this local iterator to stop | ||||
|         # await stream.aclose() | ||||
| 
 | ||||
|         # XXX: this causes ``ReceiveChannel.__anext__()`` to | ||||
|         # raise a ``StopAsyncIteration`` **and** in our catch | ||||
|         # block below it will trigger ``.aclose()``. | ||||
|         raise trio.EndOfChannel from src_err | ||||
| 
 | ||||
|     # TODO: test that shows stream raising an expected error!!! | ||||
|     elif msg.get('error'): | ||||
|         # raise the error message | ||||
|         raise unpack_error(msg, stream._ctx.chan) | ||||
| 
 | ||||
|     # always re-raise the source error if no translation error | ||||
|     # case is activated above. | ||||
|     raise src_err | ||||
|     # raise RuntimeError( | ||||
|     #     'Unknown non-yield stream msg?\n' | ||||
|     #     f'{msg}' | ||||
|     # ) | ||||
| 
 | ||||
| 
 | ||||
| class MsgStream(trio.abc.Channel): | ||||
|     ''' | ||||
|  | @ -91,11 +145,20 @@ class MsgStream(trio.abc.Channel): | |||
|     # delegate directly to underlying mem channel | ||||
|     def receive_nowait(self): | ||||
|         msg = self._rx_chan.receive_nowait() | ||||
|         return msg['yield'] | ||||
|         try: | ||||
|             return msg['yield'] | ||||
|         except KeyError as kerr: | ||||
|             _raise_from_no_yield_msg( | ||||
|                 stream=self, | ||||
|                 msg=msg, | ||||
|                 src_err=kerr, | ||||
|             ) | ||||
| 
 | ||||
|     async def receive(self): | ||||
|         '''Async receive a single msg from the IPC transport, the next | ||||
|         in sequence for this stream. | ||||
|         ''' | ||||
|         Receive a single msg from the IPC transport, the next in | ||||
|         sequence sent by the far end task (possibly in order as | ||||
|         determined by the underlying protocol). | ||||
| 
 | ||||
|         ''' | ||||
|         # see ``.aclose()`` for notes on the old behaviour prior to | ||||
|  | @ -110,43 +173,12 @@ class MsgStream(trio.abc.Channel): | |||
|             msg = await self._rx_chan.receive() | ||||
|             return msg['yield'] | ||||
| 
 | ||||
|         except KeyError as err: | ||||
|             # internal error should never get here | ||||
|             assert msg.get('cid'), ("Received internal error at portal?") | ||||
| 
 | ||||
|             # TODO: handle 2 cases with 3.10 match syntax | ||||
|             # - 'stop' | ||||
|             # - 'error' | ||||
|             # possibly just handle msg['stop'] here! | ||||
| 
 | ||||
|             if self._closed: | ||||
|                 raise trio.ClosedResourceError('This stream was closed') | ||||
| 
 | ||||
|             if msg.get('stop') or self._eoc: | ||||
|                 log.debug(f"{self} was stopped at remote end") | ||||
| 
 | ||||
|                 # XXX: important to set so that a new ``.receive()`` | ||||
|                 # call (likely by another task using a broadcast receiver) | ||||
|                 # doesn't accidentally pull the ``return`` message | ||||
|                 # value out of the underlying feed mem chan! | ||||
|                 self._eoc = True | ||||
| 
 | ||||
|                 # # when the send is closed we assume the stream has | ||||
|                 # # terminated and signal this local iterator to stop | ||||
|                 # await self.aclose() | ||||
| 
 | ||||
|                 # XXX: this causes ``ReceiveChannel.__anext__()`` to | ||||
|                 # raise a ``StopAsyncIteration`` **and** in our catch | ||||
|                 # block below it will trigger ``.aclose()``. | ||||
|                 raise trio.EndOfChannel from err | ||||
| 
 | ||||
|             # TODO: test that shows stream raising an expected error!!! | ||||
|             elif msg.get('error'): | ||||
|                 # raise the error message | ||||
|                 raise unpack_error(msg, self._ctx.chan) | ||||
| 
 | ||||
|             else: | ||||
|                 raise | ||||
|         except KeyError as kerr: | ||||
|             _raise_from_no_yield_msg( | ||||
|                 stream=self, | ||||
|                 msg=msg, | ||||
|                 src_err=kerr, | ||||
|             ) | ||||
| 
 | ||||
|         except ( | ||||
|             trio.ClosedResourceError,  # by self._rx_chan | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue