From 37ee477aeef83fa7ac5e9b1478a838603abd70ce Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 11 Mar 2024 10:20:55 -0400 Subject: [PATCH] Let `MsgStream.receive_nowait()` take in msg key list Call it `allow_msg_keys: list[str] = ['yield']` and set it to accept `['yield', 'return']` from the drain loop in `.aclose()`. Only pass the last key error to `_raise_from_no_key_in_msg()` in the fall-through case. Somehow this seems to prevent all the intermittent test failures i was seeing in local runs including when running the entire suite all in sequence; i ain't complaining B) --- tractor/_streaming.py | 40 ++++++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index b2cfe48..50a32ae 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -90,19 +90,29 @@ class MsgStream(trio.abc.Channel): self._closed: bool|trio.ClosedResourceError = False # delegate directly to underlying mem channel - def receive_nowait(self): - msg = self._rx_chan.receive_nowait() - try: - return msg['yield'] - except KeyError as kerr: - _raise_from_no_key_in_msg( - ctx=self._ctx, - msg=msg, - src_err=kerr, - log=log, - expect_key='yield', - stream=self, - ) + def receive_nowait( + self, + allow_msg_keys: list[str] = ['yield'], + ): + msg: dict = self._rx_chan.receive_nowait() + for ( + i, + key, + ) in enumerate(allow_msg_keys): + try: + return msg[key] + except KeyError as kerr: + if i < (len(allow_msg_keys) - 1): + continue + + _raise_from_no_key_in_msg( + ctx=self._ctx, + msg=msg, + src_err=kerr, + log=log, + expect_key=key, + stream=self, + ) async def receive(self): ''' @@ -263,7 +273,9 @@ class MsgStream(trio.abc.Channel): drained: list[Exception|dict] = [] while not drained: try: - maybe_final_msg = self.receive_nowait() + maybe_final_msg = self.receive_nowait( + allow_msg_keys=['yield', 'return'], + ) if maybe_final_msg: log.debug( 'Drained un-processed stream msg:\n'