forked from goodboy/tractor
				
			Add pre-stream open error conditions
							parent
							
								
									ff0226cd22
								
							
						
					
					
						commit
						3a52feca74
					
				|  | @ -15,7 +15,7 @@ import warnings | |||
| import trio | ||||
| 
 | ||||
| from ._ipc import Channel | ||||
| from ._exceptions import unpack_error | ||||
| from ._exceptions import unpack_error, ContextCancelled | ||||
| from ._state import current_actor | ||||
| from .log import get_logger | ||||
| 
 | ||||
|  | @ -135,16 +135,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
| 
 | ||||
|             raise  # propagate | ||||
| 
 | ||||
|         # except trio.Cancelled: | ||||
|         #     if not self._shielded: | ||||
|         #         # if shielded we don't propagate a cancelled | ||||
|         #         raise | ||||
| 
 | ||||
|         # except trio.Cancelled: | ||||
|         #     # relay cancels to the remote task | ||||
|         #     await self.aclose() | ||||
|         #     raise | ||||
| 
 | ||||
|     @contextmanager | ||||
|     def shield( | ||||
|         self | ||||
|  | @ -171,7 +161,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): | |||
|         # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose | ||||
|         rx_chan = self._rx_chan | ||||
| 
 | ||||
|         if rx_chan._closed: # or self._eoc: | ||||
|         if rx_chan._closed: | ||||
|             log.warning(f"{self} is already closed") | ||||
| 
 | ||||
|             # this stream has already been closed so silently succeed as | ||||
|  | @ -440,19 +430,25 @@ class Context: | |||
|             self.cid | ||||
|         ) | ||||
| 
 | ||||
|         # XXX: If the underlying receive mem chan has been closed then | ||||
|         # likely client code has already exited a ``.open_stream()`` | ||||
|         # block prior. we error here until such a time that we decide | ||||
|         # allowing streams to be "re-connected" is supported and/or | ||||
|         # a good idea. | ||||
|         if recv_chan._closed: | ||||
|         # Likewise if the surrounding context has been cancelled we error here | ||||
|         # since it likely means the surrounding block was exited or | ||||
|         # killed | ||||
| 
 | ||||
|         if self._cancel_called: | ||||
|             task = trio.lowlevel.current_task().name | ||||
|             raise trio.ClosedResourceError( | ||||
|                 f'stream for {actor.uid[0]}:{task} has already been closed.' | ||||
|                 '\nRe-opening a closed stream is not yet supported!' | ||||
|                 '\nConsider re-calling the containing `@tractor.context` func' | ||||
|             raise ContextCancelled( | ||||
|                 f'Context around {actor.uid[0]}:{task} was already cancelled!' | ||||
|             ) | ||||
| 
 | ||||
|         # XXX: If the underlying channel feeder receive mem chan has | ||||
|         # been closed then likely client code has already exited | ||||
|         # a ``.open_stream()`` block prior or there was some other | ||||
|         # unanticipated error or cancellation from ``trio``. | ||||
| 
 | ||||
|         if recv_chan._closed: | ||||
|             raise trio.ClosedResourceError( | ||||
|                 'The underlying channel for this stream was already closed!?') | ||||
| 
 | ||||
|         async with MsgStream( | ||||
|             ctx=self, | ||||
|             rx_chan=recv_chan, | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue