Compare commits
1 Commits
master
...
stream_clo
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 6f9ef99776 |
|
@ -71,15 +71,18 @@ class ReceiveStream(trio.abc.ReceiveChannel):
|
|||
try:
|
||||
msg = await self._rx_chan.receive()
|
||||
return msg['yield']
|
||||
|
||||
except trio.ClosedResourceError:
|
||||
# when the send is closed we assume the stream has
|
||||
# terminated and signal this local iterator to stop
|
||||
await self.aclose()
|
||||
raise StopAsyncIteration
|
||||
|
||||
except trio.Cancelled:
|
||||
# relay cancels to the remote task
|
||||
await self.aclose()
|
||||
raise
|
||||
|
||||
except KeyError:
|
||||
# internal error should never get here
|
||||
assert msg.get('cid'), (
|
||||
|
@ -102,14 +105,25 @@ class ReceiveStream(trio.abc.ReceiveChannel):
|
|||
"""Cancel associated remote actor task and local memory channel
|
||||
on close.
|
||||
"""
|
||||
if self._rx_chan._closed:
|
||||
rx_chan = self._rx_chan
|
||||
stats = rx_chan.statistics()
|
||||
|
||||
if rx_chan._closed:
|
||||
log.warning(f"{self} is already closed")
|
||||
return
|
||||
|
||||
if stats.open_receive_channels > 1:
|
||||
# if we've been cloned don't kill the stream
|
||||
log.debug("there are still consumers running keeping stream alive")
|
||||
return
|
||||
|
||||
if self._shielded:
|
||||
log.warning(f"{self} is shielded, portal channel being kept alive")
|
||||
return
|
||||
|
||||
# close the local mem chan
|
||||
rx_chan.close()
|
||||
|
||||
cid = self._cid
|
||||
with trio.move_on_after(0.5) as cs:
|
||||
cs.shield = True
|
||||
|
@ -131,11 +145,16 @@ class ReceiveStream(trio.abc.ReceiveChannel):
|
|||
"May have failed to cancel remote task "
|
||||
f"{cid} for {self._portal.channel.uid}")
|
||||
|
||||
with trio.CancelScope(shield=True):
|
||||
await self._rx_chan.aclose()
|
||||
|
||||
def clone(self):
|
||||
return self
|
||||
"""Clone this receive channel allowing for multi-task
|
||||
consumption from the same channel.
|
||||
|
||||
"""
|
||||
return ReceiveStream(
|
||||
self._cid,
|
||||
self._rx_chan.clone(),
|
||||
self._portal,
|
||||
)
|
||||
|
||||
|
||||
class Portal:
|
||||
|
|
Loading…
Reference in New Issue