diff --git a/tractor/_streaming.py b/tractor/_streaming.py
index 11ff47d..699a906 100644
--- a/tractor/_streaming.py
+++ b/tractor/_streaming.py
@@ -97,6 +97,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
         if self._eoc:
             raise trio.EndOfChannel
 
+        if self._closed:
+            raise trio.ClosedResourceError('This stream was closed')
+
         try:
             msg = await self._rx_chan.receive()
             return msg['yield']
@@ -110,6 +113,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
             # - 'error'
             # possibly just handle msg['stop'] here!
 
+            if self._closed:
+                raise trio.ClosedResourceError('This stream was closed')
+
             if msg.get('stop') or self._eoc:
                 log.debug(f"{self} was stopped at remote end")
 
@@ -189,7 +195,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
             return
 
         self._eoc = True
-        self._closed = True
 
         # NOTE: this is super subtle IPC messaging stuff:
         # Relay stop iteration to far end **iff** we're
@@ -206,29 +211,32 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
 
         # In the bidirectional case, `Context.open_stream()` will create
         # the `Actor._cids2qs` entry from a call to
-        # `Actor.get_context()` and will send the stop message in
-        # ``__aexit__()`` on teardown so it **does not** need to be
-        # called here.
-        if not self._ctx._portal:
-            # Only for 2 way streams can we can send stop from the
-            # caller side.
-            try:
-                # NOTE: if this call is cancelled we expect this end to
-                # handle as though the stop was never sent (though if it
-                # was it shouldn't matter since it's unlikely a user
-                # will try to re-use a stream after attemping to close
-                # it).
-                with trio.CancelScope(shield=True):
-                    await self._ctx.send_stop()
+        # `Actor.get_context()` and will call us here to send the stop
+        # msg in ``__aexit__()`` on teardown.
+        try:
+            # NOTE: if this call is cancelled we expect this end to
+            # handle as though the stop was never sent (though if it
+            # was it shouldn't matter since it's unlikely a user
+            # will try to re-use a stream after attempting to close
+            # it).
+            with trio.CancelScope(shield=True):
+                await self._ctx.send_stop()
 
-            except (
-                trio.BrokenResourceError,
-                trio.ClosedResourceError
-            ):
-                # the underlying channel may already have been pulled
-                # in which case our stop message is meaningless since
-                # it can't traverse the transport.
-                log.debug(f'Channel for {self} was already closed')
+        except (
+            trio.BrokenResourceError,
+            trio.ClosedResourceError
+        ):
+            # the underlying channel may already have been pulled
+            # in which case our stop message is meaningless since
+            # it can't traverse the transport.
+            ctx = self._ctx
+            log.warning(
+                f'Stream was already destroyed?\n'
+                f'actor: {ctx.chan.uid}\n'
+                f'ctx id: {ctx.cid}'
+            )
+
+        self._closed = True
 
         # Do we close the local mem chan ``self._rx_chan`` ??!?
 
@@ -594,30 +602,23 @@ class Context:
         async with MsgStream(
             ctx=self,
             rx_chan=ctx._recv_chan,
-        ) as rchan:
+        ) as stream:
 
             if self._portal:
-                self._portal._streams.add(rchan)
+                self._portal._streams.add(stream)
 
             try:
                 self._stream_opened = True
 
-                # ensure we aren't cancelled before delivering
-                # the stream
+                # XXX: do we need this?
+                # ensure we aren't cancelled before yielding the stream
                 # await trio.lowlevel.checkpoint()
-                yield rchan
+                yield stream
 
-                # XXX: Make the stream "one-shot use". On exit, signal
+                # NOTE: Make the stream "one-shot use". On exit, signal
                 # ``trio.EndOfChannel``/``StopAsyncIteration`` to the
                 # far end.
-                try:
-                    await self.send_stop()
-                except trio.BrokenResourceError:
-                    log.warning(
-                        f"Couldn't close: stream already broken?\n"
-                        f'actor: {self.chan.uid}\n'
-                        f'ctx id: {self.cid}'
-                    )
+                await stream.aclose()
 
             finally:
                 if self._portal:
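
Usage sketch (not part of the patch): the net effect of this change is that streams are strictly "one-shot use" and that the ``stop`` msg send now lives solely in ``MsgStream.aclose()`` (shielded from cancellation) rather than in ``Context.open_stream.__aexit__()``, so the close protocol is identical no matter which side initiates it; once ``_closed`` is set, further ``.receive()`` calls raise instead of touching the underlying mem chan. The snippet below assumes tractor's ``@tractor.context`` / ``Portal.open_context()`` bidir streaming API; the ``echo_server`` task, actor name and ``main()`` wiring are made up for illustration:

    import trio
    import tractor


    @tractor.context
    async def echo_server(ctx: tractor.Context) -> None:
        # remote task: relay every received msg straight back
        await ctx.started()
        async with ctx.open_stream() as stream:
            async for msg in stream:
                await stream.send(msg)


    async def main() -> None:
        async with tractor.open_nursery() as n:
            portal = await n.start_actor('echoer', enable_modules=[__name__])

            async with portal.open_context(echo_server) as (ctx, first):
                async with ctx.open_stream() as stream:
                    await stream.send('yo')
                    assert await stream.receive() == 'yo'

                # exiting the block called ``stream.aclose()`` which
                # shield-sent a ``stop`` msg to the far end and set
                # ``._eoc``/``._closed``; the stream can not be re-used
                try:
                    await stream.receive()
                except (trio.EndOfChannel, trio.ClosedResourceError):
                    # which error surfaces depends on which flag
                    # ``.receive()`` checks first; either way the
                    # one-shot semantics are enforced
                    pass

            await portal.cancel_actor()


    if __name__ == '__main__':
        trio.run(main)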