diff --git a/examples/advanced_faults/ipc_failure_during_stream.py b/examples/advanced_faults/ipc_failure_during_stream.py index c88e0dfe..4242a543 100644 --- a/examples/advanced_faults/ipc_failure_during_stream.py +++ b/examples/advanced_faults/ipc_failure_during_stream.py @@ -17,6 +17,7 @@ from tractor import ( MsgStream, _testing, trionics, + TransportClosed, ) import trio import pytest @@ -208,12 +209,16 @@ async def main( # TODO: is this needed or no? raise - except trio.ClosedResourceError: + except ( + trio.ClosedResourceError, + TransportClosed, + ) as _tpt_err: # NOTE: don't send if we already broke the # connection to avoid raising a closed-error # such that we drop through to the ctl-c # mashing by user. - await trio.sleep(0.01) + with trio.CancelScope(shield=True): + await trio.sleep(0.01) # timeout: int = 1 # with trio.move_on_after(timeout) as cs: @@ -247,6 +252,7 @@ async def main( await stream.send(i) pytest.fail('stream not closed?') except ( + TransportClosed, trio.ClosedResourceError, trio.EndOfChannel, ) as send_err: diff --git a/tests/test_advanced_faults.py b/tests/test_advanced_faults.py index 061ae5aa..cfd9cd8b 100644 --- a/tests/test_advanced_faults.py +++ b/tests/test_advanced_faults.py @@ -98,7 +98,8 @@ def test_ipc_channel_break_during_stream( expect_final_exc = TransportClosed mod: ModuleType = import_path( - examples_dir() / 'advanced_faults' + examples_dir() + / 'advanced_faults' / 'ipc_failure_during_stream.py', root=examples_dir(), consider_namespace_packages=False, @@ -113,8 +114,9 @@ def test_ipc_channel_break_during_stream( if ( # only expect EoC if trans is broken on the child side, ipc_break['break_child_ipc_after'] is not False + and # AND we tell the child to call `MsgStream.aclose()`. - and pre_aclose_msgstream + pre_aclose_msgstream ): # expect_final_exc = trio.EndOfChannel # ^XXX NOPE! XXX^ since now `.open_stream()` absorbs this @@ -160,7 +162,8 @@ def test_ipc_channel_break_during_stream( ipc_break['break_child_ipc_after'] is not False and ( ipc_break['break_parent_ipc_after'] - > ipc_break['break_child_ipc_after'] + > + ipc_break['break_child_ipc_after'] ) ): if pre_aclose_msgstream: @@ -248,8 +251,13 @@ def test_ipc_channel_break_during_stream( # get raw instance from pytest wrapper value = excinfo.value if isinstance(value, ExceptionGroup): - excs = value.exceptions - assert len(excs) == 1 + excs: tuple[Exception] = value.exceptions + assert ( + len(excs) <= 2 + and + (isinstance(exc, TransportClosed) + for exc in excs) + ) final_exc = excs[0] assert isinstance(final_exc, expect_final_exc) diff --git a/tractor/_context.py b/tractor/_context.py index 8ace1b4d..c5b4afc5 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -70,6 +70,7 @@ from ._exceptions import ( MsgTypeError, RemoteActorError, StreamOverrun, + TransportClosed, pack_from_raise, unpack_error, ) @@ -2431,6 +2432,7 @@ async def open_context_from_portal( except ( trio.BrokenResourceError, trio.ClosedResourceError, + TransportClosed, ): log.warning( 'IPC connection for context is broken?\n' diff --git a/tractor/_streaming.py b/tractor/_streaming.py index fb870f1c..cf3eaab0 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -38,6 +38,7 @@ import trio from ._exceptions import ( ContextCancelled, RemoteActorError, + TransportClosed, ) from .log import get_logger from .trionics import ( @@ -412,7 +413,8 @@ class MsgStream(trio.abc.Channel): except ( trio.BrokenResourceError, - trio.ClosedResourceError + trio.ClosedResourceError, + TransportClosed, ) as re: # the underlying channel may already have been pulled # in which case our stop message is meaningless since @@ -596,6 +598,7 @@ class MsgStream(trio.abc.Channel): trio.ClosedResourceError, trio.BrokenResourceError, BrokenPipeError, + TransportClosed, ) as _trans_err: trans_err = _trans_err if (