forked from goodboy/tractor
1
0
Fork 0

Raise a `MessagingError` from the src error on msging edge cases

multihomed
Tyler Goodlet 2023-10-23 14:34:12 -04:00
parent 0518b3ab04
commit 5a94e8fb5b
1 changed files with 33 additions and 11 deletions

View File

@ -23,6 +23,7 @@ The machinery and types behind ``Context.open_stream()``
from __future__ import annotations from __future__ import annotations
import inspect import inspect
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from pprint import pformat
from typing import ( from typing import (
Any, Any,
Callable, Callable,
@ -35,6 +36,7 @@ import trio
from ._exceptions import ( from ._exceptions import (
unpack_error, unpack_error,
MessagingError,
) )
from .log import get_logger from .log import get_logger
from .trionics import ( from .trionics import (
@ -66,6 +68,8 @@ def _raise_from_no_yield_msg(
`'yield'` field. `'yield'` field.
''' '''
__tracebackhide__: bool = True
# internal error should never get here # internal error should never get here
assert msg.get('cid'), ("Received internal error at portal?") assert msg.get('cid'), ("Received internal error at portal?")
@ -73,18 +77,22 @@ def _raise_from_no_yield_msg(
# - 'stop' # - 'stop'
# - 'error' # - 'error'
# possibly just handle msg['stop'] here! # possibly just handle msg['stop'] here!
# breakpoint()
if stream._closed: if stream._closed:
raise trio.ClosedResourceError('This stream was closed') raise trio.ClosedResourceError('This stream was closed')
if msg.get('stop') or stream._eoc: if (
log.debug(f"{stream} was stopped at remote end") msg.get('stop')
or stream._eoc
):
log.debug(f'{stream} was stopped at remote end')
# XXX: important to set so that a new ``.receive()`` # XXX: important to set so that a new ``.receive()``
# call (likely by another task using a broadcast receiver) # call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the ``return`` message # doesn't accidentally pull the ``return`` message
# value out of the underlying feed mem chan! # value out of the underlying feed mem chan!
stream._eoc = True stream._eoc: bool = True
# # when the send is closed we assume the stream has # # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop # # terminated and signal this local iterator to stop
@ -93,20 +101,24 @@ def _raise_from_no_yield_msg(
# XXX: this causes ``ReceiveChannel.__anext__()`` to # XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch # raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``. # block below it will trigger ``.aclose()``.
raise trio.EndOfChannel from src_err raise trio.EndOfChannel(
'Stream ended due to msg:\n'
f'{pformat(msg)}'
) from src_err
# TODO: test that shows stream raising an expected error!!! # TODO: test that shows stream raising an expected error!!!
elif msg.get('error'): elif msg.get('error'):
# raise the error message # raise the error message
raise unpack_error(msg, stream._ctx.chan) raise unpack_error(msg, stream._ctx.chan)
# always re-raise the source error if no translation error # always re-raise the source error if no translation error case
# case is activated above. # is activated above.
raise src_err raise MessagingError(
# raise RuntimeError( f'Context received unexpected non-error msg!?\n'
# 'Unknown non-yield stream msg?\n' f'cid: {cid}\n'
# f'{msg}' 'received msg:\n'
# ) f'{pformat(msg)}'
) from src_err
class MsgStream(trio.abc.Channel): class MsgStream(trio.abc.Channel):
@ -161,6 +173,16 @@ class MsgStream(trio.abc.Channel):
determined by the underlying protocol). determined by the underlying protocol).
''' '''
# NOTE: `trio.ReceiveChannel` implements
# EOC handling as follows (aka uses it
# to gracefully exit async for loops):
#
# async def __anext__(self) -> ReceiveType:
# try:
# return await self.receive()
# except trio.EndOfChannel:
# raise StopAsyncIteration
# see ``.aclose()`` for notes on the old behaviour prior to # see ``.aclose()`` for notes on the old behaviour prior to
# introducing this # introducing this
if self._eoc: if self._eoc: