From 78b4eef7ee091d05d10a323bda467f7f7a81fcfc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Jun 2021 13:30:09 -0400 Subject: [PATCH] Don't clobber msg loop mem chan on rx stream close Revert this change since it really is poking at internals and doesn't make a lot of sense. If the context is going to be cancelled then the msg loop will tear down the feed memory channel when ready, we don't need to be clobbering it and confusing the runtime machinery lol. --- tractor/_portal.py | 6 ------ tractor/_streaming.py | 7 +++---- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index 32e71b0d..3d1bd9ce 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -332,12 +332,6 @@ class Portal: # message right now since there shouldn't be a reason to # stop and restart the stream, right? try: - - # We are for sure done with this stream and no more - # messages are expected to be delivered from the - # runtime's msg loop. - await recv_chan.aclose() - await ctx.cancel() except trio.ClosedResourceError: diff --git a/tractor/_streaming.py b/tractor/_streaming.py index ea012644..0d6b59dd 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -61,7 +61,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): return msg['yield'] async def receive(self): - # see ``.aclose()`` to an alt to always checking this + # see ``.aclose()`` for an alt to always checking this if self._eoc: raise trio.EndOfChannel @@ -80,7 +80,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): if msg.get('stop'): log.debug(f"{self} was stopped at remote end") - self._eoc = True # when the send is closed we assume the stream has # terminated and signal this local iterator to stop @@ -150,6 +149,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): on close. """ + self._eoc = True + # XXX: keep proper adherance to trio's `.aclose()` semantics: # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose rx_chan = self._rx_chan @@ -200,8 +201,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # it can't traverse the transport. log.debug(f'Channel for {self} was already closed') - self._eoc = True - # close the local mem chan??!? # NOT if we're a ``MsgStream``!