diff --git a/tractor/_context.py b/tractor/_context.py
index 9e19f2a..4d56fb3 100644
--- a/tractor/_context.py
+++ b/tractor/_context.py
@@ -44,9 +44,11 @@ import warnings
import trio
from ._exceptions import (
+ # _raise_from_no_key_in_msg,
unpack_error,
pack_error,
ContextCancelled,
+ # MessagingError,
StreamOverrun,
)
from .log import get_logger
diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py
index 214dc88..7e14858 100644
--- a/tractor/_exceptions.py
+++ b/tractor/_exceptions.py
@@ -14,16 +14,18 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
-"""
+'''
Our classy exception set.
-"""
+'''
+from __future__ import annotations
import builtins
import importlib
from pprint import pformat
from typing import (
Any,
Type,
+ TYPE_CHECKING,
)
import traceback
@@ -32,6 +34,11 @@ import trio
from ._state import current_actor
+if TYPE_CHECKING:
+ from ._context import Context
+ from ._stream import MsgStream
+ from .log import StackLevelAdapter
+
_this_mod = importlib.import_module(__name__)
@@ -246,3 +253,88 @@ def is_multi_cancelled(exc: BaseException) -> bool:
) is not None
return False
+
+
+def _raise_from_no_key_in_msg(
+ ctx: Context,
+ msg: dict,
+ src_err: KeyError,
+ log: StackLevelAdapter, # caller specific `log` obj
+ expect_key: str = 'yield',
+ stream: MsgStream | None = None,
+
+) -> bool:
+ '''
+ Raise an appopriate local error when a `MsgStream` msg arrives
+ which does not contain the expected (under normal operation)
+ `'yield'` field.
+
+ '''
+ __tracebackhide__: bool = True
+
+ # internal error should never get here
+ try:
+ cid: str = msg['cid']
+ except KeyError as src_err:
+ raise MessagingError(
+ f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n'
+ f'cid: {cid}\n'
+ 'received msg:\n'
+ f'{pformat(msg)}\n'
+ ) from src_err
+
+ # TODO: test that shows stream raising an expected error!!!
+ if msg.get('error'):
+ # raise the error message
+ raise unpack_error(
+ msg,
+ ctx.chan,
+ ) from None
+
+ elif (
+ msg.get('stop')
+ or (
+ stream
+ and stream._eoc
+ )
+ ):
+ log.debug(
+ f'Context[{cid}] stream was stopped by remote side\n'
+ f'cid: {cid}\n'
+ )
+
+ # XXX: important to set so that a new ``.receive()``
+ # call (likely by another task using a broadcast receiver)
+ # doesn't accidentally pull the ``return`` message
+ # value out of the underlying feed mem chan!
+ stream._eoc: bool = True
+
+ # # when the send is closed we assume the stream has
+ # # terminated and signal this local iterator to stop
+ # await stream.aclose()
+
+ # XXX: this causes ``ReceiveChannel.__anext__()`` to
+ # raise a ``StopAsyncIteration`` **and** in our catch
+ # block below it will trigger ``.aclose()``.
+ raise trio.EndOfChannel(
+ 'Context[{cid}] stream ended due to msg:\n'
+ f'{pformat(msg)}'
+ ) from src_err
+
+
+ if (
+ stream
+ and stream._closed
+ ):
+ raise trio.ClosedResourceError('This stream was closed')
+
+
+ # always re-raise the source error if no translation error case
+ # is activated above.
+ _type: str = 'Stream' if stream else 'Context'
+ raise MessagingError(
+ f'{_type} was expecting a `{expect_key}` message'
+ ' BUT received a non-`error` msg:\n'
+ f'cid: {cid}\n'
+ '{pformat(msg)}'
+ ) from src_err
diff --git a/tractor/_portal.py b/tractor/_portal.py
index 4c0587a..f310643 100644
--- a/tractor/_portal.py
+++ b/tractor/_portal.py
@@ -33,7 +33,6 @@ from typing import (
)
from functools import partial
from dataclasses import dataclass
-from pprint import pformat
import warnings
import trio
@@ -45,13 +44,17 @@ from ._ipc import Channel
from .log import get_logger
from .msg import NamespacePath
from ._exceptions import (
+ _raise_from_no_key_in_msg,
unpack_error,
NoResult,
ContextCancelled,
- MessagingError,
)
-from ._context import Context
-from ._streaming import MsgStream
+from ._context import (
+ Context,
+)
+from ._streaming import (
+ MsgStream,
+)
from .devx._debug import maybe_wait_for_debugger
@@ -465,26 +468,15 @@ class Portal:
first: Any = msg['started']
ctx._started_called: bool = True
- except KeyError:
+ except KeyError as src_error:
- # TODO: can we maybe factor this into the new raiser
- # `_streaming._raise_from_no_yield_msg()` and make that
- # helper more generic, say with a `_no__msg()`?
- if not (cid := msg.get('cid')):
- raise MessagingError(
- 'Received internal error at context?\n'
- 'No call-id (cid) in startup msg?'
- )
-
- if msg.get('error'):
- # NOTE: mask the key error with the remote one
- raise unpack_error(msg, self.channel) from None
- else:
- raise MessagingError(
- f'Context for {cid} was expecting a `started` message'
- ' but received a non-error msg:\n'
- f'{pformat(msg)}'
- )
+ _raise_from_no_key_in_msg(
+ ctx=ctx,
+ msg=msg,
+ src_err=src_error,
+ log=log,
+ expect_key='started',
+ )
ctx._portal: Portal = self
uid: tuple = self.channel.uid
diff --git a/tractor/_streaming.py b/tractor/_streaming.py
index f02197b..4530e14 100644
--- a/tractor/_streaming.py
+++ b/tractor/_streaming.py
@@ -23,7 +23,6 @@ The machinery and types behind ``Context.open_stream()``
from __future__ import annotations
import inspect
from contextlib import asynccontextmanager as acm
-from pprint import pformat
from typing import (
Any,
Callable,
@@ -35,8 +34,7 @@ import warnings
import trio
from ._exceptions import (
- unpack_error,
- MessagingError,
+ _raise_from_no_key_in_msg,
)
from .log import get_logger
from .trionics import (
@@ -56,71 +54,6 @@ log = get_logger(__name__)
# messages? class ReceiveChannel(AsyncResource, Generic[ReceiveType]):
# - use __slots__ on ``Context``?
-def _raise_from_no_yield_msg(
- stream: MsgStream,
- msg: dict,
- src_err: KeyError,
-
-) -> bool:
- '''
- Raise an appopriate local error when a `MsgStream` msg arrives
- which does not contain the expected (under normal operation)
- `'yield'` field.
-
- '''
- __tracebackhide__: bool = True
-
- # internal error should never get here
- assert msg.get('cid'), ("Received internal error at portal?")
-
- # TODO: handle 2 cases with 3.10+ match syntax
- # - 'stop'
- # - 'error'
- # possibly just handle msg['stop'] here!
- # breakpoint()
-
- if stream._closed:
- raise trio.ClosedResourceError('This stream was closed')
-
- if (
- msg.get('stop')
- or stream._eoc
- ):
- log.debug(f'{stream} was stopped at remote end')
-
- # XXX: important to set so that a new ``.receive()``
- # call (likely by another task using a broadcast receiver)
- # doesn't accidentally pull the ``return`` message
- # value out of the underlying feed mem chan!
- stream._eoc: bool = True
-
- # # when the send is closed we assume the stream has
- # # terminated and signal this local iterator to stop
- # await stream.aclose()
-
- # XXX: this causes ``ReceiveChannel.__anext__()`` to
- # raise a ``StopAsyncIteration`` **and** in our catch
- # block below it will trigger ``.aclose()``.
- raise trio.EndOfChannel(
- 'Stream ended due to msg:\n'
- f'{pformat(msg)}'
- ) from src_err
-
- # TODO: test that shows stream raising an expected error!!!
- elif msg.get('error'):
- # raise the error message
- raise unpack_error(msg, stream._ctx.chan)
-
- # always re-raise the source error if no translation error case
- # is activated above.
- raise MessagingError(
- f'Context received unexpected non-error msg!?\n'
- f'cid: {cid}\n'
- 'received msg:\n'
- f'{pformat(msg)}'
- ) from src_err
-
-
class MsgStream(trio.abc.Channel):
'''
A bidirectional message stream for receiving logically sequenced
@@ -160,10 +93,13 @@ class MsgStream(trio.abc.Channel):
try:
return msg['yield']
except KeyError as kerr:
- _raise_from_no_yield_msg(
- stream=self,
+ _raise_from_no_key_in_msg(
+ ctx=self._ctx,
msg=msg,
src_err=kerr,
+ log=log,
+ expect_key='yield',
+ stream=self,
)
async def receive(self):
@@ -196,10 +132,13 @@ class MsgStream(trio.abc.Channel):
return msg['yield']
except KeyError as kerr:
- _raise_from_no_yield_msg(
- stream=self,
+ _raise_from_no_key_in_msg(
+ ctx=self._ctx,
msg=msg,
src_err=kerr,
+ log=log,
+ expect_key='yield',
+ stream=self,
)
except (