From af85d35685f617ce8b7f4bb154dcee83dd0e9daa Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 31 Aug 2021 20:47:50 -0400 Subject: [PATCH 1/3] Drop stream shielding; it was from a legacy design The whole origin was not having an explicit open/close semantic for streams. We have that now so this internal mechanic isn't needed and further our streams become more correct by having `.aclose()` be independent of cancellation. --- tests/test_streaming.py | 10 +++++----- tractor/_streaming.py | 38 ++++---------------------------------- 2 files changed, 9 insertions(+), 39 deletions(-) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 8c8d07e..38fbee4 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -313,12 +313,12 @@ async def test_respawn_consumer_task( task_status.started(cs) # shield stream's underlying channel from cancellation - with stream.shield(): + # with stream.shield(): - async for v in stream: - print(f'from stream: {v}') - expect.remove(v) - received.append(v) + async for v in stream: + print(f'from stream: {v}') + expect.remove(v) + received.append(v) print('exited consume') diff --git a/tractor/_streaming.py b/tractor/_streaming.py index eead6f6..c3c9490 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -46,11 +46,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): self, ctx: 'Context', # typing: ignore # noqa rx_chan: trio.abc.ReceiveChannel, - shield: bool = False, ) -> None: self._ctx = ctx self._rx_chan = rx_chan - self._shielded = shield # flag to denote end of stream self._eoc: bool = False @@ -103,7 +101,10 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): except ( trio.ClosedResourceError, # by self._rx_chan trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end - trio.Cancelled, # by local cancellation + + # Wait why would we do an implicit close on cancel? THAT'S + # NOT HOW MEM CHANS WORK!!?!?!?!? + # trio.Cancelled, # by local cancellation ): # XXX: we close the stream on any of these error conditions: @@ -135,23 +136,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): raise # propagate - @contextmanager - def shield( - self - ) -> Iterator['ReceiveMsgStream']: # noqa - """Shield this stream's underlying channel such that a local consumer task - can be cancelled (and possibly restarted) using ``trio.Cancelled``. - - Note that here, "shielding" here guards against relaying - a ``'stop'`` message to the far end of the stream thus keeping - the stream machinery active and ready for further use, it does - not have anything to do with an internal ``trio.CancelScope``. - - """ - self._shielded = True - yield self - self._shielded = False - async def aclose(self): """Cancel associated remote actor task and local memory channel on close. @@ -169,18 +153,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose return - # TODO: broadcasting to multiple consumers - # stats = rx_chan.statistics() - # if stats.open_receive_channels > 1: - # # if we've been cloned don't kill the stream - # log.debug( - # "there are still consumers running keeping stream alive") - # return - - if self._shielded: - log.warning(f"{self} is shielded, portal channel being kept alive") - return - # XXX: This must be set **AFTER** the shielded test above! self._eoc = True @@ -397,7 +369,6 @@ class Context: async def open_stream( self, - shield: bool = False, ) -> AsyncGenerator[MsgStream, None]: '''Open a ``MsgStream``, a bi-directional stream connected to the @@ -455,7 +426,6 @@ class Context: async with MsgStream( ctx=self, rx_chan=recv_chan, - shield=shield, ) as rchan: if self._portal: From b4d95e9543908e969a796d10d1958e0a818d0fb3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 2 Sep 2021 08:24:13 -0400 Subject: [PATCH 2/3] Update docs to new close semantics --- tractor/_streaming.py | 44 ++++++++++++++++++++++++------------------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index c3c9490..5f04554 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -29,23 +29,26 @@ log = get_logger(__name__) class ReceiveMsgStream(trio.abc.ReceiveChannel): - """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with - special behaviour for signalling stream termination across an - inter-actor ``Channel``. This is the type returned to a local task - which invoked a remote streaming function using `Portal.run()`. + '''A IPC message stream for receiving logically sequenced values + over an inter-actor ``Channel``. This is the type returned to + a local task which entered either ``Portal.open_stream_from()`` or + ``Context.open_stream()``. Termination rules: - - if the local task signals stop iteration a cancel signal is - relayed to the remote task indicating to stop streaming - - if the remote task signals the end of a stream, raise - a ``StopAsyncIteration`` to terminate the local ``async for`` + - on cancellation the stream is **not** implicitly closed and the + surrounding ``Context`` is expected to handle how that cancel + is relayed to any task on the remote side. + - if the remote task signals the end of a stream the + ``ReceiveChannel`` semantics dictate that a ``StopAsyncIteration`` + to terminate the local ``async for``. - """ + ''' def __init__( self, ctx: 'Context', # typing: ignore # noqa rx_chan: trio.abc.ReceiveChannel, + ) -> None: self._ctx = ctx self._rx_chan = rx_chan @@ -59,13 +62,16 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): return msg['yield'] async def receive(self): + '''Async receive a single msg from the IPC transport, the next + in sequence for this stream. + + ''' # see ``.aclose()`` for notes on the old behaviour prior to # introducing this if self._eoc: raise trio.EndOfChannel try: - msg = await self._rx_chan.receive() return msg['yield'] @@ -101,10 +107,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): except ( trio.ClosedResourceError, # by self._rx_chan trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end - - # Wait why would we do an implicit close on cancel? THAT'S - # NOT HOW MEM CHANS WORK!!?!?!?!? - # trio.Cancelled, # by local cancellation ): # XXX: we close the stream on any of these error conditions: @@ -153,7 +155,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose return - # XXX: This must be set **AFTER** the shielded test above! self._eoc = True # NOTE: this is super subtle IPC messaging stuff: @@ -175,9 +176,14 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # ``__aexit__()`` on teardown so it **does not** need to be # called here. if not self._ctx._portal: + # Only for 2 way streams can we can send stop from the + # caller side. try: - # only for 2 way streams can we can send - # stop from the caller side + # NOTE: if this call is cancelled we expect this end to + # handle as though the stop was never sent (though if it + # was it shouldn't matter since it's unlikely a user + # will try to re-use a stream after attemping to close + # it). await self._ctx.send_stop() except ( @@ -189,9 +195,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # it can't traverse the transport. log.debug(f'Channel for {self} was already closed') - # close the local mem chan ``self._rx_chan`` ??!? + # Do we close the local mem chan ``self._rx_chan`` ??!? - # DEFINITELY NOT if we're a bi-dir ``MsgStream``! + # NO, DEFINITELY NOT if we're a bi-dir ``MsgStream``! # BECAUSE this same core-msg-loop mem recv-chan is used to deliver # the potential final result from the surrounding inter-actor # `Context` so we don't want to close it until that context has From 558c44fdbec9119b197782bfab98a44d1bd3d73b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 2 Sep 2021 08:33:29 -0400 Subject: [PATCH 3/3] Add newsfragment --- newsfragments/230.removal.rst | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 newsfragments/230.removal.rst diff --git a/newsfragments/230.removal.rst b/newsfragments/230.removal.rst new file mode 100644 index 0000000..afb7cdb --- /dev/null +++ b/newsfragments/230.removal.rst @@ -0,0 +1,9 @@ +Drop stream "shielding" support which was originally added to sidestep +a cancelled call to ``.receive()`` + +In the original api design a stream instance was returned directly from +a call to ``Portal.run()`` and thus there was no "exit phase" to handle +cancellations and errors which would trigger implicit closure. Now that +we have said enter/exit semantics with ``Portal.open_stream_from()`` and +``Context.open_stream()`` we can drop this implicit (and arguably +confusing) behavior.