diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 689e364..3e271ff 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -38,4 +38,4 @@ jobs: - name: Install dependencies run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager - name: Run tests - run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} \ No newline at end of file + run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 484f64d..0000000 --- a/.travis.yml +++ /dev/null @@ -1,68 +0,0 @@ -language: python -dist: xenial -sudo: required - -matrix: - include: - - name: "Windows, Python Latest: multiprocessing" - os: windows - language: sh - python: 3.x # only works on linux - env: SPAWN_BACKEND="mp" - before_install: - - choco install python3 --params "/InstallDir:C:\\Python" - - export PATH="/c/Python:/c/Python/Scripts:$PATH" - - python -m pip install --upgrade pip wheel - - - name: "Windows, Python Latest: trio" - os: windows - language: sh - python: 3.x # only works on linux - env: SPAWN_BACKEND="trio" - before_install: - - choco install python3 --params "/InstallDir:C:\\Python" - - export PATH="/c/Python:/c/Python/Scripts:$PATH" - - python -m pip install --upgrade pip wheel - - - name: "Windows, Python 3.7: multiprocessing" - os: windows - python: 3.7 # only works on linux - env: SPAWN_BACKEND="mp" - language: sh - before_install: - - choco install python3 --version 3.7.4 --params "/InstallDir:C:\\Python" - - export PATH="/c/Python:/c/Python/Scripts:$PATH" - - python -m pip install --upgrade pip wheel - - - name: "Windows, Python 3.7: trio" - os: windows - python: 3.7 # only works on linux - env: SPAWN_BACKEND="trio" - language: sh - before_install: - - choco install python3 --version 3.7.4 --params "/InstallDir:C:\\Python" - - export PATH="/c/Python:/c/Python/Scripts:$PATH" - - python -m pip install --upgrade pip wheel - - - name: "Python 3.7: multiprocessing" - python: 3.7 # this works for Linux but is ignored on macOS or Windows - env: SPAWN_BACKEND="mp" - - name: "Python 3.7: trio" - python: 3.7 # this works for Linux but is ignored on macOS or Windows - env: SPAWN_BACKEND="trio" - - - name: "Python 3.8: multiprocessing" - python: 3.8 # this works for Linux but is ignored on macOS or Windows - env: SPAWN_BACKEND="mp" - - name: "Python 3.8: trio" - python: 3.8 # this works for Linux but is ignored on macOS or Windows - env: SPAWN_BACKEND="trio" - -install: - - cd $TRAVIS_BUILD_DIR - - pip install -U pip - - pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager - -script: - - mypy tractor/ --ignore-missing-imports - - pytest tests/ --spawn-backend=${SPAWN_BACKEND} diff --git a/README.rst b/README.rst index 804c8f2..e2e4dd3 100644 --- a/README.rst +++ b/README.rst @@ -2,7 +2,8 @@ tractor ======= A `structured concurrent`_, async-native "`actor model`_" built on trio_ and multiprocessing_. -|travis| |docs| +|gh_actions| +|docs| .. _actor model: https://en.wikipedia.org/wiki/Actor_model .. _trio: https://github.com/python-trio/trio @@ -57,8 +58,8 @@ channel`_! .. _matrix channel: https://matrix.to/#/!tractor:matrix.org -.. |travis| image:: https://img.shields.io/travis/goodboy/tractor/master.svg - :target: https://travis-ci.org/goodboy/tractor +.. |gh_actions| image:: https://img.shields.io/endpoint.svg?url=https%3A%2F%2Factions-badge.atrox.dev%2Fgoodboy%2Ftractor%2Fbadge&style=popout-square + :target: https://actions-badge.atrox.dev/goodboy/tractor/goto .. |docs| image:: https://readthedocs.org/projects/tractor/badge/?version=latest :target: https://tractor.readthedocs.io/en/latest/?badge=latest :alt: Documentation Status diff --git a/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py b/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py new file mode 100644 index 0000000..488fffa --- /dev/null +++ b/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py @@ -0,0 +1,58 @@ +import tractor + + +async def name_error(): + "Raise a ``NameError``" + getattr(doggypants) + + +async def breakpoint_forever(): + "Indefinitely re-enter debugger in child actor." + while True: + await tractor.breakpoint() + + +async def spawn_until(depth=0): + """"A nested nursery that triggers another ``NameError``. + """ + async with tractor.open_nursery() as n: + if depth < 1: + # await n.run_in_actor('breakpoint_forever', breakpoint_forever) + await n.run_in_actor('name_error', name_error) + else: + depth -= 1 + await n.run_in_actor(f'spawn_until_{depth}', spawn_until, depth=depth) + + +async def main(): + """The main ``tractor`` routine. + + The process tree should look as approximately as follows when the debugger + first engages: + + python examples/debugging/multi_nested_subactors_bp_forever.py + ├─ python -m tractor._child --uid ('spawner1', '7eab8462 ...) + │ └─ python -m tractor._child --uid ('spawn_until_3', 'afcba7a8 ...) + │ └─ python -m tractor._child --uid ('spawn_until_2', 'd2433d13 ...) + │ └─ python -m tractor._child --uid ('spawn_until_1', '1df589de ...) + │ └─ python -m tractor._child --uid ('spawn_until_0', '3720602b ...) + │ + └─ python -m tractor._child --uid ('spawner0', '1d42012b ...) + └─ python -m tractor._child --uid ('spawn_until_2', '2877e155 ...) + └─ python -m tractor._child --uid ('spawn_until_1', '0502d786 ...) + └─ python -m tractor._child --uid ('spawn_until_0', 'de918e6d ...) + + """ + async with tractor.open_nursery() as n: + + # spawn both actors + portal = await n.run_in_actor('spawner0', spawn_until, depth=3) + portal1 = await n.run_in_actor('spawner1', spawn_until, depth=4) + + # gah still an issue here. + # await portal.result() + # await portal1.result() + + +if __name__ == '__main__': + tractor.run(main, debug_mode=True) diff --git a/examples/debugging/multi_subactor_root_errors.py b/examples/debugging/multi_subactor_root_errors.py new file mode 100644 index 0000000..05f0fa7 --- /dev/null +++ b/examples/debugging/multi_subactor_root_errors.py @@ -0,0 +1,43 @@ +import tractor + + +async def name_error(): + "Raise a ``NameError``" + getattr(doggypants) + + +async def spawn_error(): + """"A nested nursery that triggers another ``NameError``. + """ + async with tractor.open_nursery() as n: + portal = await n.run_in_actor('name_error_1', name_error) + return await portal.result() + + +async def main(): + """The main ``tractor`` routine. + + The process tree should look as approximately as follows: + + python examples/debugging/multi_subactors.py + ├─ python -m tractor._child --uid ('name_error', 'a7caf490 ...) + `-python -m tractor._child --uid ('spawn_error', '52ee14a5 ...) + `-python -m tractor._child --uid ('name_error', '3391222c ...) + """ + async with tractor.open_nursery() as n: + + # spawn both actors + portal = await n.run_in_actor('name_error', name_error) + portal1 = await n.run_in_actor('spawn_error', spawn_error) + + # trigger a root actor error + assert 0 + + # attempt to collect results (which raises error in parent) + # still has some issues where the parent seems to get stuck + await portal.result() + await portal1.result() + + +if __name__ == '__main__': + tractor.run(main, debug_mode=True) diff --git a/examples/debugging/multi_subactors.py b/examples/debugging/multi_subactors.py new file mode 100644 index 0000000..16ff22d --- /dev/null +++ b/examples/debugging/multi_subactors.py @@ -0,0 +1,47 @@ +import tractor +import trio + + +async def breakpoint_forever(): + "Indefinitely re-enter debugger in child actor." + while True: + await trio.sleep(0.1) + await tractor.breakpoint() + + +async def name_error(): + "Raise a ``NameError``" + getattr(doggypants) + + +async def spawn_error(): + """"A nested nursery that triggers another ``NameError``. + """ + async with tractor.open_nursery() as n: + portal = await n.run_in_actor('name_error_1', name_error) + return await portal.result() + + +async def main(): + """The main ``tractor`` routine. + + The process tree should look as approximately as follows: + + -python examples/debugging/multi_subactors.py + |-python -m tractor._child --uid ('name_error', 'a7caf490 ...) + |-python -m tractor._child --uid ('bp_forever', '1f787a7e ...) + `-python -m tractor._child --uid ('spawn_error', '52ee14a5 ...) + `-python -m tractor._child --uid ('name_error', '3391222c ...) + """ + async with tractor.open_nursery() as n: + + # Spawn both actors, don't bother with collecting results + # (would result in a different debugger outcome due to parent's + # cancellation). + await n.run_in_actor('bp_forever', breakpoint_forever) + await n.run_in_actor('name_error', name_error) + await n.run_in_actor('spawn_error', spawn_error) + + +if __name__ == '__main__': + tractor.run(main, debug_mode=True) diff --git a/examples/debugging/root_actor_breakpoint.py b/examples/debugging/root_actor_breakpoint.py new file mode 100644 index 0000000..bd4dcb1 --- /dev/null +++ b/examples/debugging/root_actor_breakpoint.py @@ -0,0 +1,15 @@ +import trio +import tractor + + +async def main(): + + await trio.sleep(0.1) + + await tractor.breakpoint() + + await trio.sleep(0.1) + + +if __name__ == '__main__': + tractor.run(main, debug_mode=True) diff --git a/examples/debugging/root_actor_breakpoint_forever.py b/examples/debugging/root_actor_breakpoint_forever.py new file mode 100644 index 0000000..0332ab6 --- /dev/null +++ b/examples/debugging/root_actor_breakpoint_forever.py @@ -0,0 +1,11 @@ +import tractor + + +async def main(): + + while True: + await tractor.breakpoint() + + +if __name__ == '__main__': + tractor.run(main, debug_mode=True) diff --git a/examples/debugging/root_actor_error.py b/examples/debugging/root_actor_error.py new file mode 100644 index 0000000..7486699 --- /dev/null +++ b/examples/debugging/root_actor_error.py @@ -0,0 +1,9 @@ +import tractor + + +async def main(): + assert 0 + + +if __name__ == '__main__': + tractor.run(main, debug_mode=True) diff --git a/examples/debugging/root_cancelled_but_child_is_in_tty_lock.py b/examples/debugging/root_cancelled_but_child_is_in_tty_lock.py new file mode 100644 index 0000000..04ec767 --- /dev/null +++ b/examples/debugging/root_cancelled_but_child_is_in_tty_lock.py @@ -0,0 +1,48 @@ +import tractor + + +async def name_error(): + "Raise a ``NameError``" + getattr(doggypants) + + +async def spawn_until(depth=0): + """"A nested nursery that triggers another ``NameError``. + """ + async with tractor.open_nursery() as n: + if depth < 1: + # await n.run_in_actor('breakpoint_forever', breakpoint_forever) + await n.run_in_actor('name_error', name_error) + else: + depth -= 1 + await n.run_in_actor(f'spawn_until_{depth}', spawn_until, depth=depth) + + +async def main(): + """The main ``tractor`` routine. + + The process tree should look as approximately as follows when the debugger + first engages: + + python examples/debugging/multi_nested_subactors_bp_forever.py + ├─ python -m tractor._child --uid ('spawner1', '7eab8462 ...) + │ └─ python -m tractor._child --uid ('spawn_until_0', '3720602b ...) + │ └─ python -m tractor._child --uid ('name_error', '505bf71d ...) + │ + └─ python -m tractor._child --uid ('spawner0', '1d42012b ...) + └─ python -m tractor._child --uid ('name_error', '6c2733b8 ...) + + """ + async with tractor.open_nursery() as n: + + # spawn both actors + portal = await n.run_in_actor('spawner0', spawn_until, depth=0) + portal1 = await n.run_in_actor('spawner1', spawn_until, depth=1) + + # nursery cancellation should be triggered due to propagated error + await portal.result() + await portal1.result() + + +if __name__ == '__main__': + tractor.run(main, debug_mode=True, loglevel='warning') diff --git a/examples/debugging/subactor_breakpoint.py b/examples/debugging/subactor_breakpoint.py new file mode 100644 index 0000000..35db6cf --- /dev/null +++ b/examples/debugging/subactor_breakpoint.py @@ -0,0 +1,25 @@ +import trio +import tractor + + +async def breakpoint_forever(): + """Indefinitely re-enter debugger in child actor. + """ + while True: + await trio.sleep(0.1) + await tractor.breakpoint() + + +async def main(): + + async with tractor.open_nursery() as n: + + portal = await n.run_in_actor( + 'breakpoint_forever', + breakpoint_forever, + ) + await portal.result() + + +if __name__ == '__main__': + tractor.run(main, debug_mode=True) diff --git a/examples/debugging/subactor_error.py b/examples/debugging/subactor_error.py new file mode 100644 index 0000000..67ec1b6 --- /dev/null +++ b/examples/debugging/subactor_error.py @@ -0,0 +1,16 @@ +import tractor + + +async def name_error(): + getattr(doggypants) + + +async def main(): + async with tractor.open_nursery() as n: + + portal = await n.run_in_actor('name_error', name_error) + await portal.result() + + +if __name__ == '__main__': + tractor.run(main, debug_mode=True) diff --git a/requirements-test.txt b/requirements-test.txt index e9dae32..2e2d1b5 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -3,3 +3,4 @@ pytest-trio pdbpp mypy trio_typing +pexpect diff --git a/setup.py b/setup.py index 24d4479..e31e359 100755 --- a/setup.py +++ b/setup.py @@ -39,7 +39,7 @@ setup( ], install_requires=[ 'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt', - 'trio_typing' + 'trio_typing', 'pdbpp', ], tests_require=['pytest'], python_requires=">=3.7", diff --git a/tests/conftest.py b/tests/conftest.py index 6a9f938..347cc7d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -39,6 +39,16 @@ no_windows = pytest.mark.skipif( ) +def repodir(): + """Return the abspath to the repo directory. + """ + dirname = os.path.dirname + dirpath = os.path.abspath( + dirname(dirname(os.path.realpath(__file__))) + ) + return dirpath + + def pytest_addoption(parser): parser.addoption( "--ll", action="store", dest='loglevel', diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index e6d460c..9e3c9f9 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -413,3 +413,38 @@ def test_cancel_via_SIGINT_other_task( with pytest.raises(KeyboardInterrupt): tractor.run(main) + + + +async def spin_for(period=3): + "Sync sleep." + time.sleep(period) + + +async def spawn(): + async with tractor.open_nursery() as tn: + portal = await tn.run_in_actor('sleeper', spin_for) + + +@no_windows +def test_cancel_while_childs_child_in_sync_sleep( + loglevel, + start_method, + spawn_backend, +): + """Verify that a child cancelled while executing sync code is torn + down even when that cancellation is triggered by the parent + 2 nurseries "up". + """ + if start_method == 'forkserver': + pytest.skip("Forksever sux hard at resuming from sync sleep...") + + async def main(): + with trio.fail_after(2): + async with tractor.open_nursery() as tn: + portal = await tn.run_in_actor('spawn', spawn) + await trio.sleep(1) + assert 0 + + with pytest.raises(AssertionError): + tractor.run(main) diff --git a/tests/test_debugger.py b/tests/test_debugger.py new file mode 100644 index 0000000..5ea4f71 --- /dev/null +++ b/tests/test_debugger.py @@ -0,0 +1,366 @@ +""" +That native debug better work! + +All these tests can be understood (somewhat) by running the equivalent +`examples/debugging/` scripts manually. + +TODO: None of these tests have been run successfully on windows yet. +""" +from os import path + +import pytest +import pexpect + +from conftest import repodir + + +# TODO: The next great debugger audit could be done by you! +# - recurrent entry to breakpoint() from single actor *after* and an +# error in another task? +# - root error before child errors +# - root error after child errors +# - root error before child breakpoint +# - root error after child breakpoint +# - recurrent root errors + + +def examples_dir(): + """Return the abspath to the examples directory. + """ + return path.join(repodir(), 'examples', 'debugging/') + + +def mk_cmd(ex_name: str) -> str: + """Generate a command suitable to pass to ``pexpect.spawn()``. + """ + return ' '.join( + ['python', + path.join(examples_dir(), f'{ex_name}.py')] + ) + + +@pytest.fixture +def spawn( + testdir, + arb_addr, +) -> 'pexpect.spawn': + + def _spawn(cmd): + return testdir.spawn( + cmd=mk_cmd(cmd), + expect_timeout=3, + ) + + return _spawn + + +@pytest.mark.parametrize( + 'user_in_out', + [ + ('c', 'AssertionError'), + ('q', 'AssertionError'), + ], + ids=lambda item: f'{item[0]} -> {item[1]}', +) +def test_root_actor_error(spawn, user_in_out): + """Demonstrate crash handler entering pdbpp from basic error in root actor. + """ + user_input, expect_err_str = user_in_out + + child = spawn('root_actor_error') + + # scan for the pdbpp prompt + child.expect(r"\(Pdb\+\+\)") + + before = str(child.before.decode()) + + # make sure expected logging and error arrives + assert "Attaching to pdb in crashed actor: ('arbiter'" in before + assert 'AssertionError' in before + + # send user command + child.sendline(user_input) + + # process should exit + child.expect(pexpect.EOF) + assert expect_err_str in str(child.before) + + +@pytest.mark.parametrize( + 'user_in_out', + [ + ('c', None), + ('q', 'bdb.BdbQuit'), + ], + ids=lambda item: f'{item[0]} -> {item[1]}', +) +def test_root_actor_bp(spawn, user_in_out): + """Demonstrate breakpoint from in root actor. + """ + user_input, expect_err_str = user_in_out + child = spawn('root_actor_breakpoint') + + # scan for the pdbpp prompt + child.expect(r"\(Pdb\+\+\)") + + assert 'Error' not in str(child.before) + + # send user command + child.sendline(user_input) + child.expect('\r\n') + + # process should exit + child.expect(pexpect.EOF) + + if expect_err_str is None: + assert 'Error' not in str(child.before) + else: + assert expect_err_str in str(child.before) + + +def test_root_actor_bp_forever(spawn): + "Re-enter a breakpoint from the root actor-task." + child = spawn('root_actor_breakpoint_forever') + + # do some "next" commands to demonstrate recurrent breakpoint + # entries + for _ in range(10): + child.sendline('next') + child.expect(r"\(Pdb\+\+\)") + + # do one continue which should trigger a new task to lock the tty + child.sendline('continue') + child.expect(r"\(Pdb\+\+\)") + + # XXX: this previously caused a bug! + child.sendline('n') + child.expect(r"\(Pdb\+\+\)") + + child.sendline('n') + child.expect(r"\(Pdb\+\+\)") + + +def test_subactor_error(spawn): + "Single subactor raising an error" + + child = spawn('subactor_error') + + # scan for the pdbpp prompt + child.expect(r"\(Pdb\+\+\)") + + before = str(child.before.decode()) + assert "Attaching to pdb in crashed actor: ('name_error'" in before + + # send user command + # (in this case it's the same for 'continue' vs. 'quit') + child.sendline('continue') + + # the debugger should enter a second time in the nursery + # creating actor + + child.expect(r"\(Pdb\+\+\)") + + before = str(child.before.decode()) + + # root actor gets debugger engaged + assert "Attaching to pdb in crashed actor: ('arbiter'" in before + + # error is a remote error propagated from the subactor + assert "RemoteActorError: ('name_error'" in before + + child.sendline('c') + child.expect('\r\n') + + # process should exit + child.expect(pexpect.EOF) + + +def test_subactor_breakpoint(spawn): + "Single subactor with an infinite breakpoint loop" + + child = spawn('subactor_breakpoint') + + # scan for the pdbpp prompt + child.expect(r"\(Pdb\+\+\)") + + before = str(child.before.decode()) + assert "Attaching pdb to actor: ('breakpoint_forever'" in before + + # do some "next" commands to demonstrate recurrent breakpoint + # entries + for _ in range(10): + child.sendline('next') + child.expect(r"\(Pdb\+\+\)") + + # now run some "continues" to show re-entries + for _ in range(5): + child.sendline('continue') + child.expect(r"\(Pdb\+\+\)") + before = str(child.before.decode()) + assert "Attaching pdb to actor: ('breakpoint_forever'" in before + + # finally quit the loop + child.sendline('q') + + # child process should exit but parent will capture pdb.BdbQuit + child.expect(r"\(Pdb\+\+\)") + + before = str(child.before.decode()) + assert "RemoteActorError: ('breakpoint_forever'" in before + assert 'bdb.BdbQuit' in before + + # quit the parent + child.sendline('c') + + # process should exit + child.expect(pexpect.EOF) + + before = str(child.before.decode()) + assert "RemoteActorError: ('breakpoint_forever'" in before + assert 'bdb.BdbQuit' in before + + +def test_multi_subactors(spawn): + """Multiple subactors, both erroring and breakpointing as well as + a nested subactor erroring. + """ + child = spawn(r'multi_subactors') + + # scan for the pdbpp prompt + child.expect(r"\(Pdb\+\+\)") + + before = str(child.before.decode()) + assert "Attaching pdb to actor: ('bp_forever'" in before + + # do some "next" commands to demonstrate recurrent breakpoint + # entries + for _ in range(10): + child.sendline('next') + child.expect(r"\(Pdb\+\+\)") + + # continue to next error + child.sendline('c') + + # first name_error failure + child.expect(r"\(Pdb\+\+\)") + before = str(child.before.decode()) + assert "NameError" in before + + # continue again + child.sendline('c') + + # 2nd name_error failure + child.expect(r"\(Pdb\+\+\)") + before = str(child.before.decode()) + assert "NameError" in before + + # breakpoint loop should re-engage + child.sendline('c') + child.expect(r"\(Pdb\+\+\)") + before = str(child.before.decode()) + assert "Attaching pdb to actor: ('bp_forever'" in before + + # now run some "continues" to show re-entries + for _ in range(5): + child.sendline('c') + child.expect(r"\(Pdb\+\+\)") + + # quit the loop and expect parent to attach + child.sendline('q') + child.expect(r"\(Pdb\+\+\)") + before = str(child.before.decode()) + assert "Attaching to pdb in crashed actor: ('arbiter'" in before + assert "RemoteActorError: ('bp_forever'" in before + assert 'bdb.BdbQuit' in before + + # process should exit + child.sendline('c') + child.expect(pexpect.EOF) + + before = str(child.before.decode()) + assert "RemoteActorError: ('bp_forever'" in before + assert 'bdb.BdbQuit' in before + + +def test_multi_subactors_root_errors(spawn): + """Multiple subactors, both erroring and breakpointing as well as + a nested subactor erroring. + """ + child = spawn('multi_subactor_root_errors') + + # scan for the pdbpp prompt + child.expect(r"\(Pdb\+\+\)") + + # at most one subactor should attach before the root is cancelled + before = str(child.before.decode()) + assert "NameError: name 'doggypants' is not defined" in before + + # continue again + child.sendline('c') + child.expect(r"\(Pdb\+\+\)") + + # should now get attached in root with assert error + before = str(child.before.decode()) + # should have come just after priot prompt + assert "Cancelling nursery in ('spawn_error'," in before + assert "Attaching to pdb in crashed actor: ('arbiter'" in before + assert "AssertionError" in before + + # continue again + child.sendline('c') + child.expect(pexpect.EOF) + + before = str(child.before.decode()) + assert "AssertionError" in before + + +def test_multi_nested_subactors_error_through_nurseries(spawn): + """Verify deeply nested actors that error trigger debugger entries + at each actor nurserly (level) all the way up the tree. + """ + + # NOTE: previously, inside this script was a a bug where if the + # parent errors before a 2-levels-lower actor has released the lock, + # the parent tries to cancel it but it's stuck in the debugger? + # A test (below) has now been added to explicitly verify this is + # fixed. + + child = spawn('multi_nested_subactors_error_up_through_nurseries') + + for _ in range(12): + child.expect(r"\(Pdb\+\+\)") + child.sendline('c') + + child.expect(pexpect.EOF) + + before = str(child.before.decode()) + assert "NameError" in before + + +def test_root_nursery_cancels_before_child_releases_tty_lock(spawn): + """Test that when the root sends a cancel message before a nested + child has unblocked (which can happen when it has the tty lock and + is engaged in pdb) it is indeed cancelled after exiting the debugger. + """ + child = spawn('root_cancelled_but_child_is_in_tty_lock') + + child.expect(r"\(Pdb\+\+\)") + + before = str(child.before.decode()) + assert "NameError: name 'doggypants' is not defined" in before + assert "tractor._exceptions.RemoteActorError: ('name_error'" not in before + child.sendline('c') + + for _ in range(4): + child.expect(r"\(Pdb\+\+\)") + + before = str(child.before.decode()) + assert "NameError: name 'doggypants' is not defined" in before + + child.sendline('c') + + child.expect(pexpect.EOF) + before = str(child.before.decode()) + assert "tractor._exceptions.RemoteActorError: ('spawner0'" in before + assert "tractor._exceptions.RemoteActorError: ('name_error'" in before + assert "NameError: name 'doggypants' is not defined" in before diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py index c6d5df8..73e6a69 100644 --- a/tests/test_docs_examples.py +++ b/tests/test_docs_examples.py @@ -11,15 +11,7 @@ import shutil import pytest - -def repodir(): - """Return the abspath to the repo directory. - """ - dirname = os.path.dirname - dirpath = os.path.abspath( - dirname(dirname(os.path.realpath(__file__))) - ) - return dirpath +from conftest import repodir def examples_dir(): @@ -85,15 +77,22 @@ def run_example_in_subproc(loglevel, testdir, arb_addr): @pytest.mark.parametrize( 'example_script', - [f for f in os.listdir(examples_dir()) if '__' not in f], + [ + f for f in os.listdir(examples_dir()) + if ( + ('__' not in f) and + ('debugging' not in f) + ) + ], ) def test_example(run_example_in_subproc, example_script): """Load and run scripts from this repo's ``examples/`` dir as a user would copy and pasing them into their editor. - On windows a little more "finessing" is done to make ``multiprocessing`` play nice: - we copy the ``__main__.py`` into the test directory and invoke the script as a module - with ``python -m test_example``. + On windows a little more "finessing" is done to make + ``multiprocessing`` play nice: we copy the ``__main__.py`` into the + test directory and invoke the script as a module with ``python -m + test_example``. """ ex_file = os.path.join(examples_dir(), example_script) with open(ex_file, 'r') as ex: diff --git a/tests/test_rpc.py b/tests/test_rpc.py index b3fa1df..8ac9b09 100644 --- a/tests/test_rpc.py +++ b/tests/test_rpc.py @@ -63,7 +63,7 @@ def test_rpc_errors(arb_addr, to_call, testdir): # module should raise a ModuleNotFoundError at import testdir.makefile('.py', tmp_mod=funcname) - # no need to exposed module to the subactor + # no need to expose module to the subactor subactor_exposed_mods = exposed_mods exposed_mods = [] func_defined = False @@ -95,7 +95,7 @@ def test_rpc_errors(arb_addr, to_call, testdir): tractor.run( main, arbiter_addr=arb_addr, - rpc_module_paths=exposed_mods, + rpc_module_paths=exposed_mods.copy(), ) # handle both parameterized cases diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 17ab9ea..919b278 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -203,9 +203,8 @@ async def cancel_after(wait): @pytest.fixture(scope='module') def time_quad_ex(arb_addr, ci_env, spawn_backend): - if ci_env and spawn_backend == 'mp' and (platform.system() != 'Windows'): - """no idea, but the travis and github actions, mp *nix runs are - flaking out here often + if spawn_backend == 'mp': + """no idea but the mp *nix runs are flaking out here often... """ pytest.skip("Test is too flaky on mp in CI") diff --git a/tractor/__init__.py b/tractor/__init__.py index ea61d88..5ed843f 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -17,12 +17,16 @@ from ._discovery import get_arbiter, find_actor, wait_for_actor from ._actor import Actor, _start_actor, Arbiter from ._trionics import open_nursery from ._state import current_actor +from . import _state from ._exceptions import RemoteActorError, ModuleNotExposed +from ._debug import breakpoint, post_mortem from . import msg from . import _spawn __all__ = [ + 'breakpoint', + 'post_mortem', 'current_actor', 'find_actor', 'get_arbiter', @@ -46,16 +50,34 @@ _default_arbiter_port = 1616 async def _main( async_fn: typing.Callable[..., typing.Awaitable], args: Tuple, - kwargs: typing.Dict[str, typing.Any], arbiter_addr: Tuple[str, int], name: Optional[str] = None, + start_method: Optional[str] = None, + debug_mode: bool = False, + **kwargs, ) -> typing.Any: """Async entry point for ``tractor``. """ logger = log.get_logger('tractor') + + if start_method is not None: + _spawn.try_set_start_method(start_method) + + if debug_mode and _spawn._spawn_method == 'trio': + _state._runtime_vars['_debug_mode'] = True + # expose internal debug module to every actor allowing + # for use of ``await tractor.breakpoint()`` + kwargs.setdefault('rpc_module_paths', []).append('tractor._debug') + elif debug_mode: + raise RuntimeError("Debug mode is only supported for the `trio` backend!") + main = partial(async_fn, *args) + arbiter_addr = (host, port) = arbiter_addr or ( - _default_arbiter_host, _default_arbiter_port) + _default_arbiter_host, + _default_arbiter_port + ) + loglevel = kwargs.get('loglevel', log.get_loglevel()) if loglevel is not None: log._default_loglevel = loglevel @@ -98,20 +120,40 @@ def run( *args, name: Optional[str] = None, arbiter_addr: Tuple[str, int] = ( - _default_arbiter_host, _default_arbiter_port), + _default_arbiter_host, + _default_arbiter_port, + ), # either the `multiprocessing` start method: # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods - # OR `trio_run_in_process` (the new default). + # OR `trio` (the new default). start_method: Optional[str] = None, + debug_mode: bool = False, **kwargs, ) -> Any: """Run a trio-actor async function in process. This is tractor's main entry and the start point for any async actor. """ - if start_method is not None: - _spawn.try_set_start_method(start_method) - return trio.run(_main, async_fn, args, kwargs, arbiter_addr, name) + # mark top most level process as root actor + _state._runtime_vars['_is_root'] = True + + return trio.run( + partial( + # our entry + _main, + + # user entry point + async_fn, + args, + + # global kwargs + arbiter_addr=arbiter_addr, + name=name, + start_method=start_method, + debug_mode=debug_mode, + **kwargs, + ) + ) def run_daemon( diff --git a/tractor/_actor.py b/tractor/_actor.py index d59bd82..f4b9795 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -13,6 +13,7 @@ from typing import Dict, List, Tuple, Any, Optional from types import ModuleType import sys import os +from contextlib import ExitStack import trio # type: ignore from trio_typing import TaskStatus @@ -26,6 +27,7 @@ from ._exceptions import ( unpack_error, ModuleNotExposed ) +from . import _debug from ._discovery import get_arbiter from ._portal import Portal from . import _state @@ -125,8 +127,11 @@ async def _invoke( task_status.started(cs) await chan.send({'return': await coro, 'cid': cid}) except (Exception, trio.MultiError) as err: + # NOTE: don't enter debug mode recursively after quitting pdb + log.exception("Actor crashed:") + await _debug._maybe_enter_pm(err) + # always ship errors back to caller - log.exception("Actor errored:") err_msg = pack_error(err) err_msg['cid'] = cid try: @@ -146,11 +151,11 @@ async def _invoke( # If we're cancelled before the task returns then the # cancel scope will not have been inserted yet log.warn( - f"Task {func} was likely cancelled before it was started") - - if not actor._rpc_tasks: - log.info("All RPC tasks have completed") - actor._ongoing_rpc_tasks.set() + f"Task {func} likely errored or cancelled before it started") + finally: + if not actor._rpc_tasks: + log.info("All RPC tasks have completed") + actor._ongoing_rpc_tasks.set() def _get_mod_abspath(module): @@ -171,13 +176,16 @@ class Actor: _root_n: Optional[trio.Nursery] = None _service_n: Optional[trio.Nursery] = None _server_n: Optional[trio.Nursery] = None + _lifetime_stack: ExitStack = ExitStack() # Information about `__main__` from parent _parent_main_data: Dict[str, str] + _parent_chan_cs: Optional[trio.CancelScope] = None def __init__( self, name: str, + *, rpc_module_paths: List[str] = [], statespace: Optional[Dict[str, Any]] = None, uid: str = None, @@ -191,12 +199,18 @@ class Actor: self.name = name self.uid = (name, uid or str(uuid.uuid4())) + self._cancel_complete = trio.Event() + self._cancel_called: bool = False + # retreive and store parent `__main__` data which # will be passed to children self._parent_main_data = _mp_fixup_main._mp_figure_out_main() + # always include debugging tools module + rpc_module_paths.append('tractor._debug') + mods = {} - for name in rpc_module_paths or (): + for name in rpc_module_paths: mod = importlib.import_module(name) mods[name] = _get_mod_abspath(mod) @@ -237,6 +251,7 @@ class Actor: self._parent_chan: Optional[Channel] = None self._forkserver_info: Optional[ Tuple[Any, Any, Any, Any, Any]] = None + self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # type: ignore async def wait_for_peer( self, uid: Tuple[str, str] @@ -415,7 +430,6 @@ class Actor: async def _process_messages( self, chan: Channel, - treat_as_gen: bool = False, shield: bool = False, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> None: @@ -519,7 +533,10 @@ class Actor: else: # self.cancel() was called so kill this msg loop # and break out into ``_async_main()`` - log.warning(f"{self.uid} was remotely cancelled") + log.warning( + f"{self.uid} was remotely cancelled; " + "waiting on cancellation completion..") + await self._cancel_complete.wait() loop_cs.cancel() break @@ -527,7 +544,10 @@ class Actor: f"Waiting on next msg for {chan} from {chan.uid}") else: # channel disconnect - log.debug(f"{chan} from {chan.uid} disconnected") + log.debug( + f"{chan} for {chan.uid} disconnected, cancelling tasks" + ) + self.cancel_rpc_tasks(chan) except trio.ClosedResourceError: log.error(f"{chan} form {chan.uid} broke") @@ -549,7 +569,7 @@ class Actor: f"Exiting msg loop for {chan} from {chan.uid} " f"with last msg:\n{msg}") - async def _chan_to_parent( + async def _from_parent( self, parent_addr: Optional[Tuple[str, int]], ) -> Tuple[Channel, Optional[Tuple[str, int]]]: @@ -578,6 +598,10 @@ class Actor: parent_data.pop('bind_host'), parent_data.pop('bind_port'), ) + rvs = parent_data.pop('_runtime_vars') + rvs['_is_root'] = False + _state._runtime_vars.update(rvs) + for attr, value in parent_data.items(): setattr(self, attr, value) @@ -615,13 +639,13 @@ class Actor: # establish primary connection with immediate parent self._parent_chan = None if parent_addr is not None: - self._parent_chan, accept_addr_from_rent = await self._chan_to_parent( + self._parent_chan, accept_addr_rent = await self._from_parent( parent_addr) # either it's passed in because we're not a child # or because we're running in mp mode - if accept_addr_from_rent is not None: - accept_addr = accept_addr_from_rent + if accept_addr_rent is not None: + accept_addr = accept_addr_rent # load exposed/allowed RPC modules # XXX: do this **after** establishing a channel to the parent @@ -660,6 +684,9 @@ class Actor: accept_port=port ) ) + accept_addr = self.accept_addr + if _state._runtime_vars['_is_root']: + _state._runtime_vars['_root_mailbox'] = accept_addr # Register with the arbiter if we're told its addr log.debug(f"Registering {self} for role `{self.name}`") @@ -670,7 +697,7 @@ class Actor: 'self', 'register_actor', uid=self.uid, - sockaddr=self.accept_addr, + sockaddr=accept_addr, ) registered_with_arbiter = True @@ -696,7 +723,7 @@ class Actor: # Blocks here as expected until the root nursery is # killed (i.e. this actor is cancelled or signalled by the parent) - except (trio.MultiError, Exception) as err: + except Exception as err: if not registered_with_arbiter: # TODO: I guess we could try to connect back # to the parent through a channel and engage a debugger @@ -728,12 +755,17 @@ class Actor: finally: log.info("Root nursery complete") + # tear down all lifetime contexts + # api idea: ``tractor.open_context()`` + log.warn("Closing all actor lifetime contexts") + self._lifetime_stack.close() + # Unregister actor from the arbiter if registered_with_arbiter and ( self._arb_addr is not None ): failed = False - with trio.move_on_after(5) as cs: + with trio.move_on_after(0.5) as cs: cs.shield = True try: async with get_arbiter(*self._arb_addr) as arb_portal: @@ -807,19 +839,32 @@ class Actor: spawning new rpc tasks - return control the parent channel message loop """ + log.warning(f"{self.uid} is trying to cancel") + self._cancel_called = True + # cancel all ongoing rpc tasks with trio.CancelScope(shield=True): + + # kill any debugger request task to avoid deadlock + # with the root actor in this tree + dbcs = _debug._debugger_request_cs + if dbcs is not None: + log.debug("Cancelling active debugger request") + dbcs.cancel() + # kill all ongoing tasks await self.cancel_rpc_tasks() + # cancel all rpc tasks permanently + if self._service_n: + self._service_n.cancel_scope.cancel() + # stop channel server self.cancel_server() await self._server_down.wait() - # rekt all channel loops - if self._service_n: - self._service_n.cancel_scope.cancel() - + log.warning(f"{self.uid} was sucessfullly cancelled") + self._cancel_complete.set() return True # XXX: hard kill logic if needed? @@ -859,20 +904,31 @@ class Actor: scope.cancel() # wait for _invoke to mark the task complete + log.debug( + f"Waiting on task to cancel:\ncid: {cid}\nfunc: {func}\n" + f"peer: {chan.uid}\n") await is_complete.wait() log.debug( f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") - async def cancel_rpc_tasks(self) -> None: + async def cancel_rpc_tasks( + self, + only_chan: Optional[Channel] = None, + ) -> None: """Cancel all existing RPC responder tasks using the cancel scope registered for each. """ tasks = self._rpc_tasks log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") for (chan, cid) in tasks.copy(): + if only_chan is not None: + if only_chan != chan: + continue + # TODO: this should really done in a nursery batch await self._cancel_task(cid, chan) + log.info( f"Waiting for remaining rpc tasks to complete {tasks}") await self._ongoing_rpc_tasks.wait() @@ -1024,7 +1080,17 @@ async def _start_actor( parent_addr=None ) ) - result = await main() + try: + result = await main() + except (Exception, trio.MultiError) as err: + try: + log.exception("Actor crashed:") + await _debug._maybe_enter_pm(err) + + raise + + finally: + await actor.cancel() # XXX: the actor is cancelled when this context is complete # given that there are no more active peer channels connected diff --git a/tractor/_debug.py b/tractor/_debug.py new file mode 100644 index 0000000..29c0430 --- /dev/null +++ b/tractor/_debug.py @@ -0,0 +1,322 @@ +""" +Multi-core debugging for da peeps! +""" +import bdb +import sys +from functools import partial +from contextlib import asynccontextmanager +from typing import Awaitable, Tuple, Optional, Callable, AsyncIterator +# import signal + +from async_generator import aclosing +import tractor +import trio +from trio.testing import wait_all_tasks_blocked + +from .log import get_logger +from . import _state +from ._discovery import get_root +from ._state import is_root_process + +try: + # wtf: only exported when installed in dev mode? + import pdbpp +except ImportError: + # pdbpp is installed in regular mode...it monkey patches stuff + import pdb + assert pdb.xpm, "pdbpp is not installed?" # type: ignore + pdbpp = pdb + +log = get_logger(__name__) + + +__all__ = ['breakpoint', 'post_mortem'] + + +# placeholder for function to set a ``trio.Event`` on debugger exit +_pdb_release_hook: Optional[Callable] = None + +# actor-wide variable pointing to current task name using debugger +_in_debug = False + +# lock in root actor preventing multi-access to local tty +_debug_lock = trio.StrictFIFOLock() + +# XXX: set by the current task waiting on the root tty lock +# and must be cancelled if this actor is cancelled via message +# otherwise deadlocks with the parent actor may ensure +_debugger_request_cs: Optional[trio.CancelScope] = None + + +class TractorConfig(pdbpp.DefaultConfig): + """Custom ``pdbpp`` goodness. + """ + # sticky_by_default = True + + +class PdbwTeardown(pdbpp.Pdb): + """Add teardown hooks to the regular ``pdbpp.Pdb``. + """ + # override the pdbpp config with our coolio one + DefaultConfig = TractorConfig + + # TODO: figure out how to dissallow recursive .set_trace() entry + # since that'll cause deadlock for us. + def set_continue(self): + global _in_debug + try: + super().set_continue() + finally: + _in_debug = False + _pdb_release_hook() + + def set_quit(self): + global _in_debug + try: + super().set_quit() + finally: + _in_debug = False + _pdb_release_hook() + + +# TODO: will be needed whenever we get to true remote debugging. +# XXX see https://github.com/goodboy/tractor/issues/130 + +# # TODO: is there some way to determine this programatically? +# _pdb_exit_patterns = tuple( +# str.encode(patt + "\n") for patt in ( +# 'c', 'cont', 'continue', 'q', 'quit') +# ) + +# def subactoruid2proc( +# actor: 'Actor', # noqa +# uid: Tuple[str, str] +# ) -> trio.Process: +# n = actor._actoruid2nursery[uid] +# _, proc, _ = n._children[uid] +# return proc + +# async def hijack_stdin(): +# log.info(f"Hijacking stdin from {actor.uid}") + +# trap std in and relay to subproc +# async_stdin = trio.wrap_file(sys.stdin) + +# async with aclosing(async_stdin): +# async for msg in async_stdin: +# log.trace(f"Stdin input:\n{msg}") +# # encode to bytes +# bmsg = str.encode(msg) + +# # relay bytes to subproc over pipe +# # await proc.stdin.send_all(bmsg) + +# if bmsg in _pdb_exit_patterns: +# log.info("Closing stdin hijack") +# break + + +@asynccontextmanager +async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]: + """Acquire a actor local FIFO lock meant to mutex entry to a local + debugger entry point to avoid tty clobbering by multiple processes. + """ + task_name = trio.lowlevel.current_task() + try: + log.error(f"TTY BEING ACQUIRED by {task_name}:{uid}") + await _debug_lock.acquire() + log.error(f"TTY lock acquired by {task_name}:{uid}") + yield + finally: + _debug_lock.release() + log.error(f"TTY lock released by {task_name}:{uid}") + + +def handler(signum, frame): + """Block SIGINT while in debug to avoid deadlocks with cancellation. + """ + print( + "tractor ignores SIGINT while in debug mode\n" + "If you have a special need for it please open an issue.\n" + ) + + +# don't allow those stdlib mofos to mess with sigint handler +pdbpp.pdb.Pdb.sigint_handler = handler + + +# @contextmanager +# def _disable_sigint(): +# try: +# # disable sigint handling while in debug +# prior_handler = signal.signal(signal.SIGINT, handler) +# yield +# finally: +# # restore SIGINT handling +# signal.signal(signal.SIGINT, prior_handler) + + +async def _hijack_stdin_relay_to_child( + subactor_uid: Tuple[str, str] +) -> AsyncIterator[str]: + # TODO: when we get to true remote debugging + # this will deliver stdin data + log.warning(f"Actor {subactor_uid} is WAITING on stdin hijack lock") + async with _acquire_debug_lock(subactor_uid): + log.warning(f"Actor {subactor_uid} ACQUIRED stdin hijack lock") + + # with _disable_sigint(): + + # indicate to child that we've locked stdio + yield 'Locked' + + # wait for cancellation of stream by child + # indicating debugger is dis-engaged + await trio.sleep_forever() + + log.debug(f"Actor {subactor_uid} RELEASED stdin hijack lock") + + +# XXX: We only make this sync in case someone wants to +# overload the ``breakpoint()`` built-in. +def _breakpoint(debug_func) -> Awaitable[None]: + """``tractor`` breakpoint entry for engaging pdb machinery + in subactors. + """ + actor = tractor.current_actor() + do_unlock = trio.Event() + + async def wait_for_parent_stdin_hijack( + task_status=trio.TASK_STATUS_IGNORED + ): + global _debugger_request_cs + with trio.CancelScope() as cs: + _debugger_request_cs = cs + try: + async with get_root() as portal: + with trio.fail_after(.5): + agen = await portal.run( + 'tractor._debug', + '_hijack_stdin_relay_to_child', + subactor_uid=actor.uid, + ) + async with aclosing(agen): + + # block until first yield above + async for val in agen: + + assert val == 'Locked' + task_status.started() + + # with trio.CancelScope(shield=True): + await do_unlock.wait() + + # trigger cancellation of remote stream + break + finally: + log.debug(f"Exiting debugger for actor {actor}") + global _in_debug + _in_debug = False + log.debug(f"Child {actor} released parent stdio lock") + + async def _bp(): + """Async breakpoint which schedules a parent stdio lock, and once complete + enters the ``pdbpp`` debugging console. + """ + task_name = trio.lowlevel.current_task().name + + global _in_debug + + # TODO: need a more robust check for the "root" actor + if actor._parent_chan and not is_root_process(): + if _in_debug: + if _in_debug == task_name: + # this task already has the lock and is + # likely recurrently entering a breakpoint + return + + # if **this** actor is already in debug mode block here + # waiting for the control to be released - this allows + # support for recursive entries to `tractor.breakpoint()` + log.warning( + f"Actor {actor.uid} already has a debug lock, waiting...") + await do_unlock.wait() + await trio.sleep(0.1) + + # assign unlock callback for debugger teardown hooks + global _pdb_release_hook + _pdb_release_hook = do_unlock.set + + # mark local actor as "in debug mode" to avoid recurrent + # entries/requests to the root process + _in_debug = task_name + + # this **must** be awaited by the caller and is done using the + # root nursery so that the debugger can continue to run without + # being restricted by the scope of a new task nursery. + await actor._service_n.start(wait_for_parent_stdin_hijack) + + elif is_root_process(): + # we also wait in the root-parent for any child that + # may have the tty locked prior + if _debug_lock.locked(): # root process already has it; ignore + return + await _debug_lock.acquire() + _pdb_release_hook = _debug_lock.release + + # block here one (at the appropriate frame *up* where + # ``breakpoint()`` was awaited and begin handling stdio + log.debug("Entering the synchronous world of pdb") + debug_func(actor) + + + # user code **must** await this! + return _bp() + + +def _set_trace(actor): + log.critical(f"\nAttaching pdb to actor: {actor.uid}\n") + PdbwTeardown().set_trace( + # start 2 levels up in user code + frame=sys._getframe().f_back.f_back, + ) + + +breakpoint = partial( + _breakpoint, + _set_trace, +) + + +def _post_mortem(actor): + log.critical(f"\nAttaching to pdb in crashed actor: {actor.uid}\n") + # custom Pdb post-mortem entry + pdbpp.xpm(Pdb=PdbwTeardown) + + +post_mortem = partial( + _breakpoint, + _post_mortem, +) + + +async def _maybe_enter_pm(err): + if ( + _state.debug_mode() + and not isinstance(err, bdb.BdbQuit) + + # XXX: if the error is the likely result of runtime-wide + # cancellation, we don't want to enter the debugger since + # there's races between when the parent actor has killed all + # comms and when the child tries to contact said parent to + # acquire the tty lock. + + # Really we just want to mostly avoid catching KBIs here so there + # might be a simpler check we can do? + and trio.MultiError.filter( + lambda exc: exc if not isinstance(exc, trio.Cancelled) else None, + err, + ) + ): + log.warning("Actor crashed, entering debug mode") + await post_mortem() diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 0407896..13da85d 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -11,7 +11,7 @@ from ._portal import ( open_portal, LocalPortal, ) -from ._state import current_actor +from ._state import current_actor, _runtime_vars @asynccontextmanager @@ -37,6 +37,16 @@ async def get_arbiter( yield arb_portal +@asynccontextmanager +async def get_root( +**kwargs, +) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: + host, port = _runtime_vars['_root_mailbox'] + async with _connect_chan(host, port) as chan: + async with open_portal(chan, **kwargs) as portal: + yield portal + + @asynccontextmanager async def find_actor( name: str, diff --git a/tractor/_entry.py b/tractor/_entry.py index 883cc6b..9099fc0 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -3,6 +3,7 @@ Process entry points. """ from functools import partial from typing import Tuple, Any +import signal import trio # type: ignore @@ -57,6 +58,13 @@ def _trio_main( ) -> None: """Entry point for a `trio_run_in_process` subactor. """ + # Disable sigint handling in children; + # we don't need it thanks to our cancellation machinery. + signal.signal(signal.SIGINT, signal.SIG_IGN) + + # TODO: make a global func to set this or is it too hacky? + # os.environ['PYTHONBREAKPOINT'] = 'tractor._debug.breakpoint' + if actor.loglevel is not None: log.info( f"Setting loglevel for {actor.uid} to {actor.loglevel}") diff --git a/tractor/_portal.py b/tractor/_portal.py index e749ec6..af3c1b5 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -278,7 +278,8 @@ class Portal: try: # send cancel cmd - might not get response # XXX: sure would be nice to make this work with a proper shield - # with trio.CancelScope(shield=True): + # with trio.CancelScope() as cancel_scope: + # with trio.CancelScope(shield=True) as cancel_scope: with trio.move_on_after(0.5) as cancel_scope: cancel_scope.shield = True await self.run('self', 'cancel') diff --git a/tractor/_spawn.py b/tractor/_spawn.py index f0eb012..2065967 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -23,7 +23,7 @@ from multiprocessing import forkserver # type: ignore from typing import Tuple from . import _forkserver_override -from ._state import current_actor +from ._state import current_actor, is_main_process from .log import get_logger from ._portal import Portal from ._actor import Actor, ActorFailure @@ -87,12 +87,6 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: return _ctx -def is_main_process() -> bool: - """Bool determining if this actor is running in the top-most process. - """ - return mp.current_process().name == 'MainProcess' - - async def exhaust_portal( portal: Portal, actor: Actor @@ -118,6 +112,11 @@ async def exhaust_portal( except (Exception, trio.MultiError) as err: # we reraise in the parent task via a ``trio.MultiError`` return err + except trio.Cancelled as err: + # lol, of course we need this too ;P + # TODO: merge with above? + log.warning(f"Cancelled result waiter for {portal.actor.uid}") + return err else: log.debug(f"Returning final result: {final}") return final @@ -194,8 +193,17 @@ async def spawn_subactor( # the outer scope since no actor zombies are # ever allowed. This ``__aexit__()`` also shields # internally. - async with proc: - log.debug(f"Terminating {proc}") + log.debug(f"Attempting to kill {proc}") + + # NOTE: this timeout effectively does nothing right now since + # we are shielding the ``.wait()`` inside ``new_proc()`` which + # will pretty much never release until the process exits. + with trio.move_on_after(3) as cs: + async with proc: + log.debug(f"Terminating {proc}") + if cs.cancelled_caught: + log.critical(f"HARD KILLING {proc}") + proc.kill() async def new_proc( @@ -206,6 +214,8 @@ async def new_proc( # passed through to actor main bind_addr: Tuple[str, int], parent_addr: Tuple[str, int], + _runtime_vars: Dict[str, Any], # serialized and sent to _child + *, use_trio_run_in_process: bool = False, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> None: @@ -241,9 +251,14 @@ async def new_proc( "statespace": subactor.statespace, "_arb_addr": subactor._arb_addr, "bind_host": bind_addr[0], - "bind_port": bind_addr[1] + "bind_port": bind_addr[1], + "_runtime_vars": _runtime_vars, }) + # track subactor in current nursery + curr_actor = current_actor() + curr_actor._actoruid2nursery[subactor.uid] = actor_nursery + # resume caller at next checkpoint now that child is up task_status.started(portal) @@ -353,6 +368,8 @@ async def new_proc( await proc_waiter(proc) proc.join() + # This is again common logic for all backends: + log.debug(f"Joined {proc}") # pop child entry to indicate we are no longer managing this subactor subactor, proc, portal = actor_nursery._children.pop(subactor.uid) diff --git a/tractor/_state.py b/tractor/_state.py index d624fc9..03ddf13 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -1,12 +1,18 @@ """ Per process state """ -from typing import Optional +from typing import Optional, Dict, Any from collections import Mapping +import multiprocessing as mp import trio _current_actor: Optional['Actor'] = None # type: ignore +_runtime_vars: Dict[str, Any] = { + '_debug_mode': False, + '_is_root': False, + '_root_mailbox': (None, None) +} def current_actor() -> 'Actor': # type: ignore @@ -36,3 +42,20 @@ class ActorContextInfo(Mapping): except RuntimeError: # no local actor/task context initialized yet return f'no {key} context' + + +def is_main_process() -> bool: + """Bool determining if this actor is running in the top-most process. + """ + return mp.current_process().name == 'MainProcess' + + +def debug_mode() -> bool: + """Bool determining if "debug mode" is on which enables + remote subactor pdb entry on crashes. + """ + return bool(_runtime_vars['_debug_mode']) + + +def is_root_process() -> bool: + return _runtime_vars['_is_root'] diff --git a/tractor/_trionics.py b/tractor/_trionics.py index e846144..aecf16e 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -13,10 +13,11 @@ from ._state import current_actor from .log import get_logger, get_loglevel from ._actor import Actor from ._portal import Portal +from . import _state from . import _spawn -log = get_logger('tractor') +log = get_logger(__name__) _default_bind_addr: Tuple[str, int] = ('127.0.0.1', 0) @@ -58,6 +59,10 @@ class ActorNursery: ) -> Portal: loglevel = loglevel or self._actor.loglevel or get_loglevel() + # configure and pass runtime state + _rtv = _state._runtime_vars.copy() + _rtv['_is_root'] = False + subactor = Actor( name, # modules allowed to invoked funcs from @@ -83,6 +88,7 @@ class ActorNursery: self.errors, bind_addr, parent_addr, + _rtv, # run time vars ) ) @@ -131,19 +137,14 @@ class ActorNursery: If ``hard_killl`` is set to ``True`` then kill the processes directly without any far end graceful ``trio`` cancellation. """ - def do_hard_kill(proc): - log.warning(f"Hard killing subactors {self._children}") - proc.terminate() - # XXX: below doesn't seem to work? - # send KeyBoardInterrupt (trio abort signal) to sub-actors - # os.kill(proc.pid, signal.SIGINT) + self.cancelled = True - log.debug("Cancelling nursery") + log.warning(f"Cancelling nursery in {self._actor.uid}") with trio.move_on_after(3) as cs: async with trio.open_nursery() as nursery: for subactor, proc, portal in self._children.values(): if hard_kill: - do_hard_kill(proc) + proc.terminate() else: if portal is None: # actor hasn't fully spawned yet event = self._actor._peer_connected[subactor.uid] @@ -163,7 +164,7 @@ class ActorNursery: if chan: portal = Portal(chan) else: # there's no other choice left - do_hard_kill(proc) + proc.terminate() # spawn cancel tasks for each sub-actor assert portal @@ -172,13 +173,13 @@ class ActorNursery: # if we cancelled the cancel (we hung cancelling remote actors) # then hard kill all sub-processes if cs.cancelled_caught: - log.error(f"Failed to gracefully cancel {self}, hard killing!") - async with trio.open_nursery(): - for subactor, proc, portal in self._children.values(): - nursery.start_soon(do_hard_kill, proc) + log.error( + f"Failed to cancel {self}\nHard killing process tree!") + for subactor, proc, portal in self._children.values(): + log.warning(f"Hard killing process {proc}") + proc.terminate() # mark ourselves as having (tried to have) cancelled all subactors - self.cancelled = True self._join_procs.set() @@ -229,7 +230,7 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: # after we yield upwards yield anursery log.debug( - f"Waiting on subactors {anursery._children}" + f"Waiting on subactors {anursery._children} " "to complete" ) except BaseException as err: @@ -273,6 +274,7 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: # ria_nursery scope end + # XXX: do we need a `trio.Cancelled` catch here as well? except (Exception, trio.MultiError) as err: # If actor-local error was raised while waiting on # ".run_in_actor()" actors then we also want to cancel all @@ -294,6 +296,8 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: if anursery._children: with trio.CancelScope(shield=True): await anursery.cancel() + + # use `MultiError` as needed if len(errors) > 1: raise trio.MultiError(tuple(errors.values())) else: