Factor non-yield stream msg processing into helper

Since both `MsgStream.receive()` and `.receive_nowait()` need the same
raising logic when a non-stream msg arrives (so that maybe an
appropriate IPC translated error can be raised) move the `KeyError`
handler code into a new `._streaming._raise_from_no_yield_msg()` func
and call it from both methods to make the error-interface-raising
symmetrical across both methods.
multihomed
Tyler Goodlet 2023-10-16 15:35:16 -04:00
parent 6d951c526a
commit 2fdb8fc25a
1 changed files with 72 additions and 40 deletions

View File

@ -54,6 +54,60 @@ log = get_logger(__name__)
# messages? class ReceiveChannel(AsyncResource, Generic[ReceiveType]): # messages? class ReceiveChannel(AsyncResource, Generic[ReceiveType]):
# - use __slots__ on ``Context``? # - use __slots__ on ``Context``?
def _raise_from_no_yield_msg(
stream: MsgStream,
msg: dict,
src_err: KeyError,
) -> bool:
'''
Raise an appopriate local error when a `MsgStream` msg arrives
which does not contain the expected (under normal operation)
`'yield'` field.
'''
# internal error should never get here
assert msg.get('cid'), ("Received internal error at portal?")
# TODO: handle 2 cases with 3.10+ match syntax
# - 'stop'
# - 'error'
# possibly just handle msg['stop'] here!
if stream._closed:
raise trio.ClosedResourceError('This stream was closed')
if msg.get('stop') or stream._eoc:
log.debug(f"{stream} was stopped at remote end")
# XXX: important to set so that a new ``.receive()``
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the ``return`` message
# value out of the underlying feed mem chan!
stream._eoc = True
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await stream.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
raise trio.EndOfChannel from src_err
# TODO: test that shows stream raising an expected error!!!
elif msg.get('error'):
# raise the error message
raise unpack_error(msg, stream._ctx.chan)
# always re-raise the source error if no translation error
# case is activated above.
raise src_err
# raise RuntimeError(
# 'Unknown non-yield stream msg?\n'
# f'{msg}'
# )
class MsgStream(trio.abc.Channel): class MsgStream(trio.abc.Channel):
''' '''
@ -91,11 +145,20 @@ class MsgStream(trio.abc.Channel):
# delegate directly to underlying mem channel # delegate directly to underlying mem channel
def receive_nowait(self): def receive_nowait(self):
msg = self._rx_chan.receive_nowait() msg = self._rx_chan.receive_nowait()
return msg['yield'] try:
return msg['yield']
except KeyError as kerr:
_raise_from_no_yield_msg(
stream=self,
msg=msg,
src_err=kerr,
)
async def receive(self): async def receive(self):
'''Async receive a single msg from the IPC transport, the next '''
in sequence for this stream. Receive a single msg from the IPC transport, the next in
sequence sent by the far end task (possibly in order as
determined by the underlying protocol).
''' '''
# see ``.aclose()`` for notes on the old behaviour prior to # see ``.aclose()`` for notes on the old behaviour prior to
@ -110,43 +173,12 @@ class MsgStream(trio.abc.Channel):
msg = await self._rx_chan.receive() msg = await self._rx_chan.receive()
return msg['yield'] return msg['yield']
except KeyError as err: except KeyError as kerr:
# internal error should never get here _raise_from_no_yield_msg(
assert msg.get('cid'), ("Received internal error at portal?") stream=self,
msg=msg,
# TODO: handle 2 cases with 3.10 match syntax src_err=kerr,
# - 'stop' )
# - 'error'
# possibly just handle msg['stop'] here!
if self._closed:
raise trio.ClosedResourceError('This stream was closed')
if msg.get('stop') or self._eoc:
log.debug(f"{self} was stopped at remote end")
# XXX: important to set so that a new ``.receive()``
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the ``return`` message
# value out of the underlying feed mem chan!
self._eoc = True
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await self.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
raise trio.EndOfChannel from err
# TODO: test that shows stream raising an expected error!!!
elif msg.get('error'):
# raise the error message
raise unpack_error(msg, self._ctx.chan)
else:
raise
except ( except (
trio.ClosedResourceError, # by self._rx_chan trio.ClosedResourceError, # by self._rx_chan