diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 8c8d07e..38fbee4 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -313,12 +313,12 @@ async def test_respawn_consumer_task( task_status.started(cs) # shield stream's underlying channel from cancellation - with stream.shield(): + # with stream.shield(): - async for v in stream: - print(f'from stream: {v}') - expect.remove(v) - received.append(v) + async for v in stream: + print(f'from stream: {v}') + expect.remove(v) + received.append(v) print('exited consume') diff --git a/tractor/_streaming.py b/tractor/_streaming.py index eead6f6..c3c9490 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -46,11 +46,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): self, ctx: 'Context', # typing: ignore # noqa rx_chan: trio.abc.ReceiveChannel, - shield: bool = False, ) -> None: self._ctx = ctx self._rx_chan = rx_chan - self._shielded = shield # flag to denote end of stream self._eoc: bool = False @@ -103,7 +101,10 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): except ( trio.ClosedResourceError, # by self._rx_chan trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end - trio.Cancelled, # by local cancellation + + # Wait why would we do an implicit close on cancel? THAT'S + # NOT HOW MEM CHANS WORK!!?!?!?!? + # trio.Cancelled, # by local cancellation ): # XXX: we close the stream on any of these error conditions: @@ -135,23 +136,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): raise # propagate - @contextmanager - def shield( - self - ) -> Iterator['ReceiveMsgStream']: # noqa - """Shield this stream's underlying channel such that a local consumer task - can be cancelled (and possibly restarted) using ``trio.Cancelled``. - - Note that here, "shielding" here guards against relaying - a ``'stop'`` message to the far end of the stream thus keeping - the stream machinery active and ready for further use, it does - not have anything to do with an internal ``trio.CancelScope``. - - """ - self._shielded = True - yield self - self._shielded = False - async def aclose(self): """Cancel associated remote actor task and local memory channel on close. @@ -169,18 +153,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose return - # TODO: broadcasting to multiple consumers - # stats = rx_chan.statistics() - # if stats.open_receive_channels > 1: - # # if we've been cloned don't kill the stream - # log.debug( - # "there are still consumers running keeping stream alive") - # return - - if self._shielded: - log.warning(f"{self} is shielded, portal channel being kept alive") - return - # XXX: This must be set **AFTER** the shielded test above! self._eoc = True @@ -397,7 +369,6 @@ class Context: async def open_stream( self, - shield: bool = False, ) -> AsyncGenerator[MsgStream, None]: '''Open a ``MsgStream``, a bi-directional stream connected to the @@ -455,7 +426,6 @@ class Context: async with MsgStream( ctx=self, rx_chan=recv_chan, - shield=shield, ) as rchan: if self._portal: