From 4f977189c0f4ebcbee0f359e48d43a97c66deb65 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 9 Nov 2022 19:22:33 -0500 Subject: [PATCH 01/23] Handle broken mem chan on `Actor._push_result()` When backpressure is used and a feeder mem chan breaks during msg delivery (usually because the IPC allocating task already terminated) instead of raising we simply warn as we do for the non-backpressure case. Also, add a proper `Actor.is_arbiter` test inside `async_main()` to avoid doing an arbiter-registry lookup if the current actor **is** the registrar. --- tractor/_runtime.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index e826258..cbbb9ae 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -826,7 +826,12 @@ class Actor: if ctx._backpressure: log.warning(text) - await send_chan.send(msg) + try: + await send_chan.send(msg) + except trio.BrokenResourceError: + # XXX: local consumer has closed their side + # so cancel the far end streaming task + log.warning(f"{chan} is already closed") else: try: raise StreamOverrun(text) from None @@ -1371,8 +1376,9 @@ async def async_main( actor.lifetime_stack.close() # Unregister actor from the arbiter - if registered_with_arbiter and ( - actor._arb_addr is not None + if ( + registered_with_arbiter + and not actor.is_arbiter ): failed = False with trio.move_on_after(0.5) as cs: From de04bbb2bb162bef44bd78f543e8de9d9e7cbaa0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Nov 2022 16:11:33 -0500 Subject: [PATCH 02/23] Don't raise on a broken IPC-context when sending stop msg --- tractor/_streaming.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index bb99dc5..11ff47d 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -271,7 +271,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): self, ) -> AsyncIterator[BroadcastReceiver]: - '''Allocate 
and return a ``BroadcastReceiver`` which delegates + ''' + Allocate and return a ``BroadcastReceiver`` which delegates to this message stream. This allows multiple local tasks to receive each their own copy @@ -609,7 +610,14 @@ class Context: # XXX: Make the stream "one-shot use". On exit, signal # ``trio.EndOfChannel``/``StopAsyncIteration`` to the # far end. - await self.send_stop() + try: + await self.send_stop() + except trio.BrokenResourceError: + log.warning( + f"Couldn't close: stream already broken?\n" + f'actor: {self.chan.uid}\n' + f'ctx id: {self.cid}' + ) finally: if self._portal: From a4874a3227a94148940cd312f026f84eab3540f9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 9 Nov 2022 19:10:59 -0500 Subject: [PATCH 03/23] Always set the `parent_exit: trio.Event` on exit --- tractor/trionics/_mngrs.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 9a7cf7f..5621f79 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -133,13 +133,13 @@ async def gather_contexts( # deliver control once all managers have started up await all_entered.wait() - # NOTE: order *should* be preserved in the output values - # since ``dict``s are now implicitly ordered. - yield tuple(unwrapped.values()) - - # we don't need a try/finally since cancellation will be triggered - # by the surrounding nursery on error. - parent_exit.set() + try: + yield tuple(unwrapped.values()) + finally: + # NOTE: this is ABSOLUTELY REQUIRED to avoid + # the following wacky bug: + # + parent_exit.set() # Per actor task caching helpers. 
From d27c081a15ef0a36d11526ed980298e072a9518b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 13 Dec 2022 00:23:27 -0500 Subject: [PATCH 04/23] Ensure arbiter sockaddr type before usage --- tractor/_runtime.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index cbbb9ae..cc780a7 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -455,7 +455,7 @@ class Actor: self._mods: dict[str, ModuleType] = {} self.loglevel = loglevel - self._arb_addr = ( + self._arb_addr: tuple[str, int] | None = ( str(arbiter_addr[0]), int(arbiter_addr[1]) ) if arbiter_addr else None @@ -1381,6 +1381,7 @@ async def async_main( and not actor.is_arbiter ): failed = False + assert isinstance(actor._arb_addr, tuple) with trio.move_on_after(0.5) as cs: cs.shield = True try: From 97d5f7233bfc016890c0370d81f43382b6e366c6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 17 Oct 2022 15:54:05 -0400 Subject: [PATCH 05/23] Fix uid2nursery lookup table type annot --- tractor/_runtime.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index cc780a7..2cb02e2 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -488,7 +488,10 @@ class Actor: self._parent_chan: Optional[Channel] = None self._forkserver_info: Optional[ tuple[Any, Any, Any, Any, Any]] = None - self._actoruid2nursery: dict[Optional[tuple[str, str]], 'ActorNursery'] = {} # type: ignore # noqa + self._actoruid2nursery: dict[ + tuple[str, str], + ActorNursery | None, + ] = {} # type: ignore # noqa async def wait_for_peer( self, uid: tuple[str, str] From 158569adaeaf934a400ce8593fc16ae88ec0b454 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 26 Jan 2023 17:48:08 -0500 Subject: [PATCH 06/23] Add WIP example of silent IPC breaks while streaming --- examples/open_ctx_w_nursery.py | 96 ++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 
examples/open_ctx_w_nursery.py diff --git a/examples/open_ctx_w_nursery.py b/examples/open_ctx_w_nursery.py new file mode 100644 index 0000000..99fc18d --- /dev/null +++ b/examples/open_ctx_w_nursery.py @@ -0,0 +1,96 @@ +import trio +from tractor import ( + open_nursery, + context, + Context, + MsgStream, +) + + +async def break_channel_silently( + stream: MsgStream, +): + async for msg in stream: + await stream.send(msg) + # XXX: close the channel right after an error is raised + # purposely breaking the IPC transport to make sure the parent + # doesn't get stuck in debug. this more or less simulates an + # infinite msg-receive hang on the other end. await + await stream._ctx.chan.send(None) + assert 0 + + +async def error_and_break_stream( + stream: MsgStream, +): + async for msg in stream: + # echo back msg + await stream.send(msg) + + try: + assert 0 + except Exception: + await stream._ctx.chan.send(None) + + # NOTE: doing this instead causes the error to propagate + # correctly. + # await stream.aclose() + raise + + +@context +async def just_sleep( + + ctx: Context, + **kwargs, + +) -> None: + ''' + Start and sleep. + + ''' + d = {} + await ctx.started() + try: + async with ( + ctx.open_stream() as stream, + trio.open_nursery() as n, + ): + for i in range(100): + await stream.send(i) + if i > 50: + n.start_soon(break_channel_silently, stream) + n.start_soon(error_and_break_stream, stream) + + finally: + d['10'] = 10 + + +async def main() -> None: + + async with open_nursery( + # loglevel='info', + debug_mode=True, + ) as n: + portal = await n.start_actor( + 'ctx_child', + + # XXX: we don't enable the current module in order + # to trigger `ModuleNotFound`. 
+ enable_modules=[__name__], + + # add a debugger test to verify this works B) + # debug_mode=True, + ) + + async with portal.open_context( + just_sleep, # taken from pytest parameterization + ) as (ctx, sent): + async with ctx.open_stream() as stream: + await stream.send(10) + async for msg in stream: + print(msg) + + +if __name__ == '__main__': + trio.run(main) From ddf3d0d1b38e898bdc82d9f2262b1afd07707280 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 26 Jan 2023 17:49:14 -0500 Subject: [PATCH 07/23] Show tracebacks for un-shipped/propagated errors --- tractor/_runtime.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 2cb02e2..1dcd865 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -319,7 +319,7 @@ async def _invoke( BrokenPipeError, ): # if we can't propagate the error that's a big boo boo - log.error( + log.exception( f"Failed to ship error to caller @ {chan.uid} !?" ) From df01294bb2235b6a3b568f7c9533dbf8c33e6644 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 8 Jan 2023 13:19:46 -0500 Subject: [PATCH 08/23] Show more functiony syntax in ctx-cancelled log msgs --- tractor/_runtime.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 1dcd865..707b9dd 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -228,11 +228,11 @@ async def _invoke( fname = func.__name__ if ctx._cancel_called: - msg = f'{fname} cancelled itself' + msg = f'`{fname}()` cancelled itself' elif cs.cancel_called: msg = ( - f'{fname} was remotely cancelled by its caller ' + f'`{fname}()` was remotely cancelled by its caller ' f'{ctx.chan.uid}' ) From 7394a187e073f583c9c8ba668e6443474b182e2e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 26 Jan 2023 19:43:58 -0500 Subject: [PATCH 09/23] Name one-way streaming (con generators) what it is --- tests/{test_streaming.py => test_legacy_one_way_streaming.py} | 0 1 file 
changed, 0 insertions(+), 0 deletions(-) rename tests/{test_streaming.py => test_legacy_one_way_streaming.py} (100%) diff --git a/tests/test_streaming.py b/tests/test_legacy_one_way_streaming.py similarity index 100% rename from tests/test_streaming.py rename to tests/test_legacy_one_way_streaming.py From 36a83cb306fd880d667c3b41a540944abc938734 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 27 Jan 2023 16:27:25 -0500 Subject: [PATCH 10/23] Refine example to drop IPC mid-stream Use a task nursery in the subactor to spawn tasks which cancel the IPC channel mid stream to simulate the most concurrent case we're likely to see. Make `main()` accept a `debug_mode: bool` for parametrization. Fill out detailed comments/docs on this example. --- examples/open_ctx_w_nursery.py | 97 +++++++++++++++++++--------------- 1 file changed, 53 insertions(+), 44 deletions(-) diff --git a/examples/open_ctx_w_nursery.py b/examples/open_ctx_w_nursery.py index 99fc18d..8969222 100644 --- a/examples/open_ctx_w_nursery.py +++ b/examples/open_ctx_w_nursery.py @@ -1,3 +1,11 @@ +''' +Complex edge case where during real-time streaming the IPC tranport +channels are wiped out (purposely in this example though it could have +been an outage) and we want to ensure that despite being in debug mode +(or not) the user can sent SIGINT once they notice the hang and the +actor tree will eventually be cancelled without leaving any zombies. + +''' import trio from tractor import ( open_nursery, @@ -7,89 +15,90 @@ from tractor import ( ) -async def break_channel_silently( +async def break_channel_silently_then_error( stream: MsgStream, ): async for msg in stream: await stream.send(msg) + # XXX: close the channel right after an error is raised # purposely breaking the IPC transport to make sure the parent - # doesn't get stuck in debug. this more or less simulates an - # infinite msg-receive hang on the other end. await + # doesn't get stuck in debug or hang on the connection join. 
+ # this more or less simulates an infinite msg-receive hang on + # the other end. + # if msg > 66: await stream._ctx.chan.send(None) assert 0 -async def error_and_break_stream( +async def close_stream_and_error( stream: MsgStream, ): async for msg in stream: - # echo back msg await stream.send(msg) - try: - assert 0 - except Exception: - await stream._ctx.chan.send(None) - - # NOTE: doing this instead causes the error to propagate - # correctly. - # await stream.aclose() - raise + # wipe out channel right before raising + await stream._ctx.chan.send(None) + await stream.aclose() + assert 0 @context -async def just_sleep( +async def recv_and_spawn_net_killers( ctx: Context, **kwargs, ) -> None: ''' - Start and sleep. + Receive stream msgs and spawn some IPC killers mid-stream. ''' - d = {} await ctx.started() - try: - async with ( - ctx.open_stream() as stream, - trio.open_nursery() as n, - ): - for i in range(100): - await stream.send(i) - if i > 50: - n.start_soon(break_channel_silently, stream) - n.start_soon(error_and_break_stream, stream) - - finally: - d['10'] = 10 + async with ( + ctx.open_stream() as stream, + trio.open_nursery() as n, + ): + for i in range(100): + await stream.send(i) + if i > 80: + n.start_soon(break_channel_silently_then_error, stream) + n.start_soon(close_stream_and_error, stream) -async def main() -> None: +async def main( + debug_mode: bool = True, + +) -> None: async with open_nursery( - # loglevel='info', - debug_mode=True, + + # NOTE: even debugger is used we shouldn't get + # a hang since it never engages due to broken IPC + debug_mode=debug_mode, + ) as n: portal = await n.start_actor( - 'ctx_child', - - # XXX: we don't enable the current module in order - # to trigger `ModuleNotFound`. 
+ 'chitty_hijo', enable_modules=[__name__], - - # add a debugger test to verify this works B) - # debug_mode=True, ) async with portal.open_context( - just_sleep, # taken from pytest parameterization + recv_and_spawn_net_killers, ) as (ctx, sent): async with ctx.open_stream() as stream: - await stream.send(10) - async for msg in stream: - print(msg) + for i in range(100): + await stream.send(i) + + with trio.move_on_after(2) as cs: + rx = await stream.receive() + print(f'I a mad user and here is what i got {rx}') + + if cs.cancelled_caught: + # pretend to be a user seeing no streaming action + # thinking it's a hang, and then hitting ctl-c.. + print("YOO i'm a user and, thingz hangin.. CTL-C CTRL-C..") + raise KeyboardInterrupt if __name__ == '__main__': From fb9ff45745b5f303e6b8988338feda8de67530ae Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 27 Jan 2023 16:30:55 -0500 Subject: [PATCH 11/23] Move example to a new `advanced_faults` egs subset dir --- examples/{ => advanced_faults}/open_ctx_w_nursery.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename examples/{ => advanced_faults}/open_ctx_w_nursery.py (100%) diff --git a/examples/open_ctx_w_nursery.py b/examples/advanced_faults/open_ctx_w_nursery.py similarity index 100% rename from examples/open_ctx_w_nursery.py rename to examples/advanced_faults/open_ctx_w_nursery.py From 4f8586a928059619f109b21c246e3c4e77dbbd52 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 27 Jan 2023 16:40:28 -0500 Subject: [PATCH 12/23] Wrap ex in new test, change dir helpers to use `pathlib.Path` --- ...ursery.py => ipc_failure_during_stream.py} | 0 tests/conftest.py | 24 ++++++++---- tests/test_advanced_faults.py | 38 +++++++++++++++++++ 3 files changed, 54 insertions(+), 8 deletions(-) rename examples/advanced_faults/{open_ctx_w_nursery.py => ipc_failure_during_stream.py} (100%) create mode 100644 tests/test_advanced_faults.py diff --git a/examples/advanced_faults/open_ctx_w_nursery.py 
b/examples/advanced_faults/ipc_failure_during_stream.py similarity index 100% rename from examples/advanced_faults/open_ctx_w_nursery.py rename to examples/advanced_faults/ipc_failure_during_stream.py diff --git a/tests/conftest.py b/tests/conftest.py index af42b85..9966022 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,6 +7,7 @@ import os import random import signal import platform +import pathlib import time import inspect from functools import partial, wraps @@ -113,14 +114,21 @@ no_windows = pytest.mark.skipif( ) -def repodir(): - """Return the abspath to the repo directory. - """ - dirname = os.path.dirname - dirpath = os.path.abspath( - dirname(dirname(os.path.realpath(__file__))) - ) - return dirpath +def repodir() -> pathlib.Path: + ''' + Return the abspath to the repo directory. + + ''' + # 2 parents up to step up through tests/ + return pathlib.Path(__file__).parent.parent.absolute() + + +def examples_dir() -> pathlib.Path: + ''' + Return the abspath to the examples directory as `pathlib.Path`. + + ''' + return repodir() / 'examples' def pytest_addoption(parser): diff --git a/tests/test_advanced_faults.py b/tests/test_advanced_faults.py new file mode 100644 index 0000000..8e747f9 --- /dev/null +++ b/tests/test_advanced_faults.py @@ -0,0 +1,38 @@ +''' +Sketchy network blackoutz, ugly byzantine gens, puedes eschuchar la +cancelacion?.. + +''' +import pytest +from _pytest.pathlib import import_path +import trio + +from conftest import ( + examples_dir, +) + + +@pytest.mark.parametrize( + 'debug_mode', + [False, True], + ids=['debug_mode', 'no_debug_mode'], +) +def test_child_breaks_ipc_channel_during_stream( + debug_mode: bool, +): + ''' + Ensure we can (purposely) break IPC during streaming and it's still + possible for the (simulated) user to kill the actor tree using + SIGINT. 
+ + ''' + mod = import_path( + examples_dir() / 'advanced_faults' / 'ipc_failure_during_stream.py', + root=examples_dir(), + ) + + with pytest.raises(KeyboardInterrupt): + trio.run( + mod.main, + debug_mode, + ) From 1d92f2552ab1eb726fe6a2471a594b2e90b520d1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 27 Jan 2023 16:41:16 -0500 Subject: [PATCH 13/23] Adjust other examples tests to expect `pathlib` objects --- tests/test_debugger.py | 24 +++++++++++------------- tests/test_docs_examples.py | 20 ++++++++++---------- 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/tests/test_debugger.py b/tests/test_debugger.py index 0793df5..7885034 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -14,6 +14,7 @@ import itertools from os import path from typing import Optional import platform +import pathlib import sys import time @@ -24,7 +25,10 @@ from pexpect.exceptions import ( EOF, ) -from conftest import repodir, _ci_env +from conftest import ( + examples_dir, + _ci_env, +) # TODO: The next great debugger audit could be done by you! # - recurrent entry to breakpoint() from single actor *after* and an @@ -43,19 +47,13 @@ if platform.system() == 'Windows': ) -def examples_dir(): - """Return the abspath to the examples directory. - """ - return path.join(repodir(), 'examples', 'debugging/') - - def mk_cmd(ex_name: str) -> str: - """Generate a command suitable to pass to ``pexpect.spawn()``. - """ - return ' '.join( - ['python', - path.join(examples_dir(), f'{ex_name}.py')] - ) + ''' + Generate a command suitable to pass to ``pexpect.spawn()``. 
+ + ''' + script_path: pathlib.Path = examples_dir() / 'debugging' / f'{ex_name}.py' + return ' '.join(['python', str(script_path)]) # TODO: was trying to this xfail style but some weird bug i see in CI diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py index 4139836..31eca88 100644 --- a/tests/test_docs_examples.py +++ b/tests/test_docs_examples.py @@ -12,17 +12,17 @@ import shutil import pytest -from conftest import repodir - - -def examples_dir(): - """Return the abspath to the examples directory. - """ - return os.path.join(repodir(), 'examples') +from conftest import ( + examples_dir, +) @pytest.fixture -def run_example_in_subproc(loglevel, testdir, arb_addr): +def run_example_in_subproc( + loglevel: str, + testdir, + arb_addr: tuple[str, int], +): @contextmanager def run(script_code): @@ -32,8 +32,8 @@ def run_example_in_subproc(loglevel, testdir, arb_addr): # on windows we need to create a special __main__.py which will # be executed with ``python -m `` on windows.. 
shutil.copyfile( - os.path.join(examples_dir(), '__main__.py'), - os.path.join(str(testdir), '__main__.py') + examples_dir() / '__main__.py', + str(testdir / '__main__.py'), ) # drop the ``if __name__ == '__main__'`` guard onwards from From 7fddb4416b7372e7e9feb6d751694ae1592e7f50 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 27 Jan 2023 17:02:36 -0500 Subject: [PATCH 14/23] Handle `mp` spawn method cases in test suite --- .../advanced_faults/ipc_failure_during_stream.py | 7 ++++++- tests/conftest.py | 2 +- tests/test_advanced_faults.py | 12 +++++++++++- 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/examples/advanced_faults/ipc_failure_during_stream.py b/examples/advanced_faults/ipc_failure_during_stream.py index 8969222..6ce06b8 100644 --- a/examples/advanced_faults/ipc_failure_during_stream.py +++ b/examples/advanced_faults/ipc_failure_during_stream.py @@ -64,14 +64,17 @@ async def recv_and_spawn_net_killers( if i > 80: n.start_soon(break_channel_silently_then_error, stream) n.start_soon(close_stream_and_error, stream) + await trio.sleep_forever() async def main( - debug_mode: bool = True, + debug_mode: bool = False, + start_method: str = 'trio', ) -> None: async with open_nursery( + start_method=start_method, # NOTE: even debugger is used we shouldn't get # a hang since it never engages due to broken IPC @@ -88,6 +91,8 @@ async def main( ) as (ctx, sent): async with ctx.open_stream() as stream: for i in range(100): + + # this may break in the mp_spawn case await stream.send(i) with trio.move_on_after(2) as cs: diff --git a/tests/conftest.py b/tests/conftest.py index 9966022..3363cf5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -159,7 +159,7 @@ def loglevel(request): @pytest.fixture(scope='session') -def spawn_backend(request): +def spawn_backend(request) -> str: return request.config.option.spawn_backend diff --git a/tests/test_advanced_faults.py b/tests/test_advanced_faults.py index 8e747f9..8294f2a 100644 --- 
a/tests/test_advanced_faults.py +++ b/tests/test_advanced_faults.py @@ -19,6 +19,7 @@ from conftest import ( ) def test_child_breaks_ipc_channel_during_stream( debug_mode: bool, + spawn_backend: str, ): ''' Ensure we can (purposely) break IPC during streaming and it's still @@ -26,13 +27,22 @@ def test_child_breaks_ipc_channel_during_stream( SIGINT. ''' + expect_final_exc = KeyboardInterrupt + + if spawn_backend != 'trio': + if debug_mode: + pytest.skip('`debug_mode` only supported on `trio` spawner') + + expect_final_exc = trio.ClosedResourceError + mod = import_path( examples_dir() / 'advanced_faults' / 'ipc_failure_during_stream.py', root=examples_dir(), ) - with pytest.raises(KeyboardInterrupt): + with pytest.raises(expect_final_exc): trio.run( mod.main, debug_mode, + spawn_backend, ) From 3a0817ff55513e9cf037b1ec409d66221f64c481 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 27 Jan 2023 17:17:17 -0500 Subject: [PATCH 15/23] Skip `advanced_faults/` subset in docs examples tests --- examples/advanced_faults/ipc_failure_during_stream.py | 8 ++++---- tests/test_docs_examples.py | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/examples/advanced_faults/ipc_failure_during_stream.py b/examples/advanced_faults/ipc_failure_during_stream.py index 6ce06b8..809af50 100644 --- a/examples/advanced_faults/ipc_failure_during_stream.py +++ b/examples/advanced_faults/ipc_failure_during_stream.py @@ -26,7 +26,6 @@ async def break_channel_silently_then_error( # doesn't get stuck in debug or hang on the connection join. # this more or less simulates an infinite msg-receive hang on # the other end. 
- # if msg > 66: await stream._ctx.chan.send(None) assert 0 @@ -64,7 +63,6 @@ async def recv_and_spawn_net_killers( if i > 80: n.start_soon(break_channel_silently_then_error, stream) n.start_soon(close_stream_and_error, stream) - await trio.sleep_forever() async def main( @@ -92,7 +90,9 @@ async def main( async with ctx.open_stream() as stream: for i in range(100): - # this may break in the mp_spawn case + # it actually breaks right here in the + # mp_spawn/forkserver backends and thus the zombie + # reaper never even kicks in? await stream.send(i) with trio.move_on_after(2) as cs: @@ -102,7 +102,7 @@ async def main( if cs.cancelled_caught: # pretend to be a user seeing no streaming action # thinking it's a hang, and then hitting ctl-c.. - print("YOO i'm a user and, thingz hangin.. CTL-C CTRL-C..") + print("YOO i'm a user anddd thingz hangin.. CTRL-C..") raise KeyboardInterrupt diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py index 31eca88..f134c71 100644 --- a/tests/test_docs_examples.py +++ b/tests/test_docs_examples.py @@ -88,6 +88,7 @@ def run_example_in_subproc( and f[0] != '_' and 'debugging' not in p[0] and 'integration' not in p[0] + and 'advanced_faults' not in p[0] ], ids=lambda t: t[1], From 6c35ba2cb6fda874895f717a21c72fb32795ceca Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 27 Jan 2023 22:59:15 -0500 Subject: [PATCH 16/23] Add IPC breakage on both parent and child side With the new fancy `_pytest.pathlib.import_path()` we can do real parametrization of the example-script-module code and thus configure whether the child, parent, or both silently break the IPC connection. Parametrize the test for all the above mentioned cases as well as the case where the IPC never breaks but we still simulate the user hammering ctl-c / SIGINT to terminate the actor tree. Adjust expected errors based on each case and heavily document each of these. 
--- .../ipc_failure_during_stream.py | 64 +++++++++++++++---- tests/test_advanced_faults.py | 53 +++++++++++++-- 2 files changed, 98 insertions(+), 19 deletions(-) diff --git a/examples/advanced_faults/ipc_failure_during_stream.py b/examples/advanced_faults/ipc_failure_during_stream.py index 809af50..a5d3738 100644 --- a/examples/advanced_faults/ipc_failure_during_stream.py +++ b/examples/advanced_faults/ipc_failure_during_stream.py @@ -46,6 +46,7 @@ async def close_stream_and_error( async def recv_and_spawn_net_killers( ctx: Context, + break_ipc: bool = False, **kwargs, ) -> None: @@ -58,9 +59,16 @@ async def recv_and_spawn_net_killers( ctx.open_stream() as stream, trio.open_nursery() as n, ): - for i in range(100): + async for i in stream: + print(f'child echoing {i}') await stream.send(i) - if i > 80: + if ( + break_ipc + and i > 500 + ): + '#################################\n' + 'Simulating child-side IPC BREAK!\n' + '#################################' n.start_soon(break_channel_silently_then_error, stream) n.start_soon(close_stream_and_error, stream) @@ -68,42 +76,72 @@ async def recv_and_spawn_net_killers( async def main( debug_mode: bool = False, start_method: str = 'trio', + break_parent_ipc: bool = False, + break_child_ipc: bool = False, ) -> None: - async with open_nursery( - start_method=start_method, + async with ( + open_nursery( + start_method=start_method, - # NOTE: even debugger is used we shouldn't get - # a hang since it never engages due to broken IPC - debug_mode=debug_mode, + # NOTE: even debugger is used we shouldn't get + # a hang since it never engages due to broken IPC + debug_mode=debug_mode, + loglevel='warning', - ) as n: - portal = await n.start_actor( + ) as an, + ): + portal = await an.start_actor( 'chitty_hijo', enable_modules=[__name__], ) async with portal.open_context( recv_and_spawn_net_killers, + break_ipc=break_child_ipc, + ) as (ctx, sent): async with ctx.open_stream() as stream: - for i in range(100): + for i in range(1000): 
+ + if ( + break_parent_ipc + and i > 100 + ): + print( + '#################################\n' + 'Simulating parent-side IPC BREAK!\n' + '#################################' + ) + await stream._ctx.chan.send(None) # it actually breaks right here in the # mp_spawn/forkserver backends and thus the zombie # reaper never even kicks in? + print(f'parent sending {i}') await stream.send(i) with trio.move_on_after(2) as cs: + + # NOTE: in the parent side IPC failure case this + # will raise an ``EndOfChannel`` after the child + # is killed and sends a stop msg back to it's + # caller/this-parent. rx = await stream.receive() - print(f'I a mad user and here is what i got {rx}') + + print(f"I'm a happy user and echoed to me is {rx}") if cs.cancelled_caught: # pretend to be a user seeing no streaming action # thinking it's a hang, and then hitting ctl-c.. - print("YOO i'm a user anddd thingz hangin.. CTRL-C..") - raise KeyboardInterrupt + print("YOO i'm a user anddd thingz hangin..") + + print( + "YOO i'm mad send side dun but thingz hangin..\n" + 'MASHING CTlR-C Ctl-c..' + ) + raise KeyboardInterrupt if __name__ == '__main__': diff --git a/tests/test_advanced_faults.py b/tests/test_advanced_faults.py index 8294f2a..a5e4aa9 100644 --- a/tests/test_advanced_faults.py +++ b/tests/test_advanced_faults.py @@ -3,6 +3,8 @@ Sketchy network blackoutz, ugly byzantine gens, puedes eschuchar la cancelacion?.. 
''' +from functools import partial + import pytest from _pytest.pathlib import import_path import trio @@ -15,11 +17,30 @@ from conftest import ( @pytest.mark.parametrize( 'debug_mode', [False, True], - ids=['debug_mode', 'no_debug_mode'], + ids=['no_debug_mode', 'debug_mode'], +) +@pytest.mark.parametrize( + 'ipc_break', + [ + {}, + {'break_parent_ipc': True}, + {'break_child_ipc': True}, + { + 'break_child_ipc': True, + 'break_parent_ipc': True, + }, + ], + ids=[ + 'no_break', + 'break_parent', + 'break_child', + 'break_both', + ], ) def test_child_breaks_ipc_channel_during_stream( debug_mode: bool, spawn_backend: str, + ipc_break: dict | None, ): ''' Ensure we can (purposely) break IPC during streaming and it's still @@ -27,12 +48,12 @@ def test_child_breaks_ipc_channel_during_stream( SIGINT. ''' - expect_final_exc = KeyboardInterrupt - if spawn_backend != 'trio': if debug_mode: pytest.skip('`debug_mode` only supported on `trio` spawner') + # non-`trio` spawners should never hit the hang condition that + # requires the user to do ctl-c to cancel the actor tree. expect_final_exc = trio.ClosedResourceError mod = import_path( @@ -40,9 +61,29 @@ def test_child_breaks_ipc_channel_during_stream( root=examples_dir(), ) + expect_final_exc = KeyboardInterrupt + + # when ONLY the child breaks we expect the parent to get a closed + # resource error on the next `MsgStream.receive()` and then fail out + # and cancel the child from there. + if 'break_child_ipc' in ipc_break: + expect_final_exc = trio.ClosedResourceError + + # when the parent IPC side dies (even if the child's does as well) + # we expect the channel to be sent a stop msg from the child at some + # point which will signal the parent that the stream has been + # terminated. + # NOTE: when the parent breaks "after" the child you get the above + # case as well, but it's not worth testing right? 
+ if 'break_parent_ipc' in ipc_break: + expect_final_exc = trio.EndOfChannel + with pytest.raises(expect_final_exc): trio.run( - mod.main, - debug_mode, - spawn_backend, + partial( + mod.main, + debug_mode=debug_mode, + start_method=spawn_backend, + **ipc_break, + ) ) From e34823aab4a6b84a6ee95ba15f4d68285ff97416 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 28 Jan 2023 16:44:35 -0500 Subject: [PATCH 17/23] Add parent vs. child cancels first cases --- .../ipc_failure_during_stream.py | 21 ++-- tests/test_advanced_faults.py | 95 +++++++++++++++---- 2 files changed, 89 insertions(+), 27 deletions(-) diff --git a/examples/advanced_faults/ipc_failure_during_stream.py b/examples/advanced_faults/ipc_failure_during_stream.py index a5d3738..6728b8d 100644 --- a/examples/advanced_faults/ipc_failure_during_stream.py +++ b/examples/advanced_faults/ipc_failure_during_stream.py @@ -46,8 +46,7 @@ async def close_stream_and_error( async def recv_and_spawn_net_killers( ctx: Context, - break_ipc: bool = False, - **kwargs, + break_ipc_after: bool | int = False, ) -> None: ''' @@ -63,8 +62,8 @@ async def recv_and_spawn_net_killers( print(f'child echoing {i}') await stream.send(i) if ( - break_ipc - and i > 500 + break_ipc_after + and i > break_ipc_after ): '#################################\n' 'Simulating child-side IPC BREAK!\n' @@ -76,8 +75,12 @@ async def recv_and_spawn_net_killers( async def main( debug_mode: bool = False, start_method: str = 'trio', - break_parent_ipc: bool = False, - break_child_ipc: bool = False, + + # by default we break the parent IPC first (if configured to break + # at all), but this can be changed so the child does first (even if + # both are set to break). 
+ break_parent_ipc_after: int | bool = False, + break_child_ipc_after: int | bool = False, ) -> None: @@ -99,15 +102,15 @@ async def main( async with portal.open_context( recv_and_spawn_net_killers, - break_ipc=break_child_ipc, + break_ipc_after=break_child_ipc_after, ) as (ctx, sent): async with ctx.open_stream() as stream: for i in range(1000): if ( - break_parent_ipc - and i > 100 + break_parent_ipc_after + and i > break_parent_ipc_after ): print( '#################################\n' diff --git a/tests/test_advanced_faults.py b/tests/test_advanced_faults.py index a5e4aa9..081bd0e 100644 --- a/tests/test_advanced_faults.py +++ b/tests/test_advanced_faults.py @@ -22,30 +22,56 @@ from conftest import ( @pytest.mark.parametrize( 'ipc_break', [ - {}, - {'break_parent_ipc': True}, - {'break_child_ipc': True}, + # no breaks { - 'break_child_ipc': True, - 'break_parent_ipc': True, + 'break_parent_ipc_after': False, + 'break_child_ipc_after': False, }, + + # only parent breaks + { + 'break_parent_ipc_after': 500, + 'break_child_ipc_after': False, + }, + + # only child breaks + { + 'break_parent_ipc_after': False, + 'break_child_ipc_after': 500, + }, + + # both: break parent first + { + 'break_parent_ipc_after': 500, + 'break_child_ipc_after': 800, + }, + # both: break child first + { + 'break_parent_ipc_after': 800, + 'break_child_ipc_after': 500, + }, + ], ids=[ 'no_break', 'break_parent', 'break_child', - 'break_both', + 'break_both_parent_first', + 'break_both_child_first', ], ) -def test_child_breaks_ipc_channel_during_stream( +def test_ipc_channel_break_during_stream( debug_mode: bool, spawn_backend: str, ipc_break: dict | None, ): ''' - Ensure we can (purposely) break IPC during streaming and it's still - possible for the (simulated) user to kill the actor tree using - SIGINT. + Ensure we can have an IPC channel break its connection during + streaming and it's still possible for the (simulated) user to kill + the actor tree using SIGINT. 
+ + We also verify the type of connection error expected in the parent + depending on which side of the IPC breaks first. ''' if spawn_backend != 'trio': @@ -66,16 +92,49 @@ def test_child_breaks_ipc_channel_during_stream( # when ONLY the child breaks we expect the parent to get a closed # resource error on the next `MsgStream.receive()` and then fail out # and cancel the child from there. - if 'break_child_ipc' in ipc_break: + if ( + + # only child breaks + ( + ipc_break['break_child_ipc_after'] + and ipc_break['break_parent_ipc_after'] is False + ) + + # both break but, child breaks first + or ( + ipc_break['break_child_ipc_after'] is not False + and ( + ipc_break['break_parent_ipc_after'] + > ipc_break['break_child_ipc_after'] + ) + ) + + ): expect_final_exc = trio.ClosedResourceError - # when the parent IPC side dies (even if the child's does as well) - # we expect the channel to be sent a stop msg from the child at some - # point which will signal the parent that the stream has been - # terminated. - # NOTE: when the parent breaks "after" the child you get the above - # case as well, but it's not worth testing right? - if 'break_parent_ipc' in ipc_break: + # when the parent IPC side dies (even if the child's does as well + # but the child fails AFTER the parent) we expect the channel to be + # sent a stop msg from the child at some point which will signal the + # parent that the stream has been terminated. + # NOTE: when the parent breaks "after" the child you get this same + # case as well, the child breaks the IPC channel with a stop msg + # before any closure takes place.
+ elif ( + # only parent breaks + ( + ipc_break['break_parent_ipc_after'] + and ipc_break['break_child_ipc_after'] is False + ) + + # both break but, parent breaks first + or ( + ipc_break['break_parent_ipc_after'] is not False + and ( + ipc_break['break_child_ipc_after'] + > ipc_break['break_parent_ipc_after'] + ) + ) + ): expect_final_exc = trio.EndOfChannel with pytest.raises(expect_final_exc): From 3967c0ed9e256a018dadfceeab08be592c1f53fb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 28 Jan 2023 17:25:07 -0500 Subject: [PATCH 18/23] Add a simplified zombie lord specific process reaping test --- tests/test_advanced_faults.py | 45 +++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/tests/test_advanced_faults.py b/tests/test_advanced_faults.py index 081bd0e..a48866e 100644 --- a/tests/test_advanced_faults.py +++ b/tests/test_advanced_faults.py @@ -8,6 +8,7 @@ from functools import partial import pytest from _pytest.pathlib import import_path import trio +import tractor from conftest import ( examples_dir, @@ -146,3 +147,47 @@ def test_ipc_channel_break_during_stream( **ipc_break, ) ) + + +@tractor.context +async def break_ipc_after_started( + ctx: tractor.Context, +) -> None: + await ctx.started() + async with ctx.open_stream() as stream: + await stream.aclose() + await trio.sleep(0.2) + await ctx.chan.send(None) + print('child broke IPC and terminating') + + +def test_stream_closed_right_after_ipc_break_and_zombie_lord_engages(): + ''' + Verify that if a subactor's IPC goes down just after bringing up a stream + the parent can trigger a SIGINT and the child will be reaped out-of-IPC by + the localhost process supervision machinery: aka "zombie lord".
+ + ''' + async def main(): + async with tractor.open_nursery() as n: + portal = await n.start_actor( + 'ipc_breaker', + enable_modules=[__name__], + ) + + with trio.move_on_after(1): + async with ( + portal.open_context( + break_ipc_after_started + ) as (ctx, sent), + ): + async with ctx.open_stream(): + await trio.sleep(0.5) + + print('parent waiting on context') + + print('parent exited context') + raise KeyboardInterrupt + + with pytest.raises(KeyboardInterrupt): + trio.run(main) From 556f4626db4c4ebf1927144863e11c54233ec651 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 28 Jan 2023 17:37:57 -0500 Subject: [PATCH 19/23] Tweak warning msg for still-alive-after-cancelled actor --- tractor/_spawn.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 6c0ce5b..900aea2 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -260,7 +260,9 @@ async def soft_wait( if proc.poll() is None: # type: ignore log.warning( - f'Process still alive after cancel request:\n{uid}') + 'Actor still alive after cancel request:\n' + f'{uid}' + ) n.cancel_scope.cancel() raise From aa4871b13d85d001a12c48b2425e6f94b12527ff Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 28 Jan 2023 18:01:53 -0500 Subject: [PATCH 20/23] Call `MsgStream.aclose()` in `Context.open_stream.__aexit__()` We weren't doing this originally I *think* just because of the path dependent nature of the way the code was developed (originally being mega pedantic about one-way vs. 
bidirectional streams) but, it doesn't seem like there's any issue just calling the stream's `.aclose()`; also have the benefit of just being less code and logic checks B) --- tractor/_streaming.py | 75 ++++++++++++++++++++++--------------------- 1 file changed, 38 insertions(+), 37 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 11ff47d..699a906 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -97,6 +97,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): if self._eoc: raise trio.EndOfChannel + if self._closed: + raise trio.ClosedResourceError('This stream was closed') + try: msg = await self._rx_chan.receive() return msg['yield'] @@ -110,6 +113,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # - 'error' # possibly just handle msg['stop'] here! + if self._closed: + raise trio.ClosedResourceError('This stream was closed') + if msg.get('stop') or self._eoc: log.debug(f"{self} was stopped at remote end") @@ -189,7 +195,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): return self._eoc = True - self._closed = True # NOTE: this is super subtle IPC messaging stuff: # Relay stop iteration to far end **iff** we're @@ -206,29 +211,32 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # In the bidirectional case, `Context.open_stream()` will create # the `Actor._cids2qs` entry from a call to - # `Actor.get_context()` and will send the stop message in - # ``__aexit__()`` on teardown so it **does not** need to be - # called here. - if not self._ctx._portal: - # Only for 2 way streams can we can send stop from the - # caller side. - try: - # NOTE: if this call is cancelled we expect this end to - # handle as though the stop was never sent (though if it - # was it shouldn't matter since it's unlikely a user - # will try to re-use a stream after attemping to close - # it). 
- with trio.CancelScope(shield=True): - await self._ctx.send_stop() + # `Actor.get_context()` and will call us here to send the stop + # msg in ``__aexit__()`` on teardown. + try: + # NOTE: if this call is cancelled we expect this end to + # handle as though the stop was never sent (though if it + # was it shouldn't matter since it's unlikely a user + # will try to re-use a stream after attemping to close + # it). + with trio.CancelScope(shield=True): + await self._ctx.send_stop() - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): - # the underlying channel may already have been pulled - # in which case our stop message is meaningless since - # it can't traverse the transport. - log.debug(f'Channel for {self} was already closed') + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + # the underlying channel may already have been pulled + # in which case our stop message is meaningless since + # it can't traverse the transport. + ctx = self._ctx + log.warning( + f'Stream was already destroyed?\n' + f'actor: {ctx.chan.uid}\n' + f'ctx id: {ctx.cid}' + ) + + self._closed = True # Do we close the local mem chan ``self._rx_chan`` ??!? @@ -594,30 +602,23 @@ class Context: async with MsgStream( ctx=self, rx_chan=ctx._recv_chan, - ) as rchan: + ) as stream: if self._portal: - self._portal._streams.add(rchan) + self._portal._streams.add(stream) try: self._stream_opened = True - # ensure we aren't cancelled before delivering - # the stream + # XXX: do we need this? + # ensure we aren't cancelled before yielding the stream # await trio.lowlevel.checkpoint() - yield rchan + yield stream - # XXX: Make the stream "one-shot use". On exit, signal + # NOTE: Make the stream "one-shot use". On exit, signal # ``trio.EndOfChannel``/``StopAsyncIteration`` to the # far end. 
- try: - await self.send_stop() - except trio.BrokenResourceError: - log.warning( - f"Couldn't close: stream already broken?\n" - f'actor: {self.chan.uid}\n' - f'ctx id: {self.cid}' - ) + await stream.aclose() finally: if self._portal: From 195d2f0ed4ce3551be6b596a86a7abb9b0ac60ab Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 28 Jan 2023 18:12:03 -0500 Subject: [PATCH 21/23] Add nooz --- nooz/346.bugfix.rst | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 nooz/346.bugfix.rst diff --git a/nooz/346.bugfix.rst b/nooz/346.bugfix.rst new file mode 100644 index 0000000..6ffe636 --- /dev/null +++ b/nooz/346.bugfix.rst @@ -0,0 +1,15 @@ +Fixes to ensure IPC (channel) breakage doesn't result in hung actor +trees; the zombie reaping and general supervision machinery will always +clean up and terminate. + +This includes not only the (mostly minor) fixes to solve these cases but +also a new extensive test suite in `test_advanced_faults.py` with an +accompanying highly configurable example module-script in +`examples/advanced_faults/ipc_failure_during_stream.py`. Tests ensure we +never get hangs or zombies despite operating in debug mode and attempt to +simulate all possible IPC transport failure cases for a local-host actor +tree. + +Further we simplify `Context.open_stream.__aexit__()` to just call +`MsgStream.aclose()` directly more or less avoiding a pure duplicate +code path.
From af6c325072e7d7bbe9c8b74d4a6200ea0f1b8219 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 28 Jan 2023 18:12:52 -0500 Subject: [PATCH 22/23] Bump up legacy streaming timeout a smidgen --- tests/test_legacy_one_way_streaming.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_legacy_one_way_streaming.py b/tests/test_legacy_one_way_streaming.py index 4e54e02..17e94ba 100644 --- a/tests/test_legacy_one_way_streaming.py +++ b/tests/test_legacy_one_way_streaming.py @@ -251,7 +251,7 @@ def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend): results, diff = time_quad_ex assert results - this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 2.666 + this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 3 assert diff < this_fast From 13c9eadc8f0b9761d4b27eb46ba48ae54b7345c9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 28 Jan 2023 18:42:31 -0500 Subject: [PATCH 23/23] Move result log msg up and drop else block --- tractor/_portal.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index 5546949..05504bd 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -497,6 +497,10 @@ class Portal: f'actor: {uid}' ) result = await ctx.result() + log.runtime( + f'Context {fn_name} returned ' + f'value from callee `{result}`' + ) # though it should be impossible for any tasks # operating *in* this scope to have survived @@ -518,12 +522,6 @@ class Portal: f'task:{cid}\n' f'actor:{uid}' ) - else: - log.runtime( - f'Context {fn_name} returned ' - f'value from callee `{result}`' - ) - # XXX: (MEGA IMPORTANT) if this is a root opened process we # wait for any immediate child in debug before popping the # context from the runtime msg loop otherwise inside