forked from goodboy/tractor
1
0
Fork 0

Add pre-stream open error conditions

bi_streaming
Tyler Goodlet 2021-06-30 13:47:38 -04:00
parent 6e75913480
commit 377b8c163c
1 changed files with 18 additions and 22 deletions

View File

@ -15,7 +15,7 @@ import warnings
import trio import trio
from ._ipc import Channel from ._ipc import Channel
from ._exceptions import unpack_error from ._exceptions import unpack_error, ContextCancelled
from ._state import current_actor from ._state import current_actor
from .log import get_logger from .log import get_logger
@ -135,16 +135,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
raise # propagate raise # propagate
# except trio.Cancelled:
# if not self._shielded:
# # if shielded we don't propagate a cancelled
# raise
# except trio.Cancelled:
# # relay cancels to the remote task
# await self.aclose()
# raise
@contextmanager @contextmanager
def shield( def shield(
self self
@ -171,7 +161,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
rx_chan = self._rx_chan rx_chan = self._rx_chan
if rx_chan._closed: # or self._eoc: if rx_chan._closed:
log.warning(f"{self} is already closed") log.warning(f"{self} is already closed")
# this stream has already been closed so silently succeed as # this stream has already been closed so silently succeed as
@ -440,19 +430,25 @@ class Context:
self.cid self.cid
) )
# XXX: If the underlying receive mem chan has been closed then # Likewise if the surrounding context has been cancelled we error here
# likely client code has already exited a ``.open_stream()`` # since it likely means the surrounding block was exited or
# block prior. we error here until such a time that we decide # killed
# allowing streams to be "re-connected" is supported and/or
# a good idea. if self._cancel_called:
if recv_chan._closed:
task = trio.lowlevel.current_task().name task = trio.lowlevel.current_task().name
raise trio.ClosedResourceError( raise ContextCancelled(
f'stream for {actor.uid[0]}:{task} has already been closed.' f'Context around {actor.uid[0]}:{task} was already cancelled!'
'\nRe-opening a closed stream is not yet supported!'
'\nConsider re-calling the containing `@tractor.context` func'
) )
# XXX: If the underlying channel feeder receive mem chan has
# been closed then likely client code has already exited
# a ``.open_stream()`` block prior or there was some other
# unanticipated error or cancellation from ``trio``.
if recv_chan._closed:
raise trio.ClosedResourceError(
'The underlying channel for this stream was already closed!?')
async with MsgStream( async with MsgStream(
ctx=self, ctx=self,
rx_chan=recv_chan, rx_chan=recv_chan,