Move missing-key-in-msg raiser to `._exceptions`

Since we use basically the exact same set of logic in
`Portal.open_context()` when expecting the first `'started'` msg factor
and generalize `._streaming._raise_from_no_yield_msg()` into a new
`._exceptions._raise_from_no_key_in_msg()` (as per the lingering todo)
which obvi requires a more generalized / optional signature including
a caller specific `log` obj. Obvi call the new func from all the other
modules X)
multihomed
Tyler Goodlet 2024-01-02 18:34:15 -05:00
parent 0bcdea28a0
commit 734bc09b67
4 changed files with 122 additions and 97 deletions

View File

@ -44,9 +44,11 @@ import warnings
import trio
from ._exceptions import (
# _raise_from_no_key_in_msg,
unpack_error,
pack_error,
ContextCancelled,
# MessagingError,
StreamOverrun,
)
from .log import get_logger

View File

@ -14,16 +14,18 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
'''
Our classy exception set.
"""
'''
from __future__ import annotations
import builtins
import importlib
from pprint import pformat
from typing import (
Any,
Type,
TYPE_CHECKING,
)
import traceback
@ -32,6 +34,11 @@ import trio
from ._state import current_actor
if TYPE_CHECKING:
from ._context import Context
from ._stream import MsgStream
from .log import StackLevelAdapter
_this_mod = importlib.import_module(__name__)
@ -246,3 +253,88 @@ def is_multi_cancelled(exc: BaseException) -> bool:
) is not None
return False
def _raise_from_no_key_in_msg(
ctx: Context,
msg: dict,
src_err: KeyError,
log: StackLevelAdapter, # caller specific `log` obj
expect_key: str = 'yield',
stream: MsgStream | None = None,
) -> bool:
'''
Raise an appopriate local error when a `MsgStream` msg arrives
which does not contain the expected (under normal operation)
`'yield'` field.
'''
__tracebackhide__: bool = True
# internal error should never get here
try:
cid: str = msg['cid']
except KeyError as src_err:
raise MessagingError(
f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n'
f'cid: {cid}\n'
'received msg:\n'
f'{pformat(msg)}\n'
) from src_err
# TODO: test that shows stream raising an expected error!!!
if msg.get('error'):
# raise the error message
raise unpack_error(
msg,
ctx.chan,
) from None
elif (
msg.get('stop')
or (
stream
and stream._eoc
)
):
log.debug(
f'Context[{cid}] stream was stopped by remote side\n'
f'cid: {cid}\n'
)
# XXX: important to set so that a new ``.receive()``
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the ``return`` message
# value out of the underlying feed mem chan!
stream._eoc: bool = True
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await stream.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
raise trio.EndOfChannel(
'Context[{cid}] stream ended due to msg:\n'
f'{pformat(msg)}'
) from src_err
if (
stream
and stream._closed
):
raise trio.ClosedResourceError('This stream was closed')
# always re-raise the source error if no translation error case
# is activated above.
_type: str = 'Stream' if stream else 'Context'
raise MessagingError(
f'{_type} was expecting a `{expect_key}` message'
' BUT received a non-`error` msg:\n'
f'cid: {cid}\n'
'{pformat(msg)}'
) from src_err

View File

@ -33,7 +33,6 @@ from typing import (
)
from functools import partial
from dataclasses import dataclass
from pprint import pformat
import warnings
import trio
@ -45,13 +44,17 @@ from ._ipc import Channel
from .log import get_logger
from .msg import NamespacePath
from ._exceptions import (
_raise_from_no_key_in_msg,
unpack_error,
NoResult,
ContextCancelled,
MessagingError,
)
from ._context import Context
from ._streaming import MsgStream
from ._context import (
Context,
)
from ._streaming import (
MsgStream,
)
from .devx._debug import maybe_wait_for_debugger
@ -465,26 +468,15 @@ class Portal:
first: Any = msg['started']
ctx._started_called: bool = True
except KeyError:
except KeyError as src_error:
# TODO: can we maybe factor this into the new raiser
# `_streaming._raise_from_no_yield_msg()` and make that
# helper more generic, say with a `_no_<blah>_msg()`?
if not (cid := msg.get('cid')):
raise MessagingError(
'Received internal error at context?\n'
'No call-id (cid) in startup msg?'
)
if msg.get('error'):
# NOTE: mask the key error with the remote one
raise unpack_error(msg, self.channel) from None
else:
raise MessagingError(
f'Context for {cid} was expecting a `started` message'
' but received a non-error msg:\n'
f'{pformat(msg)}'
)
_raise_from_no_key_in_msg(
ctx=ctx,
msg=msg,
src_err=src_error,
log=log,
expect_key='started',
)
ctx._portal: Portal = self
uid: tuple = self.channel.uid

View File

@ -23,7 +23,6 @@ The machinery and types behind ``Context.open_stream()``
from __future__ import annotations
import inspect
from contextlib import asynccontextmanager as acm
from pprint import pformat
from typing import (
Any,
Callable,
@ -35,8 +34,7 @@ import warnings
import trio
from ._exceptions import (
unpack_error,
MessagingError,
_raise_from_no_key_in_msg,
)
from .log import get_logger
from .trionics import (
@ -56,71 +54,6 @@ log = get_logger(__name__)
# messages? class ReceiveChannel(AsyncResource, Generic[ReceiveType]):
# - use __slots__ on ``Context``?
def _raise_from_no_yield_msg(
stream: MsgStream,
msg: dict,
src_err: KeyError,
) -> bool:
'''
Raise an appopriate local error when a `MsgStream` msg arrives
which does not contain the expected (under normal operation)
`'yield'` field.
'''
__tracebackhide__: bool = True
# internal error should never get here
assert msg.get('cid'), ("Received internal error at portal?")
# TODO: handle 2 cases with 3.10+ match syntax
# - 'stop'
# - 'error'
# possibly just handle msg['stop'] here!
# breakpoint()
if stream._closed:
raise trio.ClosedResourceError('This stream was closed')
if (
msg.get('stop')
or stream._eoc
):
log.debug(f'{stream} was stopped at remote end')
# XXX: important to set so that a new ``.receive()``
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the ``return`` message
# value out of the underlying feed mem chan!
stream._eoc: bool = True
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await stream.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
raise trio.EndOfChannel(
'Stream ended due to msg:\n'
f'{pformat(msg)}'
) from src_err
# TODO: test that shows stream raising an expected error!!!
elif msg.get('error'):
# raise the error message
raise unpack_error(msg, stream._ctx.chan)
# always re-raise the source error if no translation error case
# is activated above.
raise MessagingError(
f'Context received unexpected non-error msg!?\n'
f'cid: {cid}\n'
'received msg:\n'
f'{pformat(msg)}'
) from src_err
class MsgStream(trio.abc.Channel):
'''
A bidirectional message stream for receiving logically sequenced
@ -160,10 +93,13 @@ class MsgStream(trio.abc.Channel):
try:
return msg['yield']
except KeyError as kerr:
_raise_from_no_yield_msg(
stream=self,
_raise_from_no_key_in_msg(
ctx=self._ctx,
msg=msg,
src_err=kerr,
log=log,
expect_key='yield',
stream=self,
)
async def receive(self):
@ -196,10 +132,13 @@ class MsgStream(trio.abc.Channel):
return msg['yield']
except KeyError as kerr:
_raise_from_no_yield_msg(
stream=self,
_raise_from_no_key_in_msg(
ctx=self._ctx,
msg=msg,
src_err=kerr,
log=log,
expect_key='yield',
stream=self,
)
except (