diff --git a/tractor/_streaming.py b/tractor/_streaming.py index ea01264..7b01707 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -61,7 +61,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): return msg['yield'] async def receive(self): - # see ``.aclose()`` to an alt to always checking this + # see ``.aclose()`` for an alt to always checking this if self._eoc: raise trio.EndOfChannel @@ -80,7 +80,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): if msg.get('stop'): log.debug(f"{self} was stopped at remote end") - self._eoc = True # when the send is closed we assume the stream has # terminated and signal this local iterator to stop @@ -121,7 +120,16 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # terminated and signal this local iterator to stop await self.aclose() - raise # propagate + raise trio.EndOfChannel + + # if not isinstance(self, MsgStream): + # # XXX: this was how we handled this originally for the + # # single direction case? + # raise trio.EndOfChannel + + # else: + # # in 2-way case raise the closed error + # raise # propagate except trio.Cancelled: # relay cancels to the remote task @@ -150,6 +158,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): on close. """ + self._eoc = True + # XXX: keep proper adherance to trio's `.aclose()` semantics: # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose rx_chan = self._rx_chan @@ -200,8 +210,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # it can't traverse the transport. log.debug(f'Channel for {self} was already closed') - self._eoc = True - # close the local mem chan??!? # NOT if we're a ``MsgStream``!