diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 7b01707..6b544d0 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -61,7 +61,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): return msg['yield'] async def receive(self): - # see ``.aclose()`` for an alt to always checking this + # see ``.aclose()`` for notes on the old behaviour prior to + # introducing this if self._eoc: raise trio.EndOfChannel @@ -81,12 +82,13 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): if msg.get('stop'): log.debug(f"{self} was stopped at remote end") - # when the send is closed we assume the stream has - # terminated and signal this local iterator to stop - await self.aclose() + # # when the send is closed we assume the stream has + # # terminated and signal this local iterator to stop + # await self.aclose() # XXX: this causes ``ReceiveChannel.__anext__()`` to - # raise a ``StopAsyncIteration``. + # raise a ``StopAsyncIteration`` **and** in our catch + # block below it will trigger ``.aclose()``. raise trio.EndOfChannel # TODO: test that shows stream raising an expected error!!! @@ -97,44 +99,45 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): else: raise - except trio.ClosedResourceError: - # XXX: this indicates that a `stop` message was - # sent by the far side of the underlying channel. - # Currently this is triggered by calling ``.aclose()`` on + except ( + trio.ClosedResourceError, # by self._rx_chan + trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end + trio.Cancelled, # by local cancellation + ): + # XXX: we close the stream on any of these error conditions: + + # a ``ClosedResourceError`` indicates that the internal + # feeder memory receive channel was closed likely by the + # runtime after the associated transport-channel + # disconnected or broke. + + # an ``EndOfChannel`` indicates either the internal recv + # memchan exhausted **or** we raisesd it just above after + # receiving a `stop` message from the far end of the stream. + + # Previously this was triggered by calling ``.aclose()`` on # the send side of the channel inside - # ``Actor._push_result()``, but maybe it should be put here? - # to avoid exposing the internal mem chan closing mechanism? - # in theory we could instead do some flushing of the channel - # if needed to ensure all consumers are complete before - # triggering closure too early? + # ``Actor._push_result()`` (should still be commented code + # there - which should eventually get removed), but now the + # 'stop' message handling has been put just above. - # Locally, we want to close this stream gracefully, by + # TODO: Locally, we want to close this stream gracefully, by # terminating any local consumers tasks deterministically. - # We **don't** want to be closing this send channel and not - # relaying a final value to remaining consumers who may not - # have been scheduled to receive it yet? - - # lots of testing to do here + # One we have broadcast support, we **don't** want to be + # closing this stream and not flushing a final value to + # remaining (clone) consumers who may not have been + # scheduled to receive it yet. # when the send is closed we assume the stream has # terminated and signal this local iterator to stop await self.aclose() - raise trio.EndOfChannel + raise # propagate - # if not isinstance(self, MsgStream): - # # XXX: this was how we handled this originally for the - # # single direction case? - # raise trio.EndOfChannel - - # else: - # # in 2-way case raise the closed error - # raise # propagate - - except trio.Cancelled: - # relay cancels to the remote task - await self.aclose() - raise + # except trio.Cancelled: + # # relay cancels to the remote task + # await self.aclose() + # raise @contextmanager def shield( @@ -158,8 +161,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): on close. """ - self._eoc = True - # XXX: keep proper adherance to trio's `.aclose()` semantics: # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose rx_chan = self._rx_chan @@ -184,6 +185,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): log.warning(f"{self} is shielded, portal channel being kept alive") return + # XXX: This must be set **AFTER** the shielded test above! + self._eoc = True + # NOTE: this is super subtle IPC messaging stuff: # Relay stop iteration to far end **iff** we're # in bidirectional mode. If we're only streaming @@ -195,9 +199,13 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # isn't expecting messages to be sent by the caller. # Thus, we must check that this context DOES NOT # have a portal reference to ensure this is indeed the callee - # side and can relay a 'stop'. In the bidirectional case, - # `Context.open_stream()` will create the `Actor._cids2qs` - # entry from a call to `Actor.get_memchans()`. + # side and can relay a 'stop'. + + # In the bidirectional case, `Context.open_stream()` will create + # the `Actor._cids2qs` entry from a call to + # `Actor.get_memchans()` and will send the stop message in + # ``__aexit__()`` on teardown so it **does not** need to be + # called here. if not self._ctx._portal: try: # only for 2 way streams can we can send @@ -210,32 +218,41 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # it can't traverse the transport. log.debug(f'Channel for {self} was already closed') - # close the local mem chan??!? + # close the local mem chan ``self._rx_chan`` ??!? - # NOT if we're a ``MsgStream``! + # DEFINITELY NOT if we're a bi-dir ``MsgStream``! # BECAUSE this same core-msg-loop mem recv-chan is used to deliver # the potential final result from the surrounding inter-actor # `Context` so we don't want to close it until that context has # run to completion. - # XXX: Notes on old behaviour. + # XXX: Notes on old behaviour: + # await rx_chan.aclose() - # In the receive-only case, ``Portal.open_stream_from()`` should - # call this explicitly on teardown but additionally if for some - # reason stream consumer code tries to manually receive a new + # In the receive-only case, ``Portal.open_stream_from()`` used + # to rely on this call explicitly on teardown such that a new + # call to ``.receive()`` after ``rx_chan`` had been closed, would + # result in us raising a ``trio.EndOfChannel`` (since we + # remapped the ``trio.ClosedResourceError`). However, now if for some + # reason the stream's consumer code tries to manually receive a new # value before ``.aclose()`` is called **but** the far end has # stopped `.receive()` **must** raise ``trio.EndofChannel`` in - # order to avoid an infinite hang on ``.__anext__()``. So we can - # instead uncomment this check and close the underlying msg-loop - # mem chan below and not then **not** check for ``self._eoc`` in - # ``.receive()`` (if for some reason we think that check is - # a bottle neck - not likely) such that the - # ``trio.ClosedResourceError`` would instead trigger the - # ``trio.EndOfChannel`` in ``.receive()`` (as it originally was - # before bi-dir streaming support). + # order to avoid an infinite hang on ``.__anext__()``; this is + # why we added ``self._eoc`` to denote stream closure indepedent + # of ``rx_chan``. - # if not isinstance(self, MsgStream): - # await rx_chan.aclose() + # In theory we could still use this old method and close the + # underlying msg-loop mem chan as above and then **not** check + # for ``self._eoc`` in ``.receive()`` (if for some reason we + # think that check is a bottle neck - not likely) **but** then + # we would need to map the resulting + # ``trio.ClosedResourceError`` to a ``trio.EndOfChannel`` in + # ``.receive()`` (as it originally was before bi-dir streaming + # support) in order to trigger stream closure. The old behaviour + # is arguably more confusing since we lose detection of the + # runtime's closure of ``rx_chan`` in the case where we may + # still need to consume msgs that are "in transit" from the far + # end (eg. for ``Context.result()``). # TODO: but make it broadcasting to consumers # def clone(self): @@ -260,6 +277,9 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): self, data: Any ) -> None: + '''Send a message over this stream to the far end. + + ''' await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) @@ -418,7 +438,8 @@ class Context: yield rchan except trio.EndOfChannel: - # stream iteration stop signal + # likely the far end sent us a 'stop' message to + # terminate the stream. raise else: