diff --git a/tractor/_streaming.py b/tractor/_streaming.py
index b2cfe485..50a32ae9 100644
--- a/tractor/_streaming.py
+++ b/tractor/_streaming.py
@@ -90,19 +90,29 @@ class MsgStream(trio.abc.Channel):
         self._closed: bool|trio.ClosedResourceError = False
 
     # delegate directly to underlying mem channel
-    def receive_nowait(self):
-        msg = self._rx_chan.receive_nowait()
-        try:
-            return msg['yield']
-        except KeyError as kerr:
-            _raise_from_no_key_in_msg(
-                ctx=self._ctx,
-                msg=msg,
-                src_err=kerr,
-                log=log,
-                expect_key='yield',
-                stream=self,
-            )
+    def receive_nowait(
+        self,
+        allow_msg_keys: list[str] = ['yield'],
+    ):
+        msg: dict = self._rx_chan.receive_nowait()
+        for (
+            i,
+            key,
+        ) in enumerate(allow_msg_keys):
+            try:
+                return msg[key]
+            except KeyError as kerr:
+                if i < (len(allow_msg_keys) - 1):
+                    continue
+
+                _raise_from_no_key_in_msg(
+                    ctx=self._ctx,
+                    msg=msg,
+                    src_err=kerr,
+                    log=log,
+                    expect_key=key,
+                    stream=self,
+                )
 
     async def receive(self):
         '''
@@ -263,7 +273,9 @@ class MsgStream(trio.abc.Channel):
         drained: list[Exception|dict] = []
         while not drained:
             try:
-                maybe_final_msg = self.receive_nowait()
+                maybe_final_msg = self.receive_nowait(
+                    allow_msg_keys=['yield', 'return'],
+                )
                 if maybe_final_msg:
                     log.debug(
                         'Drained un-processed stream msg:\n'