diff --git a/docs/README.rst b/docs/README.rst
index 9dfe2f60..9dd7faf4 100644
--- a/docs/README.rst
+++ b/docs/README.rst
@@ -3,8 +3,8 @@
 |gh_actions|
 |docs|
 
-``tractor`` is a `structured concurrent`_, multi-processing_ runtime
-built on trio_.
+``tractor`` is a `structured concurrent`_, (optionally
+distributed_) multi-processing_ runtime built on trio_.
 
 Fundamentally, ``tractor`` gives you parallelism via
 ``trio``-"*actors*": independent Python processes (aka
@@ -17,11 +17,21 @@ protocol" constructed on top of multiple Pythons each running a ``trio``
 scheduled runtime - a call to ``trio.run()``.
 
 We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
-but likely *does not* look like what *you* probably think an "actor
-model" looks like, and that's *intentional*.
+but likely **does not** look like what **you** probably *think* an "actor
+model" looks like, and that's **intentional**.
 
-The first step to grok ``tractor`` is to get the basics of ``trio`` down.
-A great place to start is the `trio docs`_ and this `blog post`_.
+
+Where do I start!?
+------------------
+The first step to grok ``tractor`` is to build up an intermediate
+knowledge of ``trio`` and **structured concurrency** B)
+
+Some great places to start are:
+
+- the seminal `blog post`_
+- obviously the `trio docs`_
+- Wikipedia's nascent SC_ page
+- the fancy diagrams @ libdill-docs_
 
 
 Features
@@ -593,6 +602,7 @@ matrix seems too hip, we're also mostly all in the the `trio gitter
 channel`_!
 
 .. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
+.. _distributed: https://en.wikipedia.org/wiki/Distributed_computing
 .. _multi-processing: https://en.wikipedia.org/wiki/Multiprocessing
 .. _trio: https://github.com/python-trio/trio
 .. _nurseries: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/#nurseries-a-structured-replacement-for-go-statements
@@ -611,8 +621,9 @@ channel`_!
 .. _trio docs: https://trio.readthedocs.io/en/latest/
 .. _blog post: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
 .. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
+.. _SC: https://en.wikipedia.org/wiki/Structured_concurrency
+.. _libdill-docs: https://sustrik.github.io/libdill/structured-concurrency.html
 .. _structured chadcurrency: https://en.wikipedia.org/wiki/Structured_concurrency
-.. _structured concurrency: https://en.wikipedia.org/wiki/Structured_concurrency
 .. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony
 .. _async generators: https://www.python.org/dev/peps/pep-0525/
 .. _trio-parallel: https://github.com/richardsheridan/trio-parallel
diff --git a/examples/debugging/asyncio_bp.py b/examples/debugging/asyncio_bp.py
new file mode 100644
index 00000000..b32ad1d8
--- /dev/null
+++ b/examples/debugging/asyncio_bp.py
@@ -0,0 +1,117 @@
+import asyncio
+
+import trio
+import tractor
+from tractor import to_asyncio
+
+
+async def aio_sleep_forever():
+    await asyncio.sleep(float('inf'))
+
+
+async def bp_then_error(
+    to_trio: trio.MemorySendChannel,
+    from_trio: asyncio.Queue,
+
+    raise_after_bp: bool = True,
+
+) -> None:
+
+    # sync with ``trio``-side (caller) task
+    to_trio.send_nowait('start')
+
+    # NOTE: what happens here inside the hook needs some refinement..
+    # => seems like it's still `._debug._set_trace()` but
+    #    we set `Lock.local_task_in_debug = 'sync'`, we probably want
+    #    some further, at least, meta-data about the task/actor in debug
+    #    in terms of making it clear it's asyncio mucking about.
+    breakpoint()
+
+    # short checkpoint / delay
+    await asyncio.sleep(0.5)
+
+    if raise_after_bp:
+        raise ValueError('blah')
+
+    # TODO: test case with this so that it gets cancelled?
+    else:
+        # XXX NOTE: this is required in order to get the SIGINT-ignored
+        # hang case documented in the module script section!
+        await aio_sleep_forever()
+
+
+@tractor.context
+async def trio_ctx(
+    ctx: tractor.Context,
+    bp_before_started: bool = False,
+):
+
+    # this will block until the ``asyncio`` task sends a "first"
+    # message, see first line in above func.
+    async with (
+
+        to_asyncio.open_channel_from(
+            bp_then_error,
+            raise_after_bp=not bp_before_started,
+        ) as (first, chan),
+
+        trio.open_nursery() as n,
+    ):
+
+        assert first == 'start'
+
+        if bp_before_started:
+            await tractor.breakpoint()
+
+        await ctx.started(first)
+
+        n.start_soon(
+            to_asyncio.run_task,
+            aio_sleep_forever,
+        )
+        await trio.sleep_forever()
+
+
+async def main(
+    bps_all_over: bool = False,
+
+) -> None:
+
+    async with tractor.open_nursery() as n:
+
+        p = await n.start_actor(
+            'aio_daemon',
+            enable_modules=[__name__],
+            infect_asyncio=True,
+            debug_mode=True,
+            loglevel='cancel',
+        )
+
+        async with p.open_context(
+            trio_ctx,
+            bp_before_started=bps_all_over,
+        ) as (ctx, first):
+
+            assert first == 'start'
+
+            if bps_all_over:
+                await tractor.breakpoint()
+
+            # await trio.sleep_forever()
+            await ctx.cancel()
+            assert 0
+
+        # TODO: case where we cancel from trio-side while asyncio task
+        # has debugger lock?
+        # await p.cancel_actor()
+
+
+if __name__ == '__main__':
+
+    # works fine B)
+    trio.run(main)
+
+    # will hang and ignore SIGINT !!
+    # NOTE: you'll need to send a SIGQUIT (via ctrl-\) to kill it
+    # manually..
+    # trio.run(main, True)
diff --git a/setup.py b/setup.py
index a4e5e1ed..66b2622d 100755
--- a/setup.py
+++ b/setup.py
@@ -26,7 +26,7 @@ with open('docs/README.rst', encoding='utf-8') as f:
 setup(
     name="tractor",
     version='0.1.0a6dev0',  # alpha zone
-    description='structured concurrrent `trio`-"actors"',
+    description='structured concurrent `trio`-"actors"',
     long_description=readme,
     license='AGPLv3',
     author='Tyler Goodlet',
@@ -40,6 +40,7 @@ setup(
         'tractor.trionics',  # trio extensions
         'tractor.msg',  # lowlevel data types
         'tractor._testing',  # internal cross-subsys suite utils
+        'tractor.devx',  # "dev-experience"
     ],
     install_requires=[
 
@@ -53,6 +54,7 @@ setup(
         # 'exceptiongroup',  # in stdlib as of 3.11!
 
# tooling + 'stackscope', 'tricycle', 'trio_typing', 'colorlog', @@ -64,16 +66,15 @@ setup( # debug mode REPL 'pdbp', + # TODO: distributed transport using + # linux kernel networking + # 'pyroute2', + # pip ref docs on these specs: # https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples # and pep: # https://peps.python.org/pep-0440/#version-specifiers - # windows deps workaround for ``pdbpp`` - # https://github.com/pdbpp/pdbpp/issues/498 - # https://github.com/pdbpp/fancycompleter/issues/37 - 'pyreadline3 ; platform_system == "Windows"', - ], tests_require=['pytest'], python_requires=">=3.11", diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index 42b1f7d0..40247fd7 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -245,10 +245,10 @@ def test_simple_context( trio.run(main) except error_parent: pass - except trio.MultiError as me: + except BaseExceptionGroup as beg: # XXX: on windows it seems we may have to expect the group error from tractor._exceptions import is_multi_cancelled - assert is_multi_cancelled(me) + assert is_multi_cancelled(beg) else: trio.run(main) diff --git a/tests/test_debugger.py b/tests/test_debugger.py index 889e7c74..a10ecad9 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -10,6 +10,7 @@ TODO: - wonder if any of it'll work on OS X? """ +from functools import partial import itertools from typing import Optional import platform @@ -26,6 +27,10 @@ from pexpect.exceptions import ( from tractor._testing import ( examples_dir, ) +from tractor.devx._debug import ( + _pause_msg, + _crash_msg, +) from .conftest import ( _ci_env, ) @@ -123,20 +128,52 @@ def expect( raise +def in_prompt_msg( + prompt: str, + parts: list[str], + + pause_on_false: bool = False, + print_prompt_on_false: bool = True, + +) -> bool: + ''' + Predicate check if (the prompt's) std-streams output has all + `str`-parts in it. + + Can be used in test asserts for bulk matching expected + log/REPL output for a given `pdb` interact point. 
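+
+    For example, a root-actor crash can be asserted (just as the
+    updated tests below do) with:
+    `in_prompt_msg(before, [_crash_msg, "('root'"])`.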
+ + ''' + for part in parts: + if part not in prompt: + + if pause_on_false: + import pdbp + pdbp.set_trace() + + if print_prompt_on_false: + print(prompt) + + return False + + return True + def assert_before( child, patts: list[str], + **kwargs, + ) -> None: - before = str(child.before.decode()) + # as in before the prompt end + before: str = str(child.before.decode()) + assert in_prompt_msg( + prompt=before, + parts=patts, - for patt in patts: - try: - assert patt in before - except AssertionError: - print(before) - raise + **kwargs + ) @pytest.fixture( @@ -195,7 +232,10 @@ def test_root_actor_error(spawn, user_in_out): before = str(child.before.decode()) # make sure expected logging and error arrives - assert "Attaching to pdb in crashed actor: ('root'" in before + assert in_prompt_msg( + before, + [_crash_msg, "('root'"] + ) assert 'AssertionError' in before # send user command @@ -332,7 +372,10 @@ def test_subactor_error( child.expect(PROMPT) before = str(child.before.decode()) - assert "Attaching to pdb in crashed actor: ('name_error'" in before + assert in_prompt_msg( + before, + [_crash_msg, "('name_error'"] + ) if do_next: child.sendline('n') @@ -353,9 +396,15 @@ def test_subactor_error( before = str(child.before.decode()) # root actor gets debugger engaged - assert "Attaching to pdb in crashed actor: ('root'" in before + assert in_prompt_msg( + before, + [_crash_msg, "('root'"] + ) # error is a remote error propagated from the subactor - assert "RemoteActorError: ('name_error'" in before + assert in_prompt_msg( + before, + [_crash_msg, "('name_error'"] + ) # another round if ctlc: @@ -380,7 +429,10 @@ def test_subactor_breakpoint( child.expect(PROMPT) before = str(child.before.decode()) - assert "Attaching pdb to actor: ('breakpoint_forever'" in before + assert in_prompt_msg( + before, + [_pause_msg, "('breakpoint_forever'"] + ) # do some "next" commands to demonstrate recurrent breakpoint # entries @@ -396,7 +448,10 @@ def test_subactor_breakpoint( child.sendline('continue') child.expect(PROMPT) before = str(child.before.decode()) - assert "Attaching pdb to actor: ('breakpoint_forever'" in before + assert in_prompt_msg( + before, + [_pause_msg, "('breakpoint_forever'"] + ) if ctlc: do_ctlc(child) @@ -441,7 +496,10 @@ def test_multi_subactors( child.expect(PROMPT) before = str(child.before.decode()) - assert "Attaching pdb to actor: ('breakpoint_forever'" in before + assert in_prompt_msg( + before, + [_pause_msg, "('breakpoint_forever'"] + ) if ctlc: do_ctlc(child) @@ -461,7 +519,10 @@ def test_multi_subactors( # first name_error failure child.expect(PROMPT) before = str(child.before.decode()) - assert "Attaching to pdb in crashed actor: ('name_error'" in before + assert in_prompt_msg( + before, + [_crash_msg, "('name_error'"] + ) assert "NameError" in before if ctlc: @@ -487,7 +548,10 @@ def test_multi_subactors( child.sendline('c') child.expect(PROMPT) before = str(child.before.decode()) - assert "Attaching pdb to actor: ('breakpoint_forever'" in before + assert in_prompt_msg( + before, + [_pause_msg, "('breakpoint_forever'"] + ) if ctlc: do_ctlc(child) @@ -527,17 +591,21 @@ def test_multi_subactors( child.expect(PROMPT) before = str(child.before.decode()) - assert_before(child, [ - # debugger attaches to root - "Attaching to pdb in crashed actor: ('root'", + assert_before( + child, [ + # debugger attaches to root + # "Attaching to pdb in crashed actor: ('root'", + _crash_msg, + "('root'", - # expect a multierror with exceptions for each sub-actor - "RemoteActorError: 
('breakpoint_forever'", - "RemoteActorError: ('name_error'", - "RemoteActorError: ('spawn_error'", - "RemoteActorError: ('name_error_1'", - 'bdb.BdbQuit', - ]) + # expect a multierror with exceptions for each sub-actor + "RemoteActorError: ('breakpoint_forever'", + "RemoteActorError: ('name_error'", + "RemoteActorError: ('spawn_error'", + "RemoteActorError: ('name_error_1'", + 'bdb.BdbQuit', + ] + ) if ctlc: do_ctlc(child) @@ -574,15 +642,22 @@ def test_multi_daemon_subactors( # the root's tty lock first so anticipate either crash # message on the first entry. - bp_forever_msg = "Attaching pdb to actor: ('bp_forever'" + bp_forev_parts = [_pause_msg, "('bp_forever'"] + bp_forev_in_msg = partial( + in_prompt_msg, + parts=bp_forev_parts, + ) + name_error_msg = "NameError: name 'doggypants' is not defined" + name_error_parts = [name_error_msg] before = str(child.before.decode()) - if bp_forever_msg in before: - next_msg = name_error_msg + + if bp_forev_in_msg(prompt=before): + next_parts = name_error_parts elif name_error_msg in before: - next_msg = bp_forever_msg + next_parts = bp_forev_parts else: raise ValueError("Neither log msg was found !?") @@ -599,7 +674,10 @@ def test_multi_daemon_subactors( child.sendline('c') child.expect(PROMPT) - assert_before(child, [next_msg]) + assert_before( + child, + next_parts, + ) # XXX: hooray the root clobbering the child here was fixed! # IMO, this demonstrates the true power of SC system design. @@ -623,9 +701,15 @@ def test_multi_daemon_subactors( child.expect(PROMPT) try: - assert_before(child, [bp_forever_msg]) + assert_before( + child, + bp_forev_parts, + ) except AssertionError: - assert_before(child, [name_error_msg]) + assert_before( + child, + name_error_parts, + ) else: if ctlc: @@ -637,7 +721,10 @@ def test_multi_daemon_subactors( child.sendline('c') child.expect(PROMPT) - assert_before(child, [name_error_msg]) + assert_before( + child, + name_error_parts, + ) # wait for final error in root # where it crashs with boxed error @@ -647,7 +734,7 @@ def test_multi_daemon_subactors( child.expect(PROMPT) assert_before( child, - [bp_forever_msg] + bp_forev_parts ) except AssertionError: break @@ -656,7 +743,9 @@ def test_multi_daemon_subactors( child, [ # boxed error raised in root task - "Attaching to pdb in crashed actor: ('root'", + # "Attaching to pdb in crashed actor: ('root'", + _crash_msg, + "('root'", "_exceptions.RemoteActorError: ('name_error'", ] ) @@ -770,7 +859,7 @@ def test_multi_nested_subactors_error_through_nurseries( child = spawn('multi_nested_subactors_error_up_through_nurseries') - timed_out_early: bool = False + # timed_out_early: bool = False for send_char in itertools.cycle(['c', 'q']): try: @@ -871,11 +960,14 @@ def test_root_nursery_cancels_before_child_releases_tty_lock( if not timed_out_early: before = str(child.before.decode()) - assert_before(child, [ - "tractor._exceptions.RemoteActorError: ('spawner0'", - "tractor._exceptions.RemoteActorError: ('name_error'", - "NameError: name 'doggypants' is not defined", - ]) + assert_before( + child, + [ + "tractor._exceptions.RemoteActorError: ('spawner0'", + "tractor._exceptions.RemoteActorError: ('name_error'", + "NameError: name 'doggypants' is not defined", + ], + ) def test_root_cancels_child_context_during_startup( @@ -909,8 +1001,10 @@ def test_different_debug_mode_per_actor( # only one actor should enter the debugger before = str(child.before.decode()) - assert "Attaching to pdb in crashed actor: ('debugged_boi'" in before - assert "RuntimeError" in before + assert 
in_prompt_msg( + before, + [_crash_msg, "('debugged_boi'", "RuntimeError"], + ) if ctlc: do_ctlc(child) diff --git a/tests/test_legacy_one_way_streaming.py b/tests/test_legacy_one_way_streaming.py index 5d7787fa..61fff75c 100644 --- a/tests/test_legacy_one_way_streaming.py +++ b/tests/test_legacy_one_way_streaming.py @@ -38,10 +38,13 @@ async def async_gen_stream(sequence): assert cs.cancelled_caught +# TODO: deprecated either remove entirely +# or re-impl in terms of `MsgStream` one-sides +# wrapper, but at least remove `Portal.open_stream_from()` @tractor.stream async def context_stream( ctx: tractor.Context, - sequence + sequence: list[int], ): for i in sequence: await ctx.send_yield(i) diff --git a/tractor/__init__.py b/tractor/__init__.py index 7af40c6e..5c16bc4e 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -19,7 +19,6 @@ tractor: structured concurrent ``trio``-"actors". """ from ._clustering import open_actor_cluster -from ._ipc import Channel from ._context import ( Context, # the type context, # a func-decorator @@ -44,8 +43,10 @@ from ._exceptions import ( ModuleNotExposed, ContextCancelled, ) -from ._debug import ( +from .devx import ( breakpoint, + pause, + pause_from_sync, post_mortem, ) from . import msg @@ -53,31 +54,35 @@ from ._root import ( run_daemon, open_root_actor, ) +from ._ipc import Channel from ._portal import Portal from ._runtime import Actor __all__ = [ 'Actor', + 'BaseExceptionGroup', 'Channel', 'Context', 'ContextCancelled', 'ModuleNotExposed', 'MsgStream', - 'BaseExceptionGroup', 'Portal', 'RemoteActorError', 'breakpoint', 'context', 'current_actor', 'find_actor', + 'query_actor', 'get_arbiter', 'is_root_process', 'msg', 'open_actor_cluster', 'open_nursery', 'open_root_actor', + 'pause', 'post_mortem', + 'pause_from_sync', 'query_actor', 'run_daemon', 'stream', diff --git a/tractor/_context.py b/tractor/_context.py index 50f7bfa5..a2860f3d 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -313,7 +313,7 @@ async def _drain_to_final_msg( log.critical('SHOULD NEVER GET HERE!?') assert msg is ctx._cancel_msg assert error.msgdata == ctx._remote_error.msgdata - from ._debug import pause + from .devx._debug import pause await pause() ctx._maybe_cancel_and_set_remote_error(error) ctx._maybe_raise_remote_err(error) @@ -868,6 +868,9 @@ class Context: # TODO: maybe we should also call `._res_scope.cancel()` if it # exists to support cancelling any drain loop hangs? + # NOTE: this usage actually works here B) + # from .devx._debug import breakpoint + # await breakpoint() # TODO: add to `Channel`? @property @@ -2199,7 +2202,7 @@ async def open_context_from_portal( # pass # TODO: factor ^ into below for non-root cases? # - from ._debug import maybe_wait_for_debugger + from .devx._debug import maybe_wait_for_debugger was_acquired: bool = await maybe_wait_for_debugger( # header_msg=( # 'Delaying `ctx.cancel()` until debug lock ' @@ -2310,7 +2313,7 @@ async def open_context_from_portal( # where the root is waiting on the lock to clear but the # child has already cleared it and clobbered IPC. if debug_mode(): - from ._debug import maybe_wait_for_debugger + from .devx._debug import maybe_wait_for_debugger await maybe_wait_for_debugger() # though it should be impossible for any tasks diff --git a/tractor/_root.py b/tractor/_root.py index 881dc90f..a1a11d3b 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -38,7 +38,7 @@ from ._runtime import ( # Arbiter as Registry, async_main, ) -from . import _debug +from .devx import _debug from . 
import _spawn from . import _state from . import log @@ -90,7 +90,7 @@ async def open_root_actor( # https://github.com/python-trio/trio/issues/1155#issuecomment-742964018 builtin_bp_handler = sys.breakpointhook orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None) - os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace' + os.environ['PYTHONBREAKPOINT'] = 'tractor.devx._debug.pause_from_sync' # attempt to retreive ``trio``'s sigint handler and stash it # on our debugger lock state. @@ -131,14 +131,20 @@ async def open_root_actor( ) ) - loglevel = (loglevel or log._default_loglevel).upper() + loglevel = ( + loglevel + or log._default_loglevel + ).upper() - if debug_mode and _spawn._spawn_method == 'trio': + if ( + debug_mode + and _spawn._spawn_method == 'trio' + ): _state._runtime_vars['_debug_mode'] = True - # expose internal debug module to every actor allowing - # for use of ``await tractor.breakpoint()`` - enable_modules.append('tractor._debug') + # expose internal debug module to every actor allowing for + # use of ``await tractor.pause()`` + enable_modules.append('tractor.devx._debug') # if debug mode get's enabled *at least* use that level of # logging for some informative console prompts. @@ -156,7 +162,20 @@ async def open_root_actor( "Debug mode is only supported for the `trio` backend!" ) - log.get_console_log(loglevel) + assert loglevel + _log = log.get_console_log(loglevel) + assert _log + + # TODO: factor this into `.devx._stackscope`!! + if debug_mode: + try: + logger.info('Enabling `stackscope` traces on SIGUSR1') + from .devx import enable_stack_on_sig + enable_stack_on_sig() + except ImportError: + logger.warning( + '`stackscope` not installed for use in debug mode!' + ) try: # make a temporary connection to see if an arbiter exists, @@ -237,10 +256,10 @@ async def open_root_actor( ) as err: entered = await _debug._maybe_enter_pm(err) - if ( not entered - and not is_multi_cancelled(err) + and + not is_multi_cancelled(err) ): logger.exception('Root actor crashed:\n') diff --git a/tractor/_rpc.py b/tractor/_rpc.py index 47548106..89c97381 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -55,7 +55,7 @@ from ._exceptions import ( unpack_error, TransportClosed, ) -from . import _debug +from .devx import _debug from . import _state from .log import get_logger diff --git a/tractor/_runtime.py b/tractor/_runtime.py index cc8eaf5f..6b3d9461 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -78,7 +78,7 @@ from ._exceptions import ( TransportClosed, ) from ._discovery import get_arbiter -from . import _debug +from .devx import _debug from ._portal import Portal from . import _state from . import _mp_fixup_main @@ -187,7 +187,7 @@ class Actor: self._parent_main_data = _mp_fixup_main._mp_figure_out_main() # always include debugging tools module - enable_modules.append('tractor._debug') + enable_modules.append('tractor.devx._debug') mods = {} for name in enable_modules: @@ -632,7 +632,7 @@ class Actor: and not db_cs.cancel_called and uid == pdb_user_uid ): - log.warning( + log.critical( f'STALE DEBUG LOCK DETECTED FOR {uid}' ) # TODO: figure out why this breaks tests.. 
@@ -1722,4 +1722,6 @@ class Arbiter(Actor): ) -> None: uid = (str(uid[0]), str(uid[1])) - self._registry.pop(uid) + entry: tuple = self._registry.pop(uid, None) + if entry is None: + log.warning(f'Request to de-register {uid} failed?') diff --git a/tractor/_spawn.py b/tractor/_spawn.py index e91638bc..78c38c84 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -34,7 +34,7 @@ from typing import ( import trio from trio import TaskStatus -from ._debug import ( +from .devx._debug import ( maybe_wait_for_debugger, acquire_debug_lock, ) @@ -551,13 +551,14 @@ async def trio_proc( with trio.move_on_after(0.5): await proc.wait() - log.pdb( - 'Delaying subproc reaper while debugger locked..' - ) await maybe_wait_for_debugger( child_in_debug=_runtime_vars.get( '_debug_mode', False ), + header_msg=( + 'Delaying subproc reaper while debugger locked..\n' + ), + # TODO: need a diff value then default? # poll_steps=9999999, ) diff --git a/tractor/_supervise.py b/tractor/_supervise.py index 1e5ea387..c8c2336d 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -31,7 +31,7 @@ import warnings import trio -from ._debug import maybe_wait_for_debugger +from .devx._debug import maybe_wait_for_debugger from ._state import current_actor, is_main_process from .log import get_logger, get_loglevel from ._runtime import Actor diff --git a/tractor/devx/__init__.py b/tractor/devx/__init__.py new file mode 100644 index 00000000..c4676e3f --- /dev/null +++ b/tractor/devx/__init__.py @@ -0,0 +1,37 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Runtime "developer experience" utils and addons to aid our +(advanced) users and core devs in building distributed applications +and working with/on the actor runtime. + +""" +from ._debug import ( + maybe_wait_for_debugger as maybe_wait_for_debugger, + acquire_debug_lock as acquire_debug_lock, + breakpoint as breakpoint, + pause as pause, + pause_from_sync as pause_from_sync, + shield_sigint_handler as shield_sigint_handler, + MultiActorPdb as MultiActorPdb, + open_crash_handler as open_crash_handler, + maybe_open_crash_handler as maybe_open_crash_handler, + post_mortem as post_mortem, +) +from ._stackscope import ( + enable_stack_on_sig as enable_stack_on_sig, +) diff --git a/tractor/_debug.py b/tractor/devx/_debug.py similarity index 58% rename from tractor/_debug.py rename to tractor/devx/_debug.py index b0482f18..3203af1b 100644 --- a/tractor/_debug.py +++ b/tractor/devx/_debug.py @@ -1,18 +1,19 @@ # tractor: structured concurrent "actors". # Copyright 2018-eternity Tyler Goodlet. -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. 
+# This program is free software: you can redistribute it and/or +# modify it under the terms of the GNU Affero General Public License +# as published by the Free Software Foundation, either version 3 of +# the License, or (at your option) any later version. -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Affero General Public License for more details. -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . +# You should have received a copy of the GNU Affero General Public +# License along with this program. If not, see +# . """ Multi-core debugging for da peeps! @@ -20,17 +21,21 @@ Multi-core debugging for da peeps! """ from __future__ import annotations import bdb -import os -import sys -import signal +from contextlib import ( + asynccontextmanager as acm, + contextmanager as cm, + nullcontext, +) from functools import ( partial, cached_property, ) -from contextlib import asynccontextmanager as acm +import os +import signal +import sys +import traceback from typing import ( Any, - Optional, Callable, AsyncIterator, AsyncGenerator, @@ -40,24 +45,31 @@ from types import FrameType import pdbp import tractor import trio -from trio_typing import TaskStatus +from trio.lowlevel import current_task +from trio_typing import ( + TaskStatus, + # Task, +) -from .log import get_logger -from ._discovery import get_root -from ._state import ( +from ..log import get_logger +from .._state import ( + current_actor, is_root_process, debug_mode, ) -from ._exceptions import ( +from .._exceptions import ( is_multi_cancelled, ContextCancelled, ) -from ._ipc import Channel +from .._ipc import Channel log = get_logger(__name__) -__all__ = ['breakpoint', 'post_mortem'] +__all__ = [ + 'breakpoint', + 'post_mortem', +] class Lock: @@ -69,10 +81,10 @@ class Lock: ''' repl: MultiActorPdb | None = None # placeholder for function to set a ``trio.Event`` on debugger exit - # pdb_release_hook: Optional[Callable] = None + # pdb_release_hook: Callable | None = None _trio_handler: Callable[ - [int, Optional[FrameType]], Any + [int, FrameType | None], Any ] | int | None = None # actor-wide variable pointing to current task name using debugger @@ -83,23 +95,23 @@ class Lock: # and must be cancelled if this actor is cancelled via IPC # request-message otherwise deadlocks with the parent actor may # ensure - _debugger_request_cs: Optional[trio.CancelScope] = None + _debugger_request_cs: trio.CancelScope|None = None # NOTE: set only in the root actor for the **local** root spawned task # which has acquired the lock (i.e. this is on the callee side of # the `lock_tty_for_child()` context entry). 
- _root_local_task_cs_in_debug: Optional[trio.CancelScope] = None + _root_local_task_cs_in_debug: trio.CancelScope|None = None # actor tree-wide actor uid that supposedly has the tty lock - global_actor_in_debug: Optional[tuple[str, str]] = None + global_actor_in_debug: tuple[str, str] = None - local_pdb_complete: Optional[trio.Event] = None - no_remote_has_tty: Optional[trio.Event] = None + local_pdb_complete: trio.Event | None = None + no_remote_has_tty: trio.Event | None = None # lock in root actor preventing multi-access to local tty _debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() - _orig_sigint_handler: Optional[Callable] = None + _orig_sigint_handler: Callable | None = None _blocked: set[tuple[str, str]] = set() @classmethod @@ -110,6 +122,7 @@ class Lock: ) @classmethod + @pdbp.hideframe # XXX NOTE XXX see below in `.pause_from_sync()` def unshield_sigint(cls): # always restore ``trio``'s sigint handler. see notes below in # the pdb factory about the nightmare that is that code swapping @@ -129,10 +142,6 @@ class Lock: if owner: raise - # actor-local state, irrelevant for non-root. - cls.global_actor_in_debug = None - cls.local_task_in_debug = None - try: # sometimes the ``trio`` might already be terminated in # which case this call will raise. @@ -143,6 +152,11 @@ class Lock: cls.unshield_sigint() cls.repl = None + # actor-local state, irrelevant for non-root. + cls.global_actor_in_debug = None + cls.local_task_in_debug = None + + class TractorConfig(pdbp.DefaultConfig): ''' @@ -151,7 +165,7 @@ class TractorConfig(pdbp.DefaultConfig): ''' use_pygments: bool = True sticky_by_default: bool = False - enable_hidden_frames: bool = False + enable_hidden_frames: bool = True # much thanks @mdmintz for the hot tip! # fixes line spacing issue when resizing terminal B) @@ -228,26 +242,23 @@ async def _acquire_debug_lock_from_root_task( to the ``pdb`` repl. ''' - task_name = trio.lowlevel.current_task().name + task_name: str = current_task().name + we_acquired: bool = False log.runtime( f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}" ) - - we_acquired = False - try: log.runtime( f"entering lock checkpoint, remote task: {task_name}:{uid}" ) - we_acquired = True - # NOTE: if the surrounding cancel scope from the # `lock_tty_for_child()` caller is cancelled, this line should # unblock and NOT leave us in some kind of # a "child-locked-TTY-but-child-is-uncontactable-over-IPC" # condition. await Lock._debug_lock.acquire() + we_acquired = True if Lock.no_remote_has_tty is None: # mark the tty lock as being in use so that the runtime @@ -316,14 +327,13 @@ async def lock_tty_for_child( highly reliable at releasing the mutex complete! ''' - task_name = trio.lowlevel.current_task().name - + task_name: str = current_task().name if tuple(subactor_uid) in Lock._blocked: log.warning( f'Actor {subactor_uid} is blocked from acquiring debug lock\n' f"remote task: {task_name}:{subactor_uid}" ) - ctx._enter_debugger_on_cancel = False + ctx._enter_debugger_on_cancel: bool = False await ctx.cancel(f'Debug lock blocked for {subactor_uid}') return 'pdb_lock_blocked' @@ -374,12 +384,14 @@ async def wait_for_parent_stdin_hijack( This function is used by any sub-actor to acquire mutex access to the ``pdb`` REPL and thus the root's TTY for interactive debugging - (see below inside ``_breakpoint()``). It can be used to ensure that + (see below inside ``pause()``). 
It can be used to ensure that an intermediate nursery-owning actor does not clobber its children if they are in debug (see below inside ``maybe_wait_for_debugger()``). ''' + from .._discovery import get_root + with trio.CancelScope(shield=True) as cs: Lock._debugger_request_cs = cs @@ -389,7 +401,7 @@ async def wait_for_parent_stdin_hijack( # this syncs to child's ``Context.started()`` call. async with portal.open_context( - tractor._debug.lock_tty_for_child, + lock_tty_for_child, subactor_uid=actor_uid, ) as (ctx, val): @@ -398,11 +410,13 @@ async def wait_for_parent_stdin_hijack( assert val == 'Locked' async with ctx.open_stream() as stream: - # unblock local caller - try: + # unblock local caller assert Lock.local_pdb_complete task_status.started(cs) + + # wait for local task to exit and + # release the REPL await Lock.local_pdb_complete.wait() finally: @@ -440,30 +454,261 @@ def mk_mpdb() -> tuple[MultiActorPdb, Callable]: return pdb, Lock.unshield_sigint -async def _breakpoint( - - debug_func, - - # TODO: - # shield: bool = False +def shield_sigint_handler( + signum: int, + frame: 'frame', # type: ignore # noqa + # pdb_obj: MultiActorPdb | None = None, + *args, ) -> None: ''' - Breakpoint entry for engaging debugger instance sync-interaction, - from async code, executing in actor runtime (task). + Specialized, debugger-aware SIGINT handler. + + In childred we always ignore to avoid deadlocks since cancellation + should always be managed by the parent supervising actor. The root + is always cancelled on ctrl-c. ''' __tracebackhide__ = True - actor = tractor.current_actor() - pdb, undo_sigint = mk_mpdb() - task_name = trio.lowlevel.current_task().name - # TODO: is it possible to debug a trio.Cancelled except block? - # right now it seems like we can kinda do with by shielding - # around ``tractor.breakpoint()`` but not if we move the shielded - # scope here??? - # with trio.CancelScope(shield=shield): - # await trio.lowlevel.checkpoint() + uid_in_debug: tuple[str, str] | None = Lock.global_actor_in_debug + + actor = current_actor() + # print(f'{actor.uid} in HANDLER with ') + + def do_cancel(): + # If we haven't tried to cancel the runtime then do that instead + # of raising a KBI (which may non-gracefully destroy + # a ``trio.run()``). + if not actor._cancel_called: + actor.cancel_soon() + + # If the runtime is already cancelled it likely means the user + # hit ctrl-c again because teardown didn't full take place in + # which case we do the "hard" raising of a local KBI. + else: + raise KeyboardInterrupt + + any_connected: bool = False + + if uid_in_debug is not None: + # try to see if the supposed (sub)actor in debug still + # has an active connection to *this* actor, and if not + # it's likely they aren't using the TTY lock / debugger + # and we should propagate SIGINT normally. + chans: list[tractor.Channel] = actor._peers.get(tuple(uid_in_debug)) + if chans: + any_connected = any(chan.connected() for chan in chans) + if not any_connected: + log.warning( + 'A global actor reported to be in debug ' + 'but no connection exists for this child:\n' + f'{uid_in_debug}\n' + 'Allowing SIGINT propagation..' + ) + return do_cancel() + + # only set in the actor actually running the REPL + pdb_obj: MultiActorPdb | None = Lock.repl + + # root actor branch that reports whether or not a child + # has locked debugger. + if ( + is_root_process() + and uid_in_debug is not None + + # XXX: only if there is an existing connection to the + # (sub-)actor in debug do we ignore SIGINT in this + # parent! 
Otherwise we may hang waiting for an actor
+        # which has already terminated to unlock.
+        and any_connected
+    ):
+        # we are root and some actor is in debug mode
+        # if uid_in_debug is not None:
+
+        if pdb_obj:
+            name = uid_in_debug[0]
+            if name != 'root':
+                log.pdb(
+                    f"Ignoring SIGINT, child in debug mode: `{uid_in_debug}`"
+                )
+
+            else:
+                log.pdb(
+                    "Ignoring SIGINT while in debug mode"
+                )
+    elif (
+        is_root_process()
+    ):
+        if pdb_obj:
+            log.pdb(
+                "Ignoring SIGINT since debug mode is enabled"
+            )
+
+        if (
+            Lock._root_local_task_cs_in_debug
+            and not Lock._root_local_task_cs_in_debug.cancel_called
+        ):
+            Lock._root_local_task_cs_in_debug.cancel()
+
+        # revert back to ``trio`` handler asap!
+        Lock.unshield_sigint()
+
+    # child actor that has locked the debugger
+    elif not is_root_process():
+
+        chan: Channel = actor._parent_chan
+        if not chan or not chan.connected():
+            log.warning(
+                'A global actor reported to be in debug '
+                'but no connection exists for its parent:\n'
+                f'{uid_in_debug}\n'
+                'Allowing SIGINT propagation..'
+            )
+            return do_cancel()
+
+        task: str | None = Lock.local_task_in_debug
+        if (
+            task
+            and pdb_obj
+        ):
+            log.pdb(
+                f"Ignoring SIGINT while task in debug mode: `{task}`"
+            )
+
+        # TODO: how to handle the case of an intermediary-child actor
+        # that **is not** marked in debug mode? See outstanding issue:
+        # https://github.com/goodboy/tractor/issues/320
+        # elif debug_mode():
+
+    else:  # XXX: shouldn't ever get here?
+        raise RuntimeError("WTFWTFWTF")
+        # raise KeyboardInterrupt("WTFWTFWTF")
+
+    # NOTE: currently (at least on ``fancycompleter`` 0.9.2)
+    # it looks to be that the last command that was run (eg. ll)
+    # will be repeated by default.
+
+    # maybe redraw/print last REPL output to console since
+    # we want to alert the user that more input is expected since
+    # nothing has been done due to ignoring sigint.
+    if (
+        pdb_obj  # only when this actor has a REPL engaged
+    ):
+        # XXX: yah, mega hack, but how else do we catch this madness XD
+        if pdb_obj.shname == 'xonsh':
+            pdb_obj.stdout.write(pdb_obj.prompt)
+
+        pdb_obj.stdout.flush()
+
+        # TODO: make this work like sticky mode where if there is output
+        # detected as written to the tty we redraw this part underneath
+        # and erase the past draw of this same bit above?
+        # pdb_obj.sticky = True
+        # pdb_obj._print_if_sticky()
+
+        # also see these links for an approach from ``ptk``:
+        # https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
+        # https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
+
+
+_pause_msg: str = 'Attaching to pdb REPL in actor'
+
+
+def _set_trace(
+    actor: tractor.Actor | None = None,
+    pdb: MultiActorPdb | None = None,
+    shield: bool = False,
+
+    extra_frames_up_when_async: int = 1,
+):
+    __tracebackhide__: bool = True
+    actor: tractor.Actor = actor or current_actor()
+
+    # always start 1 level up from THIS in user code.
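+    # (frame-walk sketch: `sys._getframe()` returns *this* func's
+    # frame, so the single `.f_back` hop just below lands in the
+    # caller's frame; the async `pause()` entry path wraps in more
+    # calls, hence the additional `extra_frames_up_when_async` hops
+    # applied further down.)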
+ frame: FrameType|None + if frame := sys._getframe(): + frame: FrameType = frame.f_back # type: ignore + + if ( + frame + and ( + pdb + and actor is not None + ) + # or shield + ): + msg: str = _pause_msg + if shield: + # log.warning( + msg = ( + '\n\n' + ' ------ - ------\n' + 'Debugger invoked with `shield=True` so an extra\n' + '`trio.CancelScope.__exit__()` frame is shown..\n' + '\n' + 'Try going up one frame to see your pause point!\n' + '\n' + ' SORRY we need to fix this!\n' + ' ------ - ------\n\n' + ) + msg + + # pdbp.set_trace() + # TODO: maybe print the actor supervion tree up to the + # root here? Bo + log.pdb( + f'{msg}\n' + '|\n' + f'|_ {actor.uid}\n' + ) + # no f!#$&* idea, but when we're in async land + # we need 2x frames up? + for i in range(extra_frames_up_when_async): + frame: FrameType = frame.f_back + log.debug( + f'Going up frame {i} -> {frame}\n' + ) + + else: + pdb, undo_sigint = mk_mpdb() + + # we entered the global ``breakpoint()`` built-in from sync + # code? + Lock.local_task_in_debug = 'sync' + + pdb.set_trace(frame=frame) + + +async def _pause( + + debug_func: Callable = _set_trace, + release_lock_signal: trio.Event | None = None, + + # TODO: allow caller to pause despite task cancellation, + # exactly the same as wrapping with: + # with CancelScope(shield=True): + # await pause() + # => the REMAINING ISSUE is that the scope's .__exit__() frame + # is always show in the debugger on entry.. and there seems to + # be no way to override it?.. + # shield: bool = False, + + shield: bool = False, + task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED + +) -> None: + ''' + Inner impl for `pause()` to avoid the `trio.CancelScope.__exit__()` + stack frame when not shielded (since apparently i can't figure out + how to hide it using the normal mechanisms..) + + Hopefully we won't need this in the long run. + + ''' + __tracebackhide__: bool = True + actor = current_actor() + pdb, undo_sigint = mk_mpdb() + task_name: str = trio.lowlevel.current_task().name if ( not Lock.local_pdb_complete @@ -471,6 +716,10 @@ async def _breakpoint( ): Lock.local_pdb_complete = trio.Event() + debug_func = partial( + debug_func, + ) + # TODO: need a more robust check for the "root" actor if ( not is_root_process() @@ -559,229 +808,221 @@ async def _breakpoint( Lock.repl = pdb try: - # block here one (at the appropriate frame *up*) where - # ``breakpoint()`` was awaited and begin handling stdio. - log.debug("Entering the synchronous world of pdb") - debug_func(actor, pdb) + # TODO: do we want to support using this **just** for the + # locking / common code (prolly to help address #320)? + # + # if debug_func is None: + # assert release_lock_signal, ( + # 'Must pass `release_lock_signal: trio.Event` if no ' + # 'trace func provided!' + # ) + # print(f"{actor.uid} ENTERING WAIT") + # with trio.CancelScope(shield=True): + # await release_lock_signal.wait() + + # else: + # block here one (at the appropriate frame *up*) where + # ``breakpoint()`` was awaited and begin handling stdio. + log.debug('Entering sync world of the `pdb` REPL..') + try: + debug_func( + actor, + pdb, + extra_frames_up_when_async=2, + shield=shield, + ) + except BaseException: + log.exception( + 'Failed to invoke internal `debug_func = ' + f'{debug_func.func.__name__}`\n' + ) + raise except bdb.BdbQuit: Lock.release() raise - # XXX: apparently we can't do this without showing this frame - # in the backtrace on first entry to the REPL? Seems like an odd - # behaviour that should have been fixed by now. 
This is also why - # we scrapped all the @cm approaches that were tried previously. - # finally: - # __tracebackhide__ = True - # # frame = sys._getframe() - # # last_f = frame.f_back - # # last_f.f_globals['__tracebackhide__'] = True - # # signal.signal = pdbp.hideframe(signal.signal) + except BaseException: + log.exception( + 'Failed to engage debugger via `_pause()` ??\n' + ) + raise + +# XXX: apparently we can't do this without showing this frame +# in the backtrace on first entry to the REPL? Seems like an odd +# behaviour that should have been fixed by now. This is also why +# we scrapped all the @cm approaches that were tried previously. +# finally: +# __tracebackhide__ = True +# # frame = sys._getframe() +# # last_f = frame.f_back +# # last_f.f_globals['__tracebackhide__'] = True +# # signal.signal = pdbp.hideframe(signal.signal) -def shield_sigint_handler( - signum: int, - frame: 'frame', # type: ignore # noqa - # pdb_obj: Optional[MultiActorPdb] = None, - *args, +async def pause( + + debug_func: Callable = _set_trace, + release_lock_signal: trio.Event | None = None, + + # TODO: allow caller to pause despite task cancellation, + # exactly the same as wrapping with: + # with CancelScope(shield=True): + # await pause() + # => the REMAINING ISSUE is that the scope's .__exit__() frame + # is always show in the debugger on entry.. and there seems to + # be no way to override it?.. + # shield: bool = False, + + shield: bool = False, + task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED ) -> None: ''' - Specialized, debugger-aware SIGINT handler. + A pause point (more commonly known as a "breakpoint") interrupt + instruction for engaging a blocking debugger instance to + conduct manual console-based-REPL-interaction from within + `tractor`'s async runtime, normally from some single-threaded + and currently executing actor-hosted-`trio`-task in some + (remote) process. - In childred we always ignore to avoid deadlocks since cancellation - should always be managed by the parent supervising actor. The root - is always cancelled on ctrl-c. + NOTE: we use the semantics "pause" since it better encompasses + the entirety of the necessary global-runtime-state-mutation any + actor-task must access and lock in order to get full isolated + control over the process tree's root TTY: + https://en.wikipedia.org/wiki/Breakpoint ''' - __tracebackhide__ = True + __tracebackhide__: bool = True - uid_in_debug = Lock.global_actor_in_debug + if shield: + # NOTE XXX: even hard coding this inside the `class CancelScope:` + # doesn't seem to work for me!? + # ^ XXX ^ - actor = tractor.current_actor() - # print(f'{actor.uid} in HANDLER with ') + # def _exit(self, *args, **kwargs): + # __tracebackhide__: bool = True + # super().__exit__(*args, **kwargs) - def do_cancel(): - # If we haven't tried to cancel the runtime then do that instead - # of raising a KBI (which may non-gracefully destroy - # a ``trio.run()``). - if not actor._cancel_called: - actor.cancel_soon() + trio.CancelScope.__enter__.__tracebackhide__ = True + trio.CancelScope.__exit__.__tracebackhide__ = True - # If the runtime is already cancelled it likely means the user - # hit ctrl-c again because teardown didn't full take place in - # which case we do the "hard" raising of a local KBI. 
- else: - raise KeyboardInterrupt + # import types + # with trio.CancelScope(shield=shield) as cs: + # cs.__exit__ = types.MethodType(_exit, cs) + # cs.__exit__.__tracebackhide__ = True - any_connected = False + with trio.CancelScope(shield=shield) as cs: + # setattr(cs.__exit__.__func__, '__tracebackhide__', True) + # setattr(cs.__enter__.__func__, '__tracebackhide__', True) - if uid_in_debug is not None: - # try to see if the supposed (sub)actor in debug still - # has an active connection to *this* actor, and if not - # it's likely they aren't using the TTY lock / debugger - # and we should propagate SIGINT normally. - chans = actor._peers.get(tuple(uid_in_debug)) - if chans: - any_connected = any(chan.connected() for chan in chans) - if not any_connected: - log.warning( - 'A global actor reported to be in debug ' - 'but no connection exists for this child:\n' - f'{uid_in_debug}\n' - 'Allowing SIGINT propagation..' - ) - return do_cancel() - - # only set in the actor actually running the REPL - pdb_obj = Lock.repl - - # root actor branch that reports whether or not a child - # has locked debugger. - if ( - is_root_process() - and uid_in_debug is not None - - # XXX: only if there is an existing connection to the - # (sub-)actor in debug do we ignore SIGINT in this - # parent! Otherwise we may hang waiting for an actor - # which has already terminated to unlock. - and any_connected - ): - # we are root and some actor is in debug mode - # if uid_in_debug is not None: - - if pdb_obj: - name = uid_in_debug[0] - if name != 'root': - log.pdb( - f"Ignoring SIGINT, child in debug mode: `{uid_in_debug}`" - ) - - else: - log.pdb( - "Ignoring SIGINT while in debug mode" - ) - elif ( - is_root_process() - ): - if pdb_obj: - log.pdb( - "Ignoring SIGINT since debug mode is enabled" + # NOTE: so the caller can always cancel even if shielded + task_status.started(cs) + return await _pause( + debug_func=debug_func, + release_lock_signal=release_lock_signal, + shield=True, + task_status=task_status, ) - - if ( - Lock._root_local_task_cs_in_debug - and not Lock._root_local_task_cs_in_debug.cancel_called - ): - Lock._root_local_task_cs_in_debug.cancel() - - # revert back to ``trio`` handler asap! - Lock.unshield_sigint() - - # child actor that has locked the debugger - elif not is_root_process(): - - chan: Channel = actor._parent_chan - if not chan or not chan.connected(): - log.warning( - 'A global actor reported to be in debug ' - 'but no connection exists for its parent:\n' - f'{uid_in_debug}\n' - 'Allowing SIGINT propagation..' - ) - return do_cancel() - - task = Lock.local_task_in_debug - if ( - task - and pdb_obj - ): - log.pdb( - f"Ignoring SIGINT while task in debug mode: `{task}`" - ) - - # TODO: how to handle the case of an intermediary-child actor - # that **is not** marked in debug mode? See oustanding issue: - # https://github.com/goodboy/tractor/issues/320 - # elif debug_mode(): - - else: # XXX: shouldn't ever get here? - print("WTFWTFWTF") - raise KeyboardInterrupt - - # NOTE: currently (at least on ``fancycompleter`` 0.9.2) - # it looks to be that the last command that was run (eg. ll) - # will be repeated by default. - - # maybe redraw/print last REPL output to console since - # we want to alert the user that more input is expect since - # nothing has been done dur to ignoring sigint. 
- if ( - pdb_obj # only when this actor has a REPL engaged - ): - # XXX: yah, mega hack, but how else do we catch this madness XD - if pdb_obj.shname == 'xonsh': - pdb_obj.stdout.write(pdb_obj.prompt) - - pdb_obj.stdout.flush() - - # TODO: make this work like sticky mode where if there is output - # detected as written to the tty we redraw this part underneath - # and erase the past draw of this same bit above? - # pdb_obj.sticky = True - # pdb_obj._print_if_sticky() - - # also see these links for an approach from ``ptk``: - # https://github.com/goodboy/tractor/issues/130#issuecomment-663752040 - # https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py - - # XXX LEGACY: lol, see ``pdbpp`` issue: - # https://github.com/pdbpp/pdbpp/issues/496 - - -def _set_trace( - actor: tractor.Actor | None = None, - pdb: MultiActorPdb | None = None, -): - __tracebackhide__ = True - actor = actor or tractor.current_actor() - - # start 2 levels up in user code - frame: Optional[FrameType] = sys._getframe() - if frame: - frame = frame.f_back # type: ignore - - if ( - frame - and pdb - and actor is not None - ): - log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n") - # no f!#$&* idea, but when we're in async land - # we need 2x frames up? - frame = frame.f_back - else: - pdb, undo_sigint = mk_mpdb() - - # we entered the global ``breakpoint()`` built-in from sync - # code? - Lock.local_task_in_debug = 'sync' - - pdb.set_trace(frame=frame) + return await _pause( + debug_func=debug_func, + release_lock_signal=release_lock_signal, + shield=False, + task_status=task_status, + ) -breakpoint = partial( - _breakpoint, - _set_trace, + + +# TODO: allow pausing from sync code. +# normally by remapping python's builtin breakpoint() hook to this +# runtime aware version which takes care of all . +def pause_from_sync() -> None: + print("ENTER SYNC PAUSE") + actor: tractor.Actor = current_actor( + err_on_no_runtime=False, + ) + if actor: + try: + import greenback + # __tracebackhide__ = True + + + # task_can_release_tty_lock = trio.Event() + + # spawn bg task which will lock out the TTY, we poll + # just below until the release event is reporting that task as + # waiting.. not the most ideal but works for now ;) + greenback.await_( + actor._service_n.start(partial( + pause, + debug_func=None, + # release_lock_signal=task_can_release_tty_lock, + )) + ) + + except ModuleNotFoundError: + log.warning('NO GREENBACK FOUND') + else: + log.warning('Not inside actor-runtime') + + db, undo_sigint = mk_mpdb() + Lock.local_task_in_debug = 'sync' + # db.config.enable_hidden_frames = True + + # we entered the global ``breakpoint()`` built-in from sync + # code? + frame: FrameType | None = sys._getframe() + # print(f'FRAME: {str(frame)}') + # assert not db._is_hidden(frame) + + frame: FrameType = frame.f_back # type: ignore + # print(f'FRAME: {str(frame)}') + # if not db._is_hidden(frame): + # pdbp.set_trace() + # db._hidden_frames.append( + # (frame, frame.f_lineno) + # ) + db.set_trace(frame=frame) + # NOTE XXX: see the `@pdbp.hideframe` decoration + # on `Lock.unshield_sigint()`.. I have NO CLUE why + # the next instruction's def frame is being shown + # in the tb but it seems to be something wonky with + # the way `pdb` core works? 
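+    # (usage sketch: with the tree running in `debug_mode=True`,
+    # calling `tractor.pause_from_sync()` from non-async code - or
+    # hitting the builtin `breakpoint()` once `PYTHONBREAKPOINT` is
+    # remapped as done in `._root.open_root_actor()` above - should
+    # land you in this REPL; the `greenback` dep is needed for the
+    # in-runtime case as checked at the top of this func.)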
+ # undo_sigint() + + # Lock.global_actor_in_debug = actor.uid + # Lock.release() + # task_can_release_tty_lock.set() + + +# using the "pause" semantics instead since +# that better covers actually somewhat "pausing the runtime" +# for this particular paralell task to do debugging B) +# pp = pause # short-hand for "pause point" + + +async def breakpoint(**kwargs): + log.warning( + '`tractor.breakpoint()` is deprecated!\n' + 'Please use `tractor.pause()` instead!\n' + ) + await pause(**kwargs) + + +_crash_msg: str = ( + 'Attaching to pdb REPL in crashed actor' ) def _post_mortem( actor: tractor.Actor, pdb: MultiActorPdb, + shield: bool = False, + + # only for compat with `._set_trace()`.. + extra_frames_up_when_async=0, ) -> None: ''' @@ -789,20 +1030,28 @@ def _post_mortem( debugger instance. ''' - log.pdb(f"\nAttaching to pdb in crashed actor: {actor.uid}\n") + # TODO: print the actor supervion tree up to the root + # here! Bo + log.pdb( + f'{_crash_msg}\n' + '|\n' + f'|_ {actor.uid}\n' + ) - # TODO: you need ``pdbpp`` master (at least this commit - # https://github.com/pdbpp/pdbpp/commit/b757794857f98d53e3ebbe70879663d7d843a6c2) - # to fix this and avoid the hang it causes. See issue: - # https://github.com/pdbpp/pdbpp/issues/480 - # TODO: help with a 3.10+ major release if/when it arrives. - - pdbp.xpm(Pdb=lambda: pdb) + # TODO: only replacing this to add the + # `end=''` to the print XD + # pdbp.xpm(Pdb=lambda: pdb) + info = sys.exc_info() + print(traceback.format_exc(), end='') + pdbp.post_mortem( + t=info[2], + Pdb=lambda: pdb, + ) post_mortem = partial( - _breakpoint, - _post_mortem, + pause, + debug_func=_post_mortem, ) @@ -843,9 +1092,10 @@ async def acquire_debug_lock( ''' Grab root's debug lock on entry, release on exit. - This helper is for actor's who don't actually need - to acquired the debugger but want to wait until the - lock is free in the process-tree root. + This helper is for actor's who don't actually need to acquired + the debugger but want to wait until the lock is free in the + process-tree root such that they don't clobber an ongoing pdb + REPL session in some peer or child! ''' if not debug_mode(): @@ -866,14 +1116,18 @@ async def maybe_wait_for_debugger( poll_delay: float = 0.1, child_in_debug: bool = False, -) -> None: + header_msg: str = '', + +) -> bool: # was locked and we polled? if ( not debug_mode() and not child_in_debug ): - return + return False + + msg: str = header_msg if ( is_root_process() ): @@ -883,40 +1137,147 @@ async def maybe_wait_for_debugger( # will make the pdb repl unusable. # Instead try to wait for pdb to be released before # tearing down. + in_debug: tuple[str, str]|None = Lock.global_actor_in_debug + debug_complete: trio.Event|None = Lock.no_remote_has_tty - sub_in_debug = None + if in_debug == current_actor().uid: + log.debug( + msg + + + 'Root already owns the TTY LOCK' + ) + return True - for _ in range(poll_steps): - - if Lock.global_actor_in_debug: - sub_in_debug = tuple(Lock.global_actor_in_debug) - - log.debug('Root polling for debug') - - with trio.CancelScope(shield=True): - await trio.sleep(poll_delay) - - # TODO: could this make things more deterministic? wait - # to see if a sub-actor task will be scheduled and grab - # the tty lock on the next tick? 
- # XXX: doesn't seem to work - # await trio.testing.wait_all_tasks_blocked(cushion=0) - - debug_complete = Lock.no_remote_has_tty - if ( - (debug_complete and - not debug_complete.is_set()) - ): - log.debug( - 'Root has errored but pdb is in use by ' - f'child {sub_in_debug}\n' - 'Waiting on tty lock to release..') - - await debug_complete.wait() - - await trio.sleep(poll_delay) - continue + elif in_debug: + msg += ( + f'Debug `Lock` in use by subactor: {in_debug}\n' + ) + # TODO: could this make things more deterministic? + # wait to see if a sub-actor task will be + # scheduled and grab the tty lock on the next + # tick? + # XXX => but it doesn't seem to work.. + # await trio.testing.wait_all_tasks_blocked(cushion=0) else: log.debug( - 'Root acquired TTY LOCK' + msg + + + 'Root immediately acquired debug TTY LOCK' ) + return False + + for istep in range(poll_steps): + if ( + debug_complete + and not debug_complete.is_set() + and in_debug is not None + ): + log.pdb( + msg + + + 'Root is waiting on tty lock to release..\n' + ) + with trio.CancelScope(shield=True): + await debug_complete.wait() + log.pdb( + f'Child subactor released debug lock\n' + f'|_{in_debug}\n' + ) + + # is no subactor locking debugger currently? + if ( + in_debug is None + and ( + debug_complete is None + or debug_complete.is_set() + ) + ): + log.pdb( + msg + + + 'Root acquired tty lock!' + ) + break + + else: + # TODO: don't need this right? + # await trio.lowlevel.checkpoint() + + log.debug( + 'Root polling for debug:\n' + f'poll step: {istep}\n' + f'poll delya: {poll_delay}' + ) + with trio.CancelScope(shield=True): + await trio.sleep(poll_delay) + continue + + # fallthrough on failure to acquire.. + # else: + # raise RuntimeError( + # msg + # + + # 'Root actor failed to acquire debug lock?' + # ) + return True + + # else: + # # TODO: non-root call for #320? + # this_uid: tuple[str, str] = current_actor().uid + # async with acquire_debug_lock( + # subactor_uid=this_uid, + # ): + # pass + return False + +# TODO: better naming and what additionals? +# - [ ] optional runtime plugging? +# - [ ] detection for sync vs. async code? +# - [ ] specialized REPL entry when in distributed mode? +# - [x] allow ignoring kbi Bo +@cm +def open_crash_handler( + catch: set[BaseException] = { + Exception, + BaseException, + }, + ignore: set[BaseException] = { + KeyboardInterrupt, + }, +): + ''' + Generic "post mortem" crash handler using `pdbp` REPL debugger. + + We expose this as a CLI framework addon to both `click` and + `typer` users so they can quickly wrap cmd endpoints which get + automatically wrapped to use the runtime's `debug_mode: bool` + AND `pdbp.pm()` around any code that is PRE-runtime entry + - any sync code which runs BEFORE the main call to + `trio.run()`. + + ''' + try: + yield + except tuple(catch) as err: + + if type(err) not in ignore: + pdbp.xpm() + + raise + + +@cm +def maybe_open_crash_handler(pdb: bool = False): + ''' + Same as `open_crash_handler()` but with bool input flag + to allow conditional handling. + + Normally this is used with CLI endpoints such that if the --pdb + flag is passed the pdb REPL is engaed on any crashes B) + ''' + rtctx = nullcontext + if pdb: + rtctx = open_crash_handler + + with rtctx(): + yield diff --git a/tractor/devx/_stackscope.py b/tractor/devx/_stackscope.py new file mode 100644 index 00000000..706b85d3 --- /dev/null +++ b/tractor/devx/_stackscope.py @@ -0,0 +1,84 @@ +# tractor: structured concurrent "actors". +# Copyright eternity Tyler Goodlet. 
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+'''
+`stackscope` based tooling for dumping the runtime's `trio` task
+tree on reception of a host OS signal; handy for diagnosing hangs
+and deadlocks in a live actor process.
+
+'''
+from signal import (
+    signal,
+    SIGUSR1,
+)
+
+import trio
+
+
+@trio.lowlevel.disable_ki_protection
+def dump_task_tree() -> None:
+    import stackscope
+    from tractor.log import get_console_log
+
+    tree_str: str = str(
+        stackscope.extract(
+            trio.lowlevel.current_root_task(),
+            recurse_child_tasks=True
+        )
+    )
+    log = get_console_log('cancel')
+    log.pdb(
+        f'Dumping `stackscope` tree:\n\n'
+        f'{tree_str}\n'
+    )
+    # import logging
+    # try:
+    #     with open("/dev/tty", "w") as tty:
+    #         tty.write(tree_str)
+    # except BaseException:
+    #     logging.getLogger(
+    #         "task_tree"
+    #     ).exception("Error printing task tree")
+
+
+def signal_handler(sig: int, frame: object) -> None:
+    import traceback
+    try:
+        trio.lowlevel.current_trio_token(
+        ).run_sync_soon(dump_task_tree)
+    except RuntimeError:
+        # not in async context -- print a normal traceback
+        traceback.print_stack()
+
+
+def enable_stack_on_sig(
+    sig: int = SIGUSR1
+) -> None:
+    '''
+    Enable `stackscope` tracing on reception of a signal; by
+    default this is SIGUSR1.
+
+    '''
+    signal(
+        sig,
+        signal_handler,
+    )
+    # NOTE: the above can be triggered from
+    # a (xonsh) shell using:
+    # kill -SIGUSR1 @$(pgrep -f '')
diff --git a/tractor/devx/cli.py b/tractor/devx/cli.py
new file mode 100644
index 00000000..c44f9686
--- /dev/null
+++ b/tractor/devx/cli.py
@@ -0,0 +1,129 @@
+# tractor: structured concurrent "actors".
+# Copyright 2018-eternity Tyler Goodlet.
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+"""
+CLI framework extensions for hacking on the actor runtime.
+
+Currently popular frameworks supported are:
+
+  - `typer` via the `@callback` API
+
+"""
+from __future__ import annotations
+from typing import (
+    Any,
+    Callable,
+)
+from typing_extensions import Annotated
+
+import typer
+
+
+_runtime_vars: dict[str, Any] = {}
+
+
+def load_runtime_vars(
+    ctx: typer.Context,
+    callback: Callable,
+    pdb: bool = False,  # --pdb
+    ll: Annotated[
+        str,
+        typer.Option(
+            '--loglevel',
+            '-l',
+            help='BigD logging level',
+        ),
+    ] = 'cancel',  # -l info
+):
+    '''
+    Maybe engage crash handling with `pdbp` when code inside
+    a `typer` CLI endpoint cmd raises.
+
+    To use this callback simply take your `app = typer.Typer()` instance
+    and decorate this function with it like so:
+
+    .. code:: python
+
+        from tractor.devx import cli
+
+        app = typer.Typer()
+
+        # manual decoration to hook into `click`'s context system!
+        cli.load_runtime_vars = app.callback(
+            invoke_without_command=True,
+        )(cli.load_runtime_vars)
+
+    You can then use the now-augmented `click` CLI context like so,
+
+    .. code:: python
+
+        @app.command(
+            context_settings={
+                "allow_extra_args": True,
+                "ignore_unknown_options": True,
+            }
+        )
+        def my_cli_cmd(
+            ctx: typer.Context,
+        ):
+            rtvars: dict = ctx.runtime_vars
+            pdb: bool = rtvars['pdb']
+
+            with tractor.devx.cli.maybe_open_crash_handler(pdb=pdb):
+                trio.run(
+                    partial(
+                        my_tractor_main_task_func,
+                        debug_mode=pdb,
+                        loglevel=rtvars['ll'],
+                    )
+                )
+
+    which will enable log level and debug mode globally for the entire
+    `tractor` + `trio` runtime thereafter!
+
+    Bo
+
+    '''
+    global _runtime_vars
+    _runtime_vars |= {
+        'pdb': pdb,
+        'll': ll,
+    }
+
+    ctx.runtime_vars: dict[str, Any] = _runtime_vars
+    print(
+        f'`typer` sub-cmd: {ctx.invoked_subcommand}\n'
+        f'`tractor` runtime vars: {_runtime_vars}'
+    )
+
+    # XXX NOTE XXX: hackzone.. if no sub-cmd is specified (the
+    # default if the user just invokes `bigd`) then we simply
+    # invoke the sole `_bigd()` cmd passing in the "parent"
+    # typer.Context directly to that call since we're treating it
+    # as a "non sub-command" or wtv..
+    # TODO: ideally typer would have some kinda built-in way to get
+    # this behaviour without having to construct and manually
+    # invoke our own cmd..
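+    # (i.e. when no sub-cmd was parsed, wrap `callback` in an
+    # ad-hoc `TyperCommand` and invoke it directly with the
+    # parent `typer.Context` passed through as its sole param.)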
+    if (
+        ctx.invoked_subcommand is None
+        or ctx.invoked_subcommand == callback.__name__
+    ):
+        cmd: typer.core.TyperCommand = typer.core.TyperCommand(
+            name='bigd',
+            callback=callback,
+        )
+        ctx.params = {'ctx': ctx}
+        cmd.invoke(ctx)
diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py
index 174a99d3..7c88edd2 100644
--- a/tractor/to_asyncio.py
+++ b/tractor/to_asyncio.py
@@ -28,7 +28,6 @@ from typing import (
     Callable,
     AsyncIterator,
     Awaitable,
-    Optional,
 )
 
 import trio
@@ -65,9 +64,9 @@ class LinkedTaskChannel(trio.abc.Channel):
     _trio_exited: bool = False
 
     # set after ``asyncio.create_task()``
-    _aio_task: Optional[asyncio.Task] = None
-    _aio_err: Optional[BaseException] = None
-    _broadcaster: Optional[BroadcastReceiver] = None
+    _aio_task: asyncio.Task | None = None
+    _aio_err: BaseException | None = None
+    _broadcaster: BroadcastReceiver | None = None
 
     async def aclose(self) -> None:
         await self._from_aio.aclose()
@@ -188,7 +187,7 @@ def _run_asyncio_task(
 
     cancel_scope = trio.CancelScope()
     aio_task_complete = trio.Event()
-    aio_err: Optional[BaseException] = None
+    aio_err: BaseException | None = None
 
     chan = LinkedTaskChannel(
         aio_q,  # asyncio.Queue
@@ -270,7 +269,7 @@ def _run_asyncio_task(
         '''
         nonlocal chan
         aio_err = chan._aio_err
-        task_err: Optional[BaseException] = None
+        task_err: BaseException | None = None
 
         # only to avoid ``asyncio`` complaining about uncaptured
         # task exceptions
@@ -350,11 +349,11 @@ async def translate_aio_errors(
 
     '''
     trio_task = trio.lowlevel.current_task()
-    aio_err: Optional[BaseException] = None
+    aio_err: BaseException | None = None
 
     # TODO: make this a channel method?
     def maybe_raise_aio_err(
-        err: Optional[Exception] = None
+        err: Exception | None = None
     ) -> None:
         aio_err = chan._aio_err
         if (
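For review context, here's a minimal usage sketch of the `open_crash_handler()` API added above, wrapping sync code that runs before the main `trio.run()` entry; the import path and the `parse_cli_args()` helper are assumptions for illustration, not part of this diff:

.. code:: python

    # sketch only: engage a `pdbp` post-mortem REPL on crashes in
    # sync code which runs BEFORE the runtime is entered.
    from tractor.devx._debug import open_crash_handler  # path assumed

    def parse_cli_args() -> dict:
        # hypothetical stand-in for real pre-runtime parsing code
        raise ValueError('missing required --flag')

    with open_crash_handler():
        # any raised error matching the `catch` set (by default
        # `Exception`/`BaseException`, minus the default-ignored
        # `KeyboardInterrupt`) first drops into `pdbp.xpm()` and
        # is then re-raised on REPL exit.
        args: dict = parse_cli_args()

For conditional engagement (e.g. behind a ``--pdb`` CLI flag) the ``maybe_open_crash_handler(pdb=...)`` variant defined above does the same but no-ops when the flag is unset.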