diff --git a/examples/advanced_faults/ipc_failure_during_stream.py b/examples/advanced_faults/ipc_failure_during_stream.py new file mode 100644 index 0000000..6728b8d --- /dev/null +++ b/examples/advanced_faults/ipc_failure_during_stream.py @@ -0,0 +1,151 @@ +''' +Complex edge case where during real-time streaming the IPC tranport +channels are wiped out (purposely in this example though it could have +been an outage) and we want to ensure that despite being in debug mode +(or not) the user can sent SIGINT once they notice the hang and the +actor tree will eventually be cancelled without leaving any zombies. + +''' +import trio +from tractor import ( + open_nursery, + context, + Context, + MsgStream, +) + + +async def break_channel_silently_then_error( + stream: MsgStream, +): + async for msg in stream: + await stream.send(msg) + + # XXX: close the channel right after an error is raised + # purposely breaking the IPC transport to make sure the parent + # doesn't get stuck in debug or hang on the connection join. + # this more or less simulates an infinite msg-receive hang on + # the other end. + await stream._ctx.chan.send(None) + assert 0 + + +async def close_stream_and_error( + stream: MsgStream, +): + async for msg in stream: + await stream.send(msg) + + # wipe out channel right before raising + await stream._ctx.chan.send(None) + await stream.aclose() + assert 0 + + +@context +async def recv_and_spawn_net_killers( + + ctx: Context, + break_ipc_after: bool | int = False, + +) -> None: + ''' + Receive stream msgs and spawn some IPC killers mid-stream. + + ''' + await ctx.started() + async with ( + ctx.open_stream() as stream, + trio.open_nursery() as n, + ): + async for i in stream: + print(f'child echoing {i}') + await stream.send(i) + if ( + break_ipc_after + and i > break_ipc_after + ): + '#################################\n' + 'Simulating child-side IPC BREAK!\n' + '#################################' + n.start_soon(break_channel_silently_then_error, stream) + n.start_soon(close_stream_and_error, stream) + + +async def main( + debug_mode: bool = False, + start_method: str = 'trio', + + # by default we break the parent IPC first (if configured to break + # at all), but this can be changed so the child does first (even if + # both are set to break). + break_parent_ipc_after: int | bool = False, + break_child_ipc_after: int | bool = False, + +) -> None: + + async with ( + open_nursery( + start_method=start_method, + + # NOTE: even debugger is used we shouldn't get + # a hang since it never engages due to broken IPC + debug_mode=debug_mode, + loglevel='warning', + + ) as an, + ): + portal = await an.start_actor( + 'chitty_hijo', + enable_modules=[__name__], + ) + + async with portal.open_context( + recv_and_spawn_net_killers, + break_ipc_after=break_child_ipc_after, + + ) as (ctx, sent): + async with ctx.open_stream() as stream: + for i in range(1000): + + if ( + break_parent_ipc_after + and i > break_parent_ipc_after + ): + print( + '#################################\n' + 'Simulating parent-side IPC BREAK!\n' + '#################################' + ) + await stream._ctx.chan.send(None) + + # it actually breaks right here in the + # mp_spawn/forkserver backends and thus the zombie + # reaper never even kicks in? + print(f'parent sending {i}') + await stream.send(i) + + with trio.move_on_after(2) as cs: + + # NOTE: in the parent side IPC failure case this + # will raise an ``EndOfChannel`` after the child + # is killed and sends a stop msg back to it's + # caller/this-parent. + rx = await stream.receive() + + print(f"I'm a happy user and echoed to me is {rx}") + + if cs.cancelled_caught: + # pretend to be a user seeing no streaming action + # thinking it's a hang, and then hitting ctl-c.. + print("YOO i'm a user anddd thingz hangin..") + + print( + "YOO i'm mad send side dun but thingz hangin..\n" + 'MASHING CTlR-C Ctl-c..' + ) + raise KeyboardInterrupt + + +if __name__ == '__main__': + trio.run(main) diff --git a/nooz/346.bugfix.rst b/nooz/346.bugfix.rst new file mode 100644 index 0000000..6ffe636 --- /dev/null +++ b/nooz/346.bugfix.rst @@ -0,0 +1,15 @@ +Fixes to ensure IPC (channel) breakage doesn't result in hung actor +trees; the zombie reaping and general supervision machinery will always +clean up and terminate. + +This includes not only the (mostly minor) fixes to solve these cases but +also a new extensive test suite in `test_advanced_faults.py` with an +accompanying highly configurable example module-script in +`examples/advanced_faults/ipc_failure_during_stream.py`. Tests ensure we +never get hang or zombies despite operating in debug mode and attempt to +simulate all possible IPC transport failure cases for a local-host actor +tree. + +Further we simplify `Context.open_stream.__aexit__()` to just call +`MsgStream.aclose()` directly more or less avoiding a pure duplicate +code path. diff --git a/tests/conftest.py b/tests/conftest.py index af42b85..3363cf5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,6 +7,7 @@ import os import random import signal import platform +import pathlib import time import inspect from functools import partial, wraps @@ -113,14 +114,21 @@ no_windows = pytest.mark.skipif( ) -def repodir(): - """Return the abspath to the repo directory. - """ - dirname = os.path.dirname - dirpath = os.path.abspath( - dirname(dirname(os.path.realpath(__file__))) - ) - return dirpath +def repodir() -> pathlib.Path: + ''' + Return the abspath to the repo directory. + + ''' + # 2 parents up to step up through tests/ + return pathlib.Path(__file__).parent.parent.absolute() + + +def examples_dir() -> pathlib.Path: + ''' + Return the abspath to the examples directory as `pathlib.Path`. + + ''' + return repodir() / 'examples' def pytest_addoption(parser): @@ -151,7 +159,7 @@ def loglevel(request): @pytest.fixture(scope='session') -def spawn_backend(request): +def spawn_backend(request) -> str: return request.config.option.spawn_backend diff --git a/tests/test_advanced_faults.py b/tests/test_advanced_faults.py new file mode 100644 index 0000000..a48866e --- /dev/null +++ b/tests/test_advanced_faults.py @@ -0,0 +1,193 @@ +''' +Sketchy network blackoutz, ugly byzantine gens, puedes eschuchar la +cancelacion?.. + +''' +from functools import partial + +import pytest +from _pytest.pathlib import import_path +import trio +import tractor + +from conftest import ( + examples_dir, +) + + +@pytest.mark.parametrize( + 'debug_mode', + [False, True], + ids=['no_debug_mode', 'debug_mode'], +) +@pytest.mark.parametrize( + 'ipc_break', + [ + # no breaks + { + 'break_parent_ipc_after': False, + 'break_child_ipc_after': False, + }, + + # only parent breaks + { + 'break_parent_ipc_after': 500, + 'break_child_ipc_after': False, + }, + + # only child breaks + { + 'break_parent_ipc_after': False, + 'break_child_ipc_after': 500, + }, + + # both: break parent first + { + 'break_parent_ipc_after': 500, + 'break_child_ipc_after': 800, + }, + # both: break child first + { + 'break_parent_ipc_after': 800, + 'break_child_ipc_after': 500, + }, + + ], + ids=[ + 'no_break', + 'break_parent', + 'break_child', + 'break_both_parent_first', + 'break_both_child_first', + ], +) +def test_ipc_channel_break_during_stream( + debug_mode: bool, + spawn_backend: str, + ipc_break: dict | None, +): + ''' + Ensure we can have an IPC channel break its connection during + streaming and it's still possible for the (simulated) user to kill + the actor tree using SIGINT. + + We also verify the type of connection error expected in the parent + depending on which side if the IPC breaks first. + + ''' + if spawn_backend != 'trio': + if debug_mode: + pytest.skip('`debug_mode` only supported on `trio` spawner') + + # non-`trio` spawners should never hit the hang condition that + # requires the user to do ctl-c to cancel the actor tree. + expect_final_exc = trio.ClosedResourceError + + mod = import_path( + examples_dir() / 'advanced_faults' / 'ipc_failure_during_stream.py', + root=examples_dir(), + ) + + expect_final_exc = KeyboardInterrupt + + # when ONLY the child breaks we expect the parent to get a closed + # resource error on the next `MsgStream.receive()` and then fail out + # and cancel the child from there. + if ( + + # only child breaks + ( + ipc_break['break_child_ipc_after'] + and ipc_break['break_parent_ipc_after'] is False + ) + + # both break but, parent breaks first + or ( + ipc_break['break_child_ipc_after'] is not False + and ( + ipc_break['break_parent_ipc_after'] + > ipc_break['break_child_ipc_after'] + ) + ) + + ): + expect_final_exc = trio.ClosedResourceError + + # when the parent IPC side dies (even if the child's does as well + # but the child fails BEFORE the parent) we expect the channel to be + # sent a stop msg from the child at some point which will signal the + # parent that the stream has been terminated. + # NOTE: when the parent breaks "after" the child you get this same + # case as well, the child breaks the IPC channel with a stop msg + # before any closure takes place. + elif ( + # only parent breaks + ( + ipc_break['break_parent_ipc_after'] + and ipc_break['break_child_ipc_after'] is False + ) + + # both break but, child breaks first + or ( + ipc_break['break_parent_ipc_after'] is not False + and ( + ipc_break['break_child_ipc_after'] + > ipc_break['break_parent_ipc_after'] + ) + ) + ): + expect_final_exc = trio.EndOfChannel + + with pytest.raises(expect_final_exc): + trio.run( + partial( + mod.main, + debug_mode=debug_mode, + start_method=spawn_backend, + **ipc_break, + ) + ) + + +@tractor.context +async def break_ipc_after_started( + ctx: tractor.Context, +) -> None: + await ctx.started() + async with ctx.open_stream() as stream: + await stream.aclose() + await trio.sleep(0.2) + await ctx.chan.send(None) + print('child broke IPC and terminating') + + +def test_stream_closed_right_after_ipc_break_and_zombie_lord_engages(): + ''' + Verify that is a subactor's IPC goes down just after bringing up a stream + the parent can trigger a SIGINT and the child will be reaped out-of-IPC by + the localhost process supervision machinery: aka "zombie lord". + + ''' + async def main(): + async with tractor.open_nursery() as n: + portal = await n.start_actor( + 'ipc_breaker', + enable_modules=[__name__], + ) + + with trio.move_on_after(1): + async with ( + portal.open_context( + break_ipc_after_started + ) as (ctx, sent), + ): + async with ctx.open_stream(): + await trio.sleep(0.5) + + print('parent waiting on context') + + print('parent exited context') + raise KeyboardInterrupt + + with pytest.raises(KeyboardInterrupt): + trio.run(main) diff --git a/tests/test_debugger.py b/tests/test_debugger.py index 0793df5..7885034 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -14,6 +14,7 @@ import itertools from os import path from typing import Optional import platform +import pathlib import sys import time @@ -24,7 +25,10 @@ from pexpect.exceptions import ( EOF, ) -from conftest import repodir, _ci_env +from conftest import ( + examples_dir, + _ci_env, +) # TODO: The next great debugger audit could be done by you! # - recurrent entry to breakpoint() from single actor *after* and an @@ -43,19 +47,13 @@ if platform.system() == 'Windows': ) -def examples_dir(): - """Return the abspath to the examples directory. - """ - return path.join(repodir(), 'examples', 'debugging/') - - def mk_cmd(ex_name: str) -> str: - """Generate a command suitable to pass to ``pexpect.spawn()``. - """ - return ' '.join( - ['python', - path.join(examples_dir(), f'{ex_name}.py')] - ) + ''' + Generate a command suitable to pass to ``pexpect.spawn()``. + + ''' + script_path: pathlib.Path = examples_dir() / 'debugging' / f'{ex_name}.py' + return ' '.join(['python', str(script_path)]) # TODO: was trying to this xfail style but some weird bug i see in CI diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py index 4139836..f134c71 100644 --- a/tests/test_docs_examples.py +++ b/tests/test_docs_examples.py @@ -12,17 +12,17 @@ import shutil import pytest -from conftest import repodir - - -def examples_dir(): - """Return the abspath to the examples directory. - """ - return os.path.join(repodir(), 'examples') +from conftest import ( + examples_dir, +) @pytest.fixture -def run_example_in_subproc(loglevel, testdir, arb_addr): +def run_example_in_subproc( + loglevel: str, + testdir, + arb_addr: tuple[str, int], +): @contextmanager def run(script_code): @@ -32,8 +32,8 @@ def run_example_in_subproc(loglevel, testdir, arb_addr): # on windows we need to create a special __main__.py which will # be executed with ``python -m `` on windows.. shutil.copyfile( - os.path.join(examples_dir(), '__main__.py'), - os.path.join(str(testdir), '__main__.py') + examples_dir() / '__main__.py', + str(testdir / '__main__.py'), ) # drop the ``if __name__ == '__main__'`` guard onwards from @@ -88,6 +88,7 @@ def run_example_in_subproc(loglevel, testdir, arb_addr): and f[0] != '_' and 'debugging' not in p[0] and 'integration' not in p[0] + and 'advanced_faults' not in p[0] ], ids=lambda t: t[1], diff --git a/tests/test_streaming.py b/tests/test_legacy_one_way_streaming.py similarity index 99% rename from tests/test_streaming.py rename to tests/test_legacy_one_way_streaming.py index 4e54e02..17e94ba 100644 --- a/tests/test_streaming.py +++ b/tests/test_legacy_one_way_streaming.py @@ -251,7 +251,7 @@ def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend): results, diff = time_quad_ex assert results - this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 2.666 + this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 3 assert diff < this_fast diff --git a/tractor/_portal.py b/tractor/_portal.py index 5546949..05504bd 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -497,6 +497,10 @@ class Portal: f'actor: {uid}' ) result = await ctx.result() + log.runtime( + f'Context {fn_name} returned ' + f'value from callee `{result}`' + ) # though it should be impossible for any tasks # operating *in* this scope to have survived @@ -518,12 +522,6 @@ class Portal: f'task:{cid}\n' f'actor:{uid}' ) - else: - log.runtime( - f'Context {fn_name} returned ' - f'value from callee `{result}`' - ) - # XXX: (MEGA IMPORTANT) if this is a root opened process we # wait for any immediate child in debug before popping the # context from the runtime msg loop otherwise inside diff --git a/tractor/_runtime.py b/tractor/_runtime.py index e826258..707b9dd 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -228,11 +228,11 @@ async def _invoke( fname = func.__name__ if ctx._cancel_called: - msg = f'{fname} cancelled itself' + msg = f'`{fname}()` cancelled itself' elif cs.cancel_called: msg = ( - f'{fname} was remotely cancelled by its caller ' + f'`{fname}()` was remotely cancelled by its caller ' f'{ctx.chan.uid}' ) @@ -319,7 +319,7 @@ async def _invoke( BrokenPipeError, ): # if we can't propagate the error that's a big boo boo - log.error( + log.exception( f"Failed to ship error to caller @ {chan.uid} !?" ) @@ -455,7 +455,7 @@ class Actor: self._mods: dict[str, ModuleType] = {} self.loglevel = loglevel - self._arb_addr = ( + self._arb_addr: tuple[str, int] | None = ( str(arbiter_addr[0]), int(arbiter_addr[1]) ) if arbiter_addr else None @@ -488,7 +488,10 @@ class Actor: self._parent_chan: Optional[Channel] = None self._forkserver_info: Optional[ tuple[Any, Any, Any, Any, Any]] = None - self._actoruid2nursery: dict[Optional[tuple[str, str]], 'ActorNursery'] = {} # type: ignore # noqa + self._actoruid2nursery: dict[ + tuple[str, str], + ActorNursery | None, + ] = {} # type: ignore # noqa async def wait_for_peer( self, uid: tuple[str, str] @@ -826,7 +829,12 @@ class Actor: if ctx._backpressure: log.warning(text) - await send_chan.send(msg) + try: + await send_chan.send(msg) + except trio.BrokenResourceError: + # XXX: local consumer has closed their side + # so cancel the far end streaming task + log.warning(f"{chan} is already closed") else: try: raise StreamOverrun(text) from None @@ -1371,10 +1379,12 @@ async def async_main( actor.lifetime_stack.close() # Unregister actor from the arbiter - if registered_with_arbiter and ( - actor._arb_addr is not None + if ( + registered_with_arbiter + and not actor.is_arbiter ): failed = False + assert isinstance(actor._arb_addr, tuple) with trio.move_on_after(0.5) as cs: cs.shield = True try: diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 6c0ce5b..900aea2 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -260,7 +260,9 @@ async def soft_wait( if proc.poll() is None: # type: ignore log.warning( - f'Process still alive after cancel request:\n{uid}') + 'Actor still alive after cancel request:\n' + f'{uid}' + ) n.cancel_scope.cancel() raise diff --git a/tractor/_streaming.py b/tractor/_streaming.py index bb99dc5..699a906 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -97,6 +97,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): if self._eoc: raise trio.EndOfChannel + if self._closed: + raise trio.ClosedResourceError('This stream was closed') + try: msg = await self._rx_chan.receive() return msg['yield'] @@ -110,6 +113,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # - 'error' # possibly just handle msg['stop'] here! + if self._closed: + raise trio.ClosedResourceError('This stream was closed') + if msg.get('stop') or self._eoc: log.debug(f"{self} was stopped at remote end") @@ -189,7 +195,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): return self._eoc = True - self._closed = True # NOTE: this is super subtle IPC messaging stuff: # Relay stop iteration to far end **iff** we're @@ -206,29 +211,32 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # In the bidirectional case, `Context.open_stream()` will create # the `Actor._cids2qs` entry from a call to - # `Actor.get_context()` and will send the stop message in - # ``__aexit__()`` on teardown so it **does not** need to be - # called here. - if not self._ctx._portal: - # Only for 2 way streams can we can send stop from the - # caller side. - try: - # NOTE: if this call is cancelled we expect this end to - # handle as though the stop was never sent (though if it - # was it shouldn't matter since it's unlikely a user - # will try to re-use a stream after attemping to close - # it). - with trio.CancelScope(shield=True): - await self._ctx.send_stop() + # `Actor.get_context()` and will call us here to send the stop + # msg in ``__aexit__()`` on teardown. + try: + # NOTE: if this call is cancelled we expect this end to + # handle as though the stop was never sent (though if it + # was it shouldn't matter since it's unlikely a user + # will try to re-use a stream after attemping to close + # it). + with trio.CancelScope(shield=True): + await self._ctx.send_stop() - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): - # the underlying channel may already have been pulled - # in which case our stop message is meaningless since - # it can't traverse the transport. - log.debug(f'Channel for {self} was already closed') + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + # the underlying channel may already have been pulled + # in which case our stop message is meaningless since + # it can't traverse the transport. + ctx = self._ctx + log.warning( + f'Stream was already destroyed?\n' + f'actor: {ctx.chan.uid}\n' + f'ctx id: {ctx.cid}' + ) + + self._closed = True # Do we close the local mem chan ``self._rx_chan`` ??!? @@ -271,7 +279,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): self, ) -> AsyncIterator[BroadcastReceiver]: - '''Allocate and return a ``BroadcastReceiver`` which delegates + ''' + Allocate and return a ``BroadcastReceiver`` which delegates to this message stream. This allows multiple local tasks to receive each their own copy @@ -593,23 +602,23 @@ class Context: async with MsgStream( ctx=self, rx_chan=ctx._recv_chan, - ) as rchan: + ) as stream: if self._portal: - self._portal._streams.add(rchan) + self._portal._streams.add(stream) try: self._stream_opened = True - # ensure we aren't cancelled before delivering - # the stream + # XXX: do we need this? + # ensure we aren't cancelled before yielding the stream # await trio.lowlevel.checkpoint() - yield rchan + yield stream - # XXX: Make the stream "one-shot use". On exit, signal + # NOTE: Make the stream "one-shot use". On exit, signal # ``trio.EndOfChannel``/``StopAsyncIteration`` to the # far end. - await self.send_stop() + await stream.aclose() finally: if self._portal: diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 9a7cf7f..5621f79 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -133,13 +133,13 @@ async def gather_contexts( # deliver control once all managers have started up await all_entered.wait() - # NOTE: order *should* be preserved in the output values - # since ``dict``s are now implicitly ordered. - yield tuple(unwrapped.values()) - - # we don't need a try/finally since cancellation will be triggered - # by the surrounding nursery on error. - parent_exit.set() + try: + yield tuple(unwrapped.values()) + finally: + # NOTE: this is ABSOLUTELY REQUIRED to avoid + # the following wacky bug: + # + parent_exit.set() # Per actor task caching helpers.