forked from goodboy/tractor
				
			Compare commits
	
		
			1 Commits 
		
	
	
		
			master
			...
			stream_clo
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 6f9ef99776 | 
|  | @ -71,15 +71,18 @@ class ReceiveStream(trio.abc.ReceiveChannel): | ||||||
|         try: |         try: | ||||||
|             msg = await self._rx_chan.receive() |             msg = await self._rx_chan.receive() | ||||||
|             return msg['yield'] |             return msg['yield'] | ||||||
|  | 
 | ||||||
|         except trio.ClosedResourceError: |         except trio.ClosedResourceError: | ||||||
|             # when the send is closed we assume the stream has |             # when the send is closed we assume the stream has | ||||||
|             # terminated and signal this local iterator to stop |             # terminated and signal this local iterator to stop | ||||||
|             await self.aclose() |             await self.aclose() | ||||||
|             raise StopAsyncIteration |             raise StopAsyncIteration | ||||||
|  | 
 | ||||||
|         except trio.Cancelled: |         except trio.Cancelled: | ||||||
|             # relay cancels to the remote task |             # relay cancels to the remote task | ||||||
|             await self.aclose() |             await self.aclose() | ||||||
|             raise |             raise | ||||||
|  | 
 | ||||||
|         except KeyError: |         except KeyError: | ||||||
|             # internal error should never get here |             # internal error should never get here | ||||||
|             assert msg.get('cid'), ( |             assert msg.get('cid'), ( | ||||||
|  | @ -102,14 +105,25 @@ class ReceiveStream(trio.abc.ReceiveChannel): | ||||||
|         """Cancel associated remote actor task and local memory channel |         """Cancel associated remote actor task and local memory channel | ||||||
|         on close. |         on close. | ||||||
|         """ |         """ | ||||||
|         if self._rx_chan._closed: |         rx_chan = self._rx_chan | ||||||
|  |         stats = rx_chan.statistics() | ||||||
|  | 
 | ||||||
|  |         if rx_chan._closed: | ||||||
|             log.warning(f"{self} is already closed") |             log.warning(f"{self} is already closed") | ||||||
|             return |             return | ||||||
| 
 | 
 | ||||||
|  |         if stats.open_receive_channels > 1: | ||||||
|  |             # if we've been cloned don't kill the stream | ||||||
|  |             log.debug("there are still consumers running keeping stream alive") | ||||||
|  |             return | ||||||
|  | 
 | ||||||
|         if self._shielded: |         if self._shielded: | ||||||
|             log.warning(f"{self} is shielded, portal channel being kept alive") |             log.warning(f"{self} is shielded, portal channel being kept alive") | ||||||
|             return |             return | ||||||
| 
 | 
 | ||||||
|  |         # close the local mem chan | ||||||
|  |         rx_chan.close() | ||||||
|  | 
 | ||||||
|         cid = self._cid |         cid = self._cid | ||||||
|         with trio.move_on_after(0.5) as cs: |         with trio.move_on_after(0.5) as cs: | ||||||
|             cs.shield = True |             cs.shield = True | ||||||
|  | @ -131,11 +145,16 @@ class ReceiveStream(trio.abc.ReceiveChannel): | ||||||
|                     "May have failed to cancel remote task " |                     "May have failed to cancel remote task " | ||||||
|                     f"{cid} for {self._portal.channel.uid}") |                     f"{cid} for {self._portal.channel.uid}") | ||||||
| 
 | 
 | ||||||
|         with trio.CancelScope(shield=True): |  | ||||||
|             await self._rx_chan.aclose() |  | ||||||
| 
 |  | ||||||
|     def clone(self): |     def clone(self): | ||||||
|         return self |         """Clone this receive channel allowing for multi-task | ||||||
|  |         consumption from the same channel. | ||||||
|  | 
 | ||||||
|  |         """ | ||||||
|  |         return ReceiveStream( | ||||||
|  |             self._cid, | ||||||
|  |             self._rx_chan.clone(), | ||||||
|  |             self._portal, | ||||||
|  |         ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class Portal: | class Portal: | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue