forked from goodboy/tractor
				
			Raise a `MessagingError` from the src error on msging edge cases
							parent
							
								
									8cb8390201
								
							
						
					
					
						commit
						04217f319a
					
				|  | @ -23,6 +23,7 @@ The machinery and types behind ``Context.open_stream()`` | |||
| from __future__ import annotations | ||||
| import inspect | ||||
| from contextlib import asynccontextmanager as acm | ||||
| from pprint import pformat | ||||
| from typing import ( | ||||
|     Any, | ||||
|     Callable, | ||||
|  | @ -35,6 +36,7 @@ import trio | |||
| 
 | ||||
| from ._exceptions import ( | ||||
|     unpack_error, | ||||
|     MessagingError, | ||||
| ) | ||||
| from .log import get_logger | ||||
| from .trionics import ( | ||||
|  | @ -66,6 +68,8 @@ def _raise_from_no_yield_msg( | |||
|     `'yield'` field. | ||||
| 
 | ||||
|     ''' | ||||
|     __tracebackhide__: bool = True | ||||
| 
 | ||||
|     # internal error should never get here | ||||
|     assert msg.get('cid'), ("Received internal error at portal?") | ||||
| 
 | ||||
|  | @ -73,18 +77,22 @@ def _raise_from_no_yield_msg( | |||
|     # - 'stop' | ||||
|     # - 'error' | ||||
|     # possibly just handle msg['stop'] here! | ||||
|     # breakpoint() | ||||
| 
 | ||||
|     if stream._closed: | ||||
|         raise trio.ClosedResourceError('This stream was closed') | ||||
| 
 | ||||
|     if msg.get('stop') or stream._eoc: | ||||
|         log.debug(f"{stream} was stopped at remote end") | ||||
|     if ( | ||||
|         msg.get('stop') | ||||
|         or stream._eoc | ||||
|     ): | ||||
|         log.debug(f'{stream} was stopped at remote end') | ||||
| 
 | ||||
|         # XXX: important to set so that a new ``.receive()`` | ||||
|         # call (likely by another task using a broadcast receiver) | ||||
|         # doesn't accidentally pull the ``return`` message | ||||
|         # value out of the underlying feed mem chan! | ||||
|         stream._eoc = True | ||||
|         stream._eoc: bool = True | ||||
| 
 | ||||
|         # # when the send is closed we assume the stream has | ||||
|         # # terminated and signal this local iterator to stop | ||||
|  | @ -93,20 +101,24 @@ def _raise_from_no_yield_msg( | |||
|         # XXX: this causes ``ReceiveChannel.__anext__()`` to | ||||
|         # raise a ``StopAsyncIteration`` **and** in our catch | ||||
|         # block below it will trigger ``.aclose()``. | ||||
|         raise trio.EndOfChannel from src_err | ||||
|         raise trio.EndOfChannel( | ||||
|                 'Stream ended due to msg:\n' | ||||
|                 f'{pformat(msg)}' | ||||
|         ) from src_err | ||||
| 
 | ||||
|     # TODO: test that shows stream raising an expected error!!! | ||||
|     elif msg.get('error'): | ||||
|         # raise the error message | ||||
|         raise unpack_error(msg, stream._ctx.chan) | ||||
| 
 | ||||
|     # always re-raise the source error if no translation error | ||||
|     # case is activated above. | ||||
|     raise src_err | ||||
|     # raise RuntimeError( | ||||
|     #     'Unknown non-yield stream msg?\n' | ||||
|     #     f'{msg}' | ||||
|     # ) | ||||
|     # always re-raise the source error if no translation error case | ||||
|     # is activated above. | ||||
|     raise MessagingError( | ||||
|         f'Context received unexpected non-error msg!?\n' | ||||
|         f'cid: {cid}\n' | ||||
|         'received msg:\n' | ||||
|         f'{pformat(msg)}' | ||||
|     ) from src_err | ||||
| 
 | ||||
| 
 | ||||
| class MsgStream(trio.abc.Channel): | ||||
|  | @ -161,6 +173,16 @@ class MsgStream(trio.abc.Channel): | |||
|         determined by the underlying protocol). | ||||
| 
 | ||||
|         ''' | ||||
|         # NOTE: `trio.ReceiveChannel` implements | ||||
|         # EOC handling as follows (aka uses it | ||||
|         # to gracefully exit async for loops): | ||||
|         # | ||||
|         # async def __anext__(self) -> ReceiveType: | ||||
|         #     try: | ||||
|         #         return await self.receive() | ||||
|         #     except trio.EndOfChannel: | ||||
|         #         raise StopAsyncIteration | ||||
| 
 | ||||
|         # see ``.aclose()`` for notes on the old behaviour prior to | ||||
|         # introducing this | ||||
|         if self._eoc: | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue