forked from goodboy/tractor
1
0
Fork 0

Add support for stream cloning

stream_clones
Tyler Goodlet 2021-02-15 20:36:33 -05:00
parent 0f4f7f05cb
commit 6f9ef99776
1 changed files with 24 additions and 5 deletions

View File

@ -71,15 +71,18 @@ class ReceiveStream(trio.abc.ReceiveChannel):
try: try:
msg = await self._rx_chan.receive() msg = await self._rx_chan.receive()
return msg['yield'] return msg['yield']
except trio.ClosedResourceError: except trio.ClosedResourceError:
# when the send is closed we assume the stream has # when the send is closed we assume the stream has
# terminated and signal this local iterator to stop # terminated and signal this local iterator to stop
await self.aclose() await self.aclose()
raise StopAsyncIteration raise StopAsyncIteration
except trio.Cancelled: except trio.Cancelled:
# relay cancels to the remote task # relay cancels to the remote task
await self.aclose() await self.aclose()
raise raise
except KeyError: except KeyError:
# internal error should never get here # internal error should never get here
assert msg.get('cid'), ( assert msg.get('cid'), (
@ -102,14 +105,25 @@ class ReceiveStream(trio.abc.ReceiveChannel):
"""Cancel associated remote actor task and local memory channel """Cancel associated remote actor task and local memory channel
on close. on close.
""" """
if self._rx_chan._closed: rx_chan = self._rx_chan
stats = rx_chan.statistics()
if rx_chan._closed:
log.warning(f"{self} is already closed") log.warning(f"{self} is already closed")
return return
if stats.open_receive_channels > 1:
# if we've been cloned don't kill the stream
log.debug("there are still consumers running keeping stream alive")
return
if self._shielded: if self._shielded:
log.warning(f"{self} is shielded, portal channel being kept alive") log.warning(f"{self} is shielded, portal channel being kept alive")
return return
# close the local mem chan
rx_chan.close()
cid = self._cid cid = self._cid
with trio.move_on_after(0.5) as cs: with trio.move_on_after(0.5) as cs:
cs.shield = True cs.shield = True
@ -131,11 +145,16 @@ class ReceiveStream(trio.abc.ReceiveChannel):
"May have failed to cancel remote task " "May have failed to cancel remote task "
f"{cid} for {self._portal.channel.uid}") f"{cid} for {self._portal.channel.uid}")
with trio.CancelScope(shield=True):
await self._rx_chan.aclose()
def clone(self): def clone(self):
return self """Clone this receive channel allowing for multi-task
consumption from the same channel.
"""
return ReceiveStream(
self._cid,
self._rx_chan.clone(),
self._portal,
)
class Portal: class Portal: