From 38a648385999e3aef0d69db10d22468cedf62277 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 14 Apr 2024 18:31:41 -0400 Subject: [PATCH] Use `_raise_from_no_key_in_msg(allow_msgs)` Instead of `allow_msg_keys` since we've fully flipped over to struct-types for msgs in the runtime. - drop the loop from `MsgStream.receive_nowait()` since `Yield/Return.pld` getting will handle both (instead of a loop of `dict`-key reads). --- tractor/_context.py | 1 - tractor/_exceptions.py | 3 +-- tractor/_streaming.py | 48 ++++++++++++++++-------------------------- 3 files changed, 19 insertions(+), 33 deletions(-) diff --git a/tractor/_context.py b/tractor/_context.py index fc16289..9495654 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -2151,7 +2151,6 @@ async def open_context_from_portal( src_err=src_error, log=log, expect_msg=Started, - # expect_key='started', ) uid: tuple = portal.channel.uid diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 259994a..65637fb 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -938,7 +938,6 @@ def _raise_from_no_key_in_msg( src_err: KeyError, log: StackLevelAdapter, # caller specific `log` obj - expect_key: str = 'yield', expect_msg: str = Yield, stream: MsgStream | None = None, @@ -1053,7 +1052,7 @@ def _raise_from_no_key_in_msg( # is activated above. _type: str = 'Stream' if stream else 'Context' raise MessagingError( - f"{_type} was expecting a '{expect_key.upper()}' message" + f"{_type} was expecting a {expect_msg} message" " BUT received a non-error msg:\n" f'{pformat(msg)}' ) from src_err diff --git a/tractor/_streaming.py b/tractor/_streaming.py index fcf8daf..ac4d482 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -44,6 +44,7 @@ from .trionics import ( BroadcastReceiver, ) from tractor.msg import ( + Return, Stop, Yield, ) @@ -82,7 +83,7 @@ class MsgStream(trio.abc.Channel): self, ctx: Context, # typing: ignore # noqa rx_chan: trio.MemoryReceiveChannel, - _broadcaster: BroadcastReceiver | None = None, + _broadcaster: BroadcastReceiver|None = None, ) -> None: self._ctx = ctx @@ -96,36 +97,26 @@ class MsgStream(trio.abc.Channel): # delegate directly to underlying mem channel def receive_nowait( self, - allow_msg_keys: list[str] = ['yield'], + allow_msgs: list[str] = Yield, ): - # msg: dict = self._rx_chan.receive_nowait() msg: Yield|Stop = self._rx_chan.receive_nowait() - for ( - i, - key, - ) in enumerate(allow_msg_keys): - try: - # return msg[key] - return msg.pld - # except KeyError as kerr: - except AttributeError as attrerr: - if i < (len(allow_msg_keys) - 1): - continue - - _raise_from_no_key_in_msg( - ctx=self._ctx, - msg=msg, - # src_err=kerr, - src_err=attrerr, - log=log, - expect_key=key, - stream=self, - ) + # TODO: replace msg equiv of this or does the `.pld` + # interface read already satisfy it? I think so, yes? + try: + return msg.pld + except AttributeError as attrerr: + _raise_from_no_key_in_msg( + ctx=self._ctx, + msg=msg, + src_err=attrerr, + log=log, + stream=self, + ) async def receive( self, - hide_tb: bool = True, + hide_tb: bool = False, ): ''' Receive a single msg from the IPC transport, the next in @@ -157,10 +148,9 @@ class MsgStream(trio.abc.Channel): try: try: msg: Yield = await self._rx_chan.receive() - # return msg['yield'] return msg.pld - # except KeyError as kerr: + # TODO: implement with match: instead? except AttributeError as attrerr: # src_err = kerr src_err = attrerr @@ -170,10 +160,8 @@ class MsgStream(trio.abc.Channel): _raise_from_no_key_in_msg( ctx=self._ctx, msg=msg, - # src_err=kerr, src_err=attrerr, log=log, - expect_key='yield', stream=self, ) @@ -304,7 +292,7 @@ class MsgStream(trio.abc.Channel): while not drained: try: maybe_final_msg = self.receive_nowait( - allow_msg_keys=['yield', 'return'], + allow_msgs=[Yield, Return], ) if maybe_final_msg: log.debug(