diff --git a/tractor/_streaming.py b/tractor/_streaming.py index d69bd44..f23376a 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -15,7 +15,7 @@ import warnings import trio from ._ipc import Channel -from ._exceptions import unpack_error +from ._exceptions import unpack_error, ContextCancelled from ._state import current_actor from .log import get_logger @@ -135,16 +135,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): raise # propagate - # except trio.Cancelled: - # if not self._shielded: - # # if shielded we don't propagate a cancelled - # raise - - # except trio.Cancelled: - # # relay cancels to the remote task - # await self.aclose() - # raise - @contextmanager def shield( self @@ -171,7 +161,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose rx_chan = self._rx_chan - if rx_chan._closed: # or self._eoc: + if rx_chan._closed: log.warning(f"{self} is already closed") # this stream has already been closed so silently succeed as @@ -440,19 +430,25 @@ class Context: self.cid ) - # XXX: If the underlying receive mem chan has been closed then - # likely client code has already exited a ``.open_stream()`` - # block prior. we error here until such a time that we decide - # allowing streams to be "re-connected" is supported and/or - # a good idea. - if recv_chan._closed: + # Likewise if the surrounding context has been cancelled we error here + # since it likely means the surrounding block was exited or + # killed + + if self._cancel_called: task = trio.lowlevel.current_task().name - raise trio.ClosedResourceError( - f'stream for {actor.uid[0]}:{task} has already been closed.' - '\nRe-opening a closed stream is not yet supported!' - '\nConsider re-calling the containing `@tractor.context` func' + raise ContextCancelled( + f'Context around {actor.uid[0]}:{task} was already cancelled!' ) + # XXX: If the underlying channel feeder receive mem chan has + # been closed then likely client code has already exited + # a ``.open_stream()`` block prior or there was some other + # unanticipated error or cancellation from ``trio``. + + if recv_chan._closed: + raise trio.ClosedResourceError( + 'The underlying channel for this stream was already closed!?') + async with MsgStream( ctx=self, rx_chan=recv_chan,