forked from goodboy/tractor
1
0
Fork 0

Add pre-stream open error conditions

context_finesse
Tyler Goodlet 2021-06-30 13:47:38 -04:00
parent 477d0bc697
commit aa6902fcfc
1 changed files with 18 additions and 22 deletions

View File

@ -15,7 +15,7 @@ import warnings
import trio
from ._ipc import Channel
from ._exceptions import unpack_error
from ._exceptions import unpack_error, ContextCancelled
from ._state import current_actor
from .log import get_logger
@ -135,16 +135,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
raise # propagate
# except trio.Cancelled:
# if not self._shielded:
# # if shielded we don't propagate a cancelled
# raise
# except trio.Cancelled:
# # relay cancels to the remote task
# await self.aclose()
# raise
@contextmanager
def shield(
self
@ -171,7 +161,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
rx_chan = self._rx_chan
if rx_chan._closed: # or self._eoc:
if rx_chan._closed:
log.warning(f"{self} is already closed")
# this stream has already been closed so silently succeed as
@ -440,19 +430,25 @@ class Context:
self.cid
)
# XXX: If the underlying receive mem chan has been closed then
# likely client code has already exited a ``.open_stream()``
# block prior. we error here until such a time that we decide
# allowing streams to be "re-connected" is supported and/or
# a good idea.
if recv_chan._closed:
# Likewise if the surrounding context has been cancelled we error here
# since it likely means the surrounding block was exited or
# killed
if self._cancel_called:
task = trio.lowlevel.current_task().name
raise trio.ClosedResourceError(
f'stream for {actor.uid[0]}:{task} has already been closed.'
'\nRe-opening a closed stream is not yet supported!'
'\nConsider re-calling the containing `@tractor.context` func'
raise ContextCancelled(
f'Context around {actor.uid[0]}:{task} was already cancelled!'
)
# XXX: If the underlying channel feeder receive mem chan has
# been closed then likely client code has already exited
# a ``.open_stream()`` block prior or there was some other
# unanticipated error or cancellation from ``trio``.
if recv_chan._closed:
raise trio.ClosedResourceError(
'The underlying channel for this stream was already closed!?')
async with MsgStream(
ctx=self,
rx_chan=recv_chan,