From a2e2f7e7a89dad8c4271a21b62eac812394ff71a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 11 May 2021 23:42:34 -0400 Subject: [PATCH] Only send stop msg if not received from far end --- tractor/_streaming.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 9bc32e3..085d994 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -53,7 +53,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # delegate directly to underlying mem channel def receive_nowait(self): - return self._rx_chan.receive_nowait() + msg = self._rx_chan.receive_nowait() + return msg['yield'] async def receive(self): try: @@ -106,6 +107,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # when the send is closed we assume the stream has # terminated and signal this local iterator to stop await self.aclose() + # await self._ctx.send_stop() raise StopAsyncIteration @@ -326,10 +328,14 @@ class Context: try: yield rchan - finally: + except trio.EndOfChannel: + raise + + else: # signal ``StopAsyncIteration`` on far end. await self.send_stop() + finally: if self._portal: self._portal._streams.remove(rchan)