diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 9bc32e3..085d994 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -53,7 +53,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # delegate directly to underlying mem channel def receive_nowait(self): - return self._rx_chan.receive_nowait() + msg = self._rx_chan.receive_nowait() + return msg['yield'] async def receive(self): try: @@ -106,6 +107,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # when the send is closed we assume the stream has # terminated and signal this local iterator to stop await self.aclose() + # await self._ctx.send_stop() raise StopAsyncIteration @@ -326,10 +328,14 @@ class Context: try: yield rchan - finally: + except trio.EndOfChannel: + raise + + else: # signal ``StopAsyncIteration`` on far end. await self.send_stop() + finally: if self._portal: self._portal._streams.remove(rchan)