From 6f9ef99776cc822ce38ad3abd2b5c78c1d5b9956 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 15 Feb 2021 20:36:33 -0500 Subject: [PATCH] Add support for stream cloning --- tractor/_portal.py | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index 9ad4c2f..ca56b68 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -71,15 +71,18 @@ class ReceiveStream(trio.abc.ReceiveChannel): try: msg = await self._rx_chan.receive() return msg['yield'] + except trio.ClosedResourceError: # when the send is closed we assume the stream has # terminated and signal this local iterator to stop await self.aclose() raise StopAsyncIteration + except trio.Cancelled: # relay cancels to the remote task await self.aclose() raise + except KeyError: # internal error should never get here assert msg.get('cid'), ( @@ -102,14 +105,25 @@ class ReceiveStream(trio.abc.ReceiveChannel): """Cancel associated remote actor task and local memory channel on close. """ - if self._rx_chan._closed: + rx_chan = self._rx_chan + stats = rx_chan.statistics() + + if rx_chan._closed: log.warning(f"{self} is already closed") return + if stats.open_receive_channels > 1: + # if we've been cloned don't kill the stream + log.debug("there are still consumers running keeping stream alive") + return + if self._shielded: log.warning(f"{self} is shielded, portal channel being kept alive") return + # close the local mem chan + rx_chan.close() + cid = self._cid with trio.move_on_after(0.5) as cs: cs.shield = True @@ -131,11 +145,16 @@ class ReceiveStream(trio.abc.ReceiveChannel): "May have failed to cancel remote task " f"{cid} for {self._portal.channel.uid}") - with trio.CancelScope(shield=True): - await self._rx_chan.aclose() - def clone(self): - return self + """Clone this receive channel allowing for multi-task + consumption from the same channel. + + """ + return ReceiveStream( + self._cid, + self._rx_chan.clone(), + self._portal, + ) class Portal: