forked from goodboy/tractor
				
			Move missing-key-in-msg raiser to `._exceptions`
Since we use basically the exact same set of logic in `Portal.open_context()` when expecting the first `'started'` msg factor and generalize `._streaming._raise_from_no_yield_msg()` into a new `._exceptions._raise_from_no_key_in_msg()` (as per the lingering todo) which obvi requires a more generalized / optional signature including a caller specific `log` obj. Obvi call the new func from all the other modules X)remotes/1757153874605917753/main
							parent
							
								
									914efd80eb
								
							
						
					
					
						commit
						27c5ffe5a7
					
				|  | @ -44,9 +44,11 @@ import warnings | |||
| import trio | ||||
| 
 | ||||
| from ._exceptions import ( | ||||
|     # _raise_from_no_key_in_msg, | ||||
|     unpack_error, | ||||
|     pack_error, | ||||
|     ContextCancelled, | ||||
|     # MessagingError, | ||||
|     StreamOverrun, | ||||
| ) | ||||
| from .log import get_logger | ||||
|  |  | |||
|  | @ -14,16 +14,18 @@ | |||
| # You should have received a copy of the GNU Affero General Public License | ||||
| # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||
| 
 | ||||
| """ | ||||
| ''' | ||||
| Our classy exception set. | ||||
| 
 | ||||
| """ | ||||
| ''' | ||||
| from __future__ import annotations | ||||
| import builtins | ||||
| import importlib | ||||
| from pprint import pformat | ||||
| from typing import ( | ||||
|     Any, | ||||
|     Type, | ||||
|     TYPE_CHECKING, | ||||
| ) | ||||
| import traceback | ||||
| 
 | ||||
|  | @ -32,6 +34,11 @@ import trio | |||
| 
 | ||||
| from ._state import current_actor | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from ._context import Context | ||||
|     from ._stream import MsgStream | ||||
|     from .log import StackLevelAdapter | ||||
| 
 | ||||
| _this_mod = importlib.import_module(__name__) | ||||
| 
 | ||||
| 
 | ||||
|  | @ -246,3 +253,88 @@ def is_multi_cancelled(exc: BaseException) -> bool: | |||
|         ) is not None | ||||
| 
 | ||||
|     return False | ||||
| 
 | ||||
| 
 | ||||
| def _raise_from_no_key_in_msg( | ||||
|     ctx: Context, | ||||
|     msg: dict, | ||||
|     src_err: KeyError, | ||||
|     log: StackLevelAdapter,  # caller specific `log` obj | ||||
|     expect_key: str = 'yield', | ||||
|     stream: MsgStream | None = None, | ||||
| 
 | ||||
| ) -> bool: | ||||
|     ''' | ||||
|     Raise an appopriate local error when a `MsgStream` msg arrives | ||||
|     which does not contain the expected (under normal operation) | ||||
|     `'yield'` field. | ||||
| 
 | ||||
|     ''' | ||||
|     __tracebackhide__: bool = True | ||||
| 
 | ||||
|     # internal error should never get here | ||||
|     try: | ||||
|         cid: str = msg['cid'] | ||||
|     except KeyError as src_err: | ||||
|         raise MessagingError( | ||||
|             f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n' | ||||
|             f'cid: {cid}\n' | ||||
|             'received msg:\n' | ||||
|             f'{pformat(msg)}\n' | ||||
|         ) from src_err | ||||
| 
 | ||||
|     # TODO: test that shows stream raising an expected error!!! | ||||
|     if msg.get('error'): | ||||
|         # raise the error message | ||||
|         raise unpack_error( | ||||
|             msg, | ||||
|             ctx.chan, | ||||
|         ) from None | ||||
| 
 | ||||
|     elif ( | ||||
|         msg.get('stop') | ||||
|         or ( | ||||
|             stream | ||||
|             and stream._eoc | ||||
|         ) | ||||
|     ): | ||||
|         log.debug( | ||||
|             f'Context[{cid}] stream was stopped by remote side\n' | ||||
|             f'cid: {cid}\n' | ||||
|         ) | ||||
| 
 | ||||
|         # XXX: important to set so that a new ``.receive()`` | ||||
|         # call (likely by another task using a broadcast receiver) | ||||
|         # doesn't accidentally pull the ``return`` message | ||||
|         # value out of the underlying feed mem chan! | ||||
|         stream._eoc: bool = True | ||||
| 
 | ||||
|         # # when the send is closed we assume the stream has | ||||
|         # # terminated and signal this local iterator to stop | ||||
|         # await stream.aclose() | ||||
| 
 | ||||
|         # XXX: this causes ``ReceiveChannel.__anext__()`` to | ||||
|         # raise a ``StopAsyncIteration`` **and** in our catch | ||||
|         # block below it will trigger ``.aclose()``. | ||||
|         raise trio.EndOfChannel( | ||||
|                 'Context[{cid}] stream ended due to msg:\n' | ||||
|                 f'{pformat(msg)}' | ||||
|         ) from src_err | ||||
| 
 | ||||
| 
 | ||||
|     if ( | ||||
|         stream | ||||
|         and stream._closed | ||||
|     ): | ||||
|         raise trio.ClosedResourceError('This stream was closed') | ||||
| 
 | ||||
| 
 | ||||
|     # always re-raise the source error if no translation error case | ||||
|     # is activated above. | ||||
|     _type: str = 'Stream' if stream else 'Context' | ||||
|     raise MessagingError( | ||||
|         f'{_type} was expecting a `{expect_key}` message' | ||||
|         ' BUT received a non-`error` msg:\n' | ||||
|         f'cid: {cid}\n' | ||||
|         '{pformat(msg)}' | ||||
|     ) from src_err | ||||
|  |  | |||
|  | @ -33,7 +33,6 @@ from typing import ( | |||
| ) | ||||
| from functools import partial | ||||
| from dataclasses import dataclass | ||||
| from pprint import pformat | ||||
| import warnings | ||||
| 
 | ||||
| import trio | ||||
|  | @ -45,13 +44,17 @@ from ._ipc import Channel | |||
| from .log import get_logger | ||||
| from .msg import NamespacePath | ||||
| from ._exceptions import ( | ||||
|     _raise_from_no_key_in_msg, | ||||
|     unpack_error, | ||||
|     NoResult, | ||||
|     ContextCancelled, | ||||
|     MessagingError, | ||||
| ) | ||||
| from ._context import Context | ||||
| from ._streaming import MsgStream | ||||
| from ._context import ( | ||||
|     Context, | ||||
| ) | ||||
| from ._streaming import ( | ||||
|     MsgStream, | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
|  | @ -464,26 +467,15 @@ class Portal: | |||
|             first: Any = msg['started'] | ||||
|             ctx._started_called: bool = True | ||||
| 
 | ||||
|         except KeyError: | ||||
|         except KeyError as src_error: | ||||
| 
 | ||||
|             # TODO: can we maybe factor this into the new raiser | ||||
|             # `_streaming._raise_from_no_yield_msg()` and make that | ||||
|             # helper more generic, say with a `_no_<blah>_msg()`? | ||||
|             if not (cid := msg.get('cid')): | ||||
|                 raise MessagingError( | ||||
|                     'Received internal error at context?\n' | ||||
|                     'No call-id (cid) in startup msg?' | ||||
|                 ) | ||||
| 
 | ||||
|             if msg.get('error'): | ||||
|                 # NOTE: mask the key error with the remote one | ||||
|                 raise unpack_error(msg, self.channel) from None | ||||
|             else: | ||||
|                 raise MessagingError( | ||||
|                     f'Context for {cid} was expecting a `started` message' | ||||
|                     ' but received a non-error msg:\n' | ||||
|                     f'{pformat(msg)}' | ||||
|                 ) | ||||
|             _raise_from_no_key_in_msg( | ||||
|                 ctx=ctx, | ||||
|                 msg=msg, | ||||
|                 src_err=src_error, | ||||
|                 log=log, | ||||
|                 expect_key='started', | ||||
|             ) | ||||
| 
 | ||||
|         ctx._portal: Portal = self | ||||
|         uid: tuple = self.channel.uid | ||||
|  |  | |||
|  | @ -23,7 +23,6 @@ The machinery and types behind ``Context.open_stream()`` | |||
| from __future__ import annotations | ||||
| import inspect | ||||
| from contextlib import asynccontextmanager as acm | ||||
| from pprint import pformat | ||||
| from typing import ( | ||||
|     Any, | ||||
|     Callable, | ||||
|  | @ -35,8 +34,7 @@ import warnings | |||
| import trio | ||||
| 
 | ||||
| from ._exceptions import ( | ||||
|     unpack_error, | ||||
|     MessagingError, | ||||
|     _raise_from_no_key_in_msg, | ||||
| ) | ||||
| from .log import get_logger | ||||
| from .trionics import ( | ||||
|  | @ -56,71 +54,6 @@ log = get_logger(__name__) | |||
| #   messages? class ReceiveChannel(AsyncResource, Generic[ReceiveType]): | ||||
| # - use __slots__ on ``Context``? | ||||
| 
 | ||||
| def _raise_from_no_yield_msg( | ||||
|     stream: MsgStream, | ||||
|     msg: dict, | ||||
|     src_err: KeyError, | ||||
| 
 | ||||
| ) -> bool: | ||||
|     ''' | ||||
|     Raise an appopriate local error when a `MsgStream` msg arrives | ||||
|     which does not contain the expected (under normal operation) | ||||
|     `'yield'` field. | ||||
| 
 | ||||
|     ''' | ||||
|     __tracebackhide__: bool = True | ||||
| 
 | ||||
|     # internal error should never get here | ||||
|     assert msg.get('cid'), ("Received internal error at portal?") | ||||
| 
 | ||||
|     # TODO: handle 2 cases with 3.10+ match syntax | ||||
|     # - 'stop' | ||||
|     # - 'error' | ||||
|     # possibly just handle msg['stop'] here! | ||||
|     # breakpoint() | ||||
| 
 | ||||
|     if stream._closed: | ||||
|         raise trio.ClosedResourceError('This stream was closed') | ||||
| 
 | ||||
|     if ( | ||||
|         msg.get('stop') | ||||
|         or stream._eoc | ||||
|     ): | ||||
|         log.debug(f'{stream} was stopped at remote end') | ||||
| 
 | ||||
|         # XXX: important to set so that a new ``.receive()`` | ||||
|         # call (likely by another task using a broadcast receiver) | ||||
|         # doesn't accidentally pull the ``return`` message | ||||
|         # value out of the underlying feed mem chan! | ||||
|         stream._eoc: bool = True | ||||
| 
 | ||||
|         # # when the send is closed we assume the stream has | ||||
|         # # terminated and signal this local iterator to stop | ||||
|         # await stream.aclose() | ||||
| 
 | ||||
|         # XXX: this causes ``ReceiveChannel.__anext__()`` to | ||||
|         # raise a ``StopAsyncIteration`` **and** in our catch | ||||
|         # block below it will trigger ``.aclose()``. | ||||
|         raise trio.EndOfChannel( | ||||
|                 'Stream ended due to msg:\n' | ||||
|                 f'{pformat(msg)}' | ||||
|         ) from src_err | ||||
| 
 | ||||
|     # TODO: test that shows stream raising an expected error!!! | ||||
|     elif msg.get('error'): | ||||
|         # raise the error message | ||||
|         raise unpack_error(msg, stream._ctx.chan) | ||||
| 
 | ||||
|     # always re-raise the source error if no translation error case | ||||
|     # is activated above. | ||||
|     raise MessagingError( | ||||
|         f'Context received unexpected non-error msg!?\n' | ||||
|         f'cid: {cid}\n' | ||||
|         'received msg:\n' | ||||
|         f'{pformat(msg)}' | ||||
|     ) from src_err | ||||
| 
 | ||||
| 
 | ||||
| class MsgStream(trio.abc.Channel): | ||||
|     ''' | ||||
|     A bidirectional message stream for receiving logically sequenced | ||||
|  | @ -160,10 +93,13 @@ class MsgStream(trio.abc.Channel): | |||
|         try: | ||||
|             return msg['yield'] | ||||
|         except KeyError as kerr: | ||||
|             _raise_from_no_yield_msg( | ||||
|                 stream=self, | ||||
|             _raise_from_no_key_in_msg( | ||||
|                 ctx=self._ctx, | ||||
|                 msg=msg, | ||||
|                 src_err=kerr, | ||||
|                 log=log, | ||||
|                 expect_key='yield', | ||||
|                 stream=self, | ||||
|             ) | ||||
| 
 | ||||
|     async def receive(self): | ||||
|  | @ -196,10 +132,13 @@ class MsgStream(trio.abc.Channel): | |||
|             return msg['yield'] | ||||
| 
 | ||||
|         except KeyError as kerr: | ||||
|             _raise_from_no_yield_msg( | ||||
|                 stream=self, | ||||
|             _raise_from_no_key_in_msg( | ||||
|                 ctx=self._ctx, | ||||
|                 msg=msg, | ||||
|                 src_err=kerr, | ||||
|                 log=log, | ||||
|                 expect_key='yield', | ||||
|                 stream=self, | ||||
|             ) | ||||
| 
 | ||||
|         except ( | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue