From 734bc09b67ef8345be0cc41d6d11ab13c662b748 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 2 Jan 2024 18:34:15 -0500 Subject: [PATCH] Move missing-key-in-msg raiser to `._exceptions` Since we use basically the exact same set of logic in `Portal.open_context()` when expecting the first `'started'` msg factor and generalize `._streaming._raise_from_no_yield_msg()` into a new `._exceptions._raise_from_no_key_in_msg()` (as per the lingering todo) which obvi requires a more generalized / optional signature including a caller specific `log` obj. Obvi call the new func from all the other modules X) --- tractor/_context.py | 2 + tractor/_exceptions.py | 96 +++++++++++++++++++++++++++++++++++++++++- tractor/_portal.py | 38 +++++++---------- tractor/_streaming.py | 83 +++++------------------------------- 4 files changed, 122 insertions(+), 97 deletions(-) diff --git a/tractor/_context.py b/tractor/_context.py index 9e19f2a..4d56fb3 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -44,9 +44,11 @@ import warnings import trio from ._exceptions import ( + # _raise_from_no_key_in_msg, unpack_error, pack_error, ContextCancelled, + # MessagingError, StreamOverrun, ) from .log import get_logger diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 214dc88..7e14858 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -14,16 +14,18 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" +''' Our classy exception set. -""" +''' +from __future__ import annotations import builtins import importlib from pprint import pformat from typing import ( Any, Type, + TYPE_CHECKING, ) import traceback @@ -32,6 +34,11 @@ import trio from ._state import current_actor +if TYPE_CHECKING: + from ._context import Context + from ._stream import MsgStream + from .log import StackLevelAdapter + _this_mod = importlib.import_module(__name__) @@ -246,3 +253,88 @@ def is_multi_cancelled(exc: BaseException) -> bool: ) is not None return False + + +def _raise_from_no_key_in_msg( + ctx: Context, + msg: dict, + src_err: KeyError, + log: StackLevelAdapter, # caller specific `log` obj + expect_key: str = 'yield', + stream: MsgStream | None = None, + +) -> bool: + ''' + Raise an appopriate local error when a `MsgStream` msg arrives + which does not contain the expected (under normal operation) + `'yield'` field. + + ''' + __tracebackhide__: bool = True + + # internal error should never get here + try: + cid: str = msg['cid'] + except KeyError as src_err: + raise MessagingError( + f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n' + f'cid: {cid}\n' + 'received msg:\n' + f'{pformat(msg)}\n' + ) from src_err + + # TODO: test that shows stream raising an expected error!!! + if msg.get('error'): + # raise the error message + raise unpack_error( + msg, + ctx.chan, + ) from None + + elif ( + msg.get('stop') + or ( + stream + and stream._eoc + ) + ): + log.debug( + f'Context[{cid}] stream was stopped by remote side\n' + f'cid: {cid}\n' + ) + + # XXX: important to set so that a new ``.receive()`` + # call (likely by another task using a broadcast receiver) + # doesn't accidentally pull the ``return`` message + # value out of the underlying feed mem chan! + stream._eoc: bool = True + + # # when the send is closed we assume the stream has + # # terminated and signal this local iterator to stop + # await stream.aclose() + + # XXX: this causes ``ReceiveChannel.__anext__()`` to + # raise a ``StopAsyncIteration`` **and** in our catch + # block below it will trigger ``.aclose()``. + raise trio.EndOfChannel( + 'Context[{cid}] stream ended due to msg:\n' + f'{pformat(msg)}' + ) from src_err + + + if ( + stream + and stream._closed + ): + raise trio.ClosedResourceError('This stream was closed') + + + # always re-raise the source error if no translation error case + # is activated above. + _type: str = 'Stream' if stream else 'Context' + raise MessagingError( + f'{_type} was expecting a `{expect_key}` message' + ' BUT received a non-`error` msg:\n' + f'cid: {cid}\n' + '{pformat(msg)}' + ) from src_err diff --git a/tractor/_portal.py b/tractor/_portal.py index 4c0587a..f310643 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -33,7 +33,6 @@ from typing import ( ) from functools import partial from dataclasses import dataclass -from pprint import pformat import warnings import trio @@ -45,13 +44,17 @@ from ._ipc import Channel from .log import get_logger from .msg import NamespacePath from ._exceptions import ( + _raise_from_no_key_in_msg, unpack_error, NoResult, ContextCancelled, - MessagingError, ) -from ._context import Context -from ._streaming import MsgStream +from ._context import ( + Context, +) +from ._streaming import ( + MsgStream, +) from .devx._debug import maybe_wait_for_debugger @@ -465,26 +468,15 @@ class Portal: first: Any = msg['started'] ctx._started_called: bool = True - except KeyError: + except KeyError as src_error: - # TODO: can we maybe factor this into the new raiser - # `_streaming._raise_from_no_yield_msg()` and make that - # helper more generic, say with a `_no__msg()`? - if not (cid := msg.get('cid')): - raise MessagingError( - 'Received internal error at context?\n' - 'No call-id (cid) in startup msg?' - ) - - if msg.get('error'): - # NOTE: mask the key error with the remote one - raise unpack_error(msg, self.channel) from None - else: - raise MessagingError( - f'Context for {cid} was expecting a `started` message' - ' but received a non-error msg:\n' - f'{pformat(msg)}' - ) + _raise_from_no_key_in_msg( + ctx=ctx, + msg=msg, + src_err=src_error, + log=log, + expect_key='started', + ) ctx._portal: Portal = self uid: tuple = self.channel.uid diff --git a/tractor/_streaming.py b/tractor/_streaming.py index f02197b..4530e14 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -23,7 +23,6 @@ The machinery and types behind ``Context.open_stream()`` from __future__ import annotations import inspect from contextlib import asynccontextmanager as acm -from pprint import pformat from typing import ( Any, Callable, @@ -35,8 +34,7 @@ import warnings import trio from ._exceptions import ( - unpack_error, - MessagingError, + _raise_from_no_key_in_msg, ) from .log import get_logger from .trionics import ( @@ -56,71 +54,6 @@ log = get_logger(__name__) # messages? class ReceiveChannel(AsyncResource, Generic[ReceiveType]): # - use __slots__ on ``Context``? -def _raise_from_no_yield_msg( - stream: MsgStream, - msg: dict, - src_err: KeyError, - -) -> bool: - ''' - Raise an appopriate local error when a `MsgStream` msg arrives - which does not contain the expected (under normal operation) - `'yield'` field. - - ''' - __tracebackhide__: bool = True - - # internal error should never get here - assert msg.get('cid'), ("Received internal error at portal?") - - # TODO: handle 2 cases with 3.10+ match syntax - # - 'stop' - # - 'error' - # possibly just handle msg['stop'] here! - # breakpoint() - - if stream._closed: - raise trio.ClosedResourceError('This stream was closed') - - if ( - msg.get('stop') - or stream._eoc - ): - log.debug(f'{stream} was stopped at remote end') - - # XXX: important to set so that a new ``.receive()`` - # call (likely by another task using a broadcast receiver) - # doesn't accidentally pull the ``return`` message - # value out of the underlying feed mem chan! - stream._eoc: bool = True - - # # when the send is closed we assume the stream has - # # terminated and signal this local iterator to stop - # await stream.aclose() - - # XXX: this causes ``ReceiveChannel.__anext__()`` to - # raise a ``StopAsyncIteration`` **and** in our catch - # block below it will trigger ``.aclose()``. - raise trio.EndOfChannel( - 'Stream ended due to msg:\n' - f'{pformat(msg)}' - ) from src_err - - # TODO: test that shows stream raising an expected error!!! - elif msg.get('error'): - # raise the error message - raise unpack_error(msg, stream._ctx.chan) - - # always re-raise the source error if no translation error case - # is activated above. - raise MessagingError( - f'Context received unexpected non-error msg!?\n' - f'cid: {cid}\n' - 'received msg:\n' - f'{pformat(msg)}' - ) from src_err - - class MsgStream(trio.abc.Channel): ''' A bidirectional message stream for receiving logically sequenced @@ -160,10 +93,13 @@ class MsgStream(trio.abc.Channel): try: return msg['yield'] except KeyError as kerr: - _raise_from_no_yield_msg( - stream=self, + _raise_from_no_key_in_msg( + ctx=self._ctx, msg=msg, src_err=kerr, + log=log, + expect_key='yield', + stream=self, ) async def receive(self): @@ -196,10 +132,13 @@ class MsgStream(trio.abc.Channel): return msg['yield'] except KeyError as kerr: - _raise_from_no_yield_msg( - stream=self, + _raise_from_no_key_in_msg( + ctx=self._ctx, msg=msg, src_err=kerr, + log=log, + expect_key='yield', + stream=self, ) except (