forked from goodboy/tractor
				
			Add pre-stream open error conditions
							parent
							
								
									c6cdaf9c31
								
							
						
					
					
						commit
						b21e2a6caa
					
				| 
						 | 
					@ -15,7 +15,7 @@ import warnings
 | 
				
			||||||
import trio
 | 
					import trio
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from ._ipc import Channel
 | 
					from ._ipc import Channel
 | 
				
			||||||
from ._exceptions import unpack_error
 | 
					from ._exceptions import unpack_error, ContextCancelled
 | 
				
			||||||
from ._state import current_actor
 | 
					from ._state import current_actor
 | 
				
			||||||
from .log import get_logger
 | 
					from .log import get_logger
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -135,16 +135,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            raise  # propagate
 | 
					            raise  # propagate
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # except trio.Cancelled:
 | 
					 | 
				
			||||||
        #     if not self._shielded:
 | 
					 | 
				
			||||||
        #         # if shielded we don't propagate a cancelled
 | 
					 | 
				
			||||||
        #         raise
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # except trio.Cancelled:
 | 
					 | 
				
			||||||
        #     # relay cancels to the remote task
 | 
					 | 
				
			||||||
        #     await self.aclose()
 | 
					 | 
				
			||||||
        #     raise
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @contextmanager
 | 
					    @contextmanager
 | 
				
			||||||
    def shield(
 | 
					    def shield(
 | 
				
			||||||
        self
 | 
					        self
 | 
				
			||||||
| 
						 | 
					@ -171,7 +161,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
 | 
				
			||||||
        # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
 | 
					        # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
 | 
				
			||||||
        rx_chan = self._rx_chan
 | 
					        rx_chan = self._rx_chan
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if rx_chan._closed: # or self._eoc:
 | 
					        if rx_chan._closed:
 | 
				
			||||||
            log.warning(f"{self} is already closed")
 | 
					            log.warning(f"{self} is already closed")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # this stream has already been closed so silently succeed as
 | 
					            # this stream has already been closed so silently succeed as
 | 
				
			||||||
| 
						 | 
					@ -440,19 +430,25 @@ class Context:
 | 
				
			||||||
            self.cid
 | 
					            self.cid
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # XXX: If the underlying receive mem chan has been closed then
 | 
					        # Likewise if the surrounding context has been cancelled we error here
 | 
				
			||||||
        # likely client code has already exited a ``.open_stream()``
 | 
					        # since it likely means the surrounding block was exited or
 | 
				
			||||||
        # block prior. we error here until such a time that we decide
 | 
					        # killed
 | 
				
			||||||
        # allowing streams to be "re-connected" is supported and/or
 | 
					
 | 
				
			||||||
        # a good idea.
 | 
					        if self._cancel_called:
 | 
				
			||||||
        if recv_chan._closed:
 | 
					 | 
				
			||||||
            task = trio.lowlevel.current_task().name
 | 
					            task = trio.lowlevel.current_task().name
 | 
				
			||||||
            raise trio.ClosedResourceError(
 | 
					            raise ContextCancelled(
 | 
				
			||||||
                f'stream for {actor.uid[0]}:{task} has already been closed.'
 | 
					                f'Context around {actor.uid[0]}:{task} was already cancelled!'
 | 
				
			||||||
                '\nRe-opening a closed stream is not yet supported!'
 | 
					 | 
				
			||||||
                '\nConsider re-calling the containing `@tractor.context` func'
 | 
					 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # XXX: If the underlying channel feeder receive mem chan has
 | 
				
			||||||
 | 
					        # been closed then likely client code has already exited
 | 
				
			||||||
 | 
					        # a ``.open_stream()`` block prior or there was some other
 | 
				
			||||||
 | 
					        # unanticipated error or cancellation from ``trio``.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if recv_chan._closed:
 | 
				
			||||||
 | 
					            raise trio.ClosedResourceError(
 | 
				
			||||||
 | 
					                'The underlying channel for this stream was already closed!?')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        async with MsgStream(
 | 
					        async with MsgStream(
 | 
				
			||||||
            ctx=self,
 | 
					            ctx=self,
 | 
				
			||||||
            rx_chan=recv_chan,
 | 
					            rx_chan=recv_chan,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue