Merge pull request #129 from goodboy/multiproc_debug

Wen? Multiprocessing-native debugger now!
goodboy 2020-10-14 09:14:03 -04:00 committed by GitHub
commit 7115d6c3bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 1258 additions and 150 deletions
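In short: running the root entry point with `debug_mode=True` now lets any actor in the tree drop into a `pdbpp` prompt, either explicitly via `await tractor.breakpoint()` or automatically on crash, with a tty lock preventing multiple actors from clobbering stdio. A minimal sketch of the new usage, roughly mirroring the `examples/debugging/` scripts added below:

import tractor


async def main():
    # pause this (root) actor and attach the pdb++ REPL
    await tractor.breakpoint()


if __name__ == '__main__':
    # crash handling / breakpoint support is opt-in and, per the
    # changes below, only supported on the `trio` spawn backend
    tractor.run(main, debug_mode=True)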

View File

@@ -38,4 +38,4 @@ jobs:
      - name: Install dependencies
        run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
      - name: Run tests
-        run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }}
+        run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs

View File

@ -1,68 +0,0 @@
language: python
dist: xenial
sudo: required
matrix:
include:
- name: "Windows, Python Latest: multiprocessing"
os: windows
language: sh
python: 3.x # only works on linux
env: SPAWN_BACKEND="mp"
before_install:
- choco install python3 --params "/InstallDir:C:\\Python"
- export PATH="/c/Python:/c/Python/Scripts:$PATH"
- python -m pip install --upgrade pip wheel
- name: "Windows, Python Latest: trio"
os: windows
language: sh
python: 3.x # only works on linux
env: SPAWN_BACKEND="trio"
before_install:
- choco install python3 --params "/InstallDir:C:\\Python"
- export PATH="/c/Python:/c/Python/Scripts:$PATH"
- python -m pip install --upgrade pip wheel
- name: "Windows, Python 3.7: multiprocessing"
os: windows
python: 3.7 # only works on linux
env: SPAWN_BACKEND="mp"
language: sh
before_install:
- choco install python3 --version 3.7.4 --params "/InstallDir:C:\\Python"
- export PATH="/c/Python:/c/Python/Scripts:$PATH"
- python -m pip install --upgrade pip wheel
- name: "Windows, Python 3.7: trio"
os: windows
python: 3.7 # only works on linux
env: SPAWN_BACKEND="trio"
language: sh
before_install:
- choco install python3 --version 3.7.4 --params "/InstallDir:C:\\Python"
- export PATH="/c/Python:/c/Python/Scripts:$PATH"
- python -m pip install --upgrade pip wheel
- name: "Python 3.7: multiprocessing"
python: 3.7 # this works for Linux but is ignored on macOS or Windows
env: SPAWN_BACKEND="mp"
- name: "Python 3.7: trio"
python: 3.7 # this works for Linux but is ignored on macOS or Windows
env: SPAWN_BACKEND="trio"
- name: "Python 3.8: multiprocessing"
python: 3.8 # this works for Linux but is ignored on macOS or Windows
env: SPAWN_BACKEND="mp"
- name: "Python 3.8: trio"
python: 3.8 # this works for Linux but is ignored on macOS or Windows
env: SPAWN_BACKEND="trio"
install:
- cd $TRAVIS_BUILD_DIR
- pip install -U pip
- pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
script:
- mypy tractor/ --ignore-missing-imports
- pytest tests/ --spawn-backend=${SPAWN_BACKEND}

View File

@@ -2,7 +2,8 @@ tractor
=======
A `structured concurrent`_, async-native "`actor model`_" built on trio_ and multiprocessing_.

-|travis| |docs|
+|gh_actions|
+|docs|

.. _actor model: https://en.wikipedia.org/wiki/Actor_model
.. _trio: https://github.com/python-trio/trio
@@ -57,8 +58,8 @@ channel`_!
.. _matrix channel: https://matrix.to/#/!tractor:matrix.org

-.. |travis| image:: https://img.shields.io/travis/goodboy/tractor/master.svg
-    :target: https://travis-ci.org/goodboy/tractor
+.. |gh_actions| image:: https://img.shields.io/endpoint.svg?url=https%3A%2F%2Factions-badge.atrox.dev%2Fgoodboy%2Ftractor%2Fbadge&style=popout-square
+    :target: https://actions-badge.atrox.dev/goodboy/tractor/goto

.. |docs| image:: https://readthedocs.org/projects/tractor/badge/?version=latest
    :target: https://tractor.readthedocs.io/en/latest/?badge=latest
    :alt: Documentation Status

View File

@ -0,0 +1,58 @@
import tractor
async def name_error():
"Raise a ``NameError``"
getattr(doggypants)
async def breakpoint_forever():
"Indefinitely re-enter debugger in child actor."
while True:
await tractor.breakpoint()
async def spawn_until(depth=0):
""""A nested nursery that triggers another ``NameError``.
"""
async with tractor.open_nursery() as n:
if depth < 1:
# await n.run_in_actor('breakpoint_forever', breakpoint_forever)
await n.run_in_actor('name_error', name_error)
else:
depth -= 1
await n.run_in_actor(f'spawn_until_{depth}', spawn_until, depth=depth)
async def main():
"""The main ``tractor`` routine.
The process tree should look as approximately as follows when the debugger
first engages:
python examples/debugging/multi_nested_subactors_bp_forever.py
python -m tractor._child --uid ('spawner1', '7eab8462 ...)
python -m tractor._child --uid ('spawn_until_3', 'afcba7a8 ...)
python -m tractor._child --uid ('spawn_until_2', 'd2433d13 ...)
python -m tractor._child --uid ('spawn_until_1', '1df589de ...)
python -m tractor._child --uid ('spawn_until_0', '3720602b ...)
python -m tractor._child --uid ('spawner0', '1d42012b ...)
python -m tractor._child --uid ('spawn_until_2', '2877e155 ...)
python -m tractor._child --uid ('spawn_until_1', '0502d786 ...)
python -m tractor._child --uid ('spawn_until_0', 'de918e6d ...)
"""
async with tractor.open_nursery() as n:
# spawn both actors
portal = await n.run_in_actor('spawner0', spawn_until, depth=3)
portal1 = await n.run_in_actor('spawner1', spawn_until, depth=4)
# gah still an issue here.
# await portal.result()
# await portal1.result()
if __name__ == '__main__':
tractor.run(main, debug_mode=True)

View File

@ -0,0 +1,43 @@
import tractor
async def name_error():
"Raise a ``NameError``"
getattr(doggypants)
async def spawn_error():
""""A nested nursery that triggers another ``NameError``.
"""
async with tractor.open_nursery() as n:
portal = await n.run_in_actor('name_error_1', name_error)
return await portal.result()
async def main():
"""The main ``tractor`` routine.
The process tree should look as approximately as follows:
python examples/debugging/multi_subactors.py
python -m tractor._child --uid ('name_error', 'a7caf490 ...)
`-python -m tractor._child --uid ('spawn_error', '52ee14a5 ...)
`-python -m tractor._child --uid ('name_error', '3391222c ...)
"""
async with tractor.open_nursery() as n:
# spawn both actors
portal = await n.run_in_actor('name_error', name_error)
portal1 = await n.run_in_actor('spawn_error', spawn_error)
# trigger a root actor error
assert 0
# attempt to collect results (which raises error in parent)
# still has some issues where the parent seems to get stuck
await portal.result()
await portal1.result()
if __name__ == '__main__':
tractor.run(main, debug_mode=True)

View File

@ -0,0 +1,47 @@
import tractor
import trio
async def breakpoint_forever():
"Indefinitely re-enter debugger in child actor."
while True:
await trio.sleep(0.1)
await tractor.breakpoint()
async def name_error():
"Raise a ``NameError``"
getattr(doggypants)
async def spawn_error():
""""A nested nursery that triggers another ``NameError``.
"""
async with tractor.open_nursery() as n:
portal = await n.run_in_actor('name_error_1', name_error)
return await portal.result()
async def main():
"""The main ``tractor`` routine.
The process tree should look as approximately as follows:
-python examples/debugging/multi_subactors.py
|-python -m tractor._child --uid ('name_error', 'a7caf490 ...)
|-python -m tractor._child --uid ('bp_forever', '1f787a7e ...)
`-python -m tractor._child --uid ('spawn_error', '52ee14a5 ...)
`-python -m tractor._child --uid ('name_error', '3391222c ...)
"""
async with tractor.open_nursery() as n:
# Spawn both actors, don't bother with collecting results
# (would result in a different debugger outcome due to parent's
# cancellation).
await n.run_in_actor('bp_forever', breakpoint_forever)
await n.run_in_actor('name_error', name_error)
await n.run_in_actor('spawn_error', spawn_error)
if __name__ == '__main__':
tractor.run(main, debug_mode=True)

View File

@ -0,0 +1,15 @@
import trio
import tractor
async def main():
await trio.sleep(0.1)
await tractor.breakpoint()
await trio.sleep(0.1)
if __name__ == '__main__':
tractor.run(main, debug_mode=True)

View File

@ -0,0 +1,11 @@
import tractor
async def main():
while True:
await tractor.breakpoint()
if __name__ == '__main__':
tractor.run(main, debug_mode=True)

View File

@ -0,0 +1,9 @@
import tractor
async def main():
assert 0
if __name__ == '__main__':
tractor.run(main, debug_mode=True)

View File

@ -0,0 +1,48 @@
import tractor
async def name_error():
"Raise a ``NameError``"
getattr(doggypants)
async def spawn_until(depth=0):
""""A nested nursery that triggers another ``NameError``.
"""
async with tractor.open_nursery() as n:
if depth < 1:
# await n.run_in_actor('breakpoint_forever', breakpoint_forever)
await n.run_in_actor('name_error', name_error)
else:
depth -= 1
await n.run_in_actor(f'spawn_until_{depth}', spawn_until, depth=depth)
async def main():
"""The main ``tractor`` routine.
The process tree should look as approximately as follows when the debugger
first engages:
python examples/debugging/multi_nested_subactors_bp_forever.py
python -m tractor._child --uid ('spawner1', '7eab8462 ...)
python -m tractor._child --uid ('spawn_until_0', '3720602b ...)
python -m tractor._child --uid ('name_error', '505bf71d ...)
python -m tractor._child --uid ('spawner0', '1d42012b ...)
python -m tractor._child --uid ('name_error', '6c2733b8 ...)
"""
async with tractor.open_nursery() as n:
# spawn both actors
portal = await n.run_in_actor('spawner0', spawn_until, depth=0)
portal1 = await n.run_in_actor('spawner1', spawn_until, depth=1)
# nursery cancellation should be triggered due to propagated error
await portal.result()
await portal1.result()
if __name__ == '__main__':
tractor.run(main, debug_mode=True, loglevel='warning')

View File

@ -0,0 +1,25 @@
import trio
import tractor
async def breakpoint_forever():
"""Indefinitely re-enter debugger in child actor.
"""
while True:
await trio.sleep(0.1)
await tractor.breakpoint()
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
'breakpoint_forever',
breakpoint_forever,
)
await portal.result()
if __name__ == '__main__':
tractor.run(main, debug_mode=True)

View File

@ -0,0 +1,16 @@
import tractor
async def name_error():
getattr(doggypants)
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor('name_error', name_error)
await portal.result()
if __name__ == '__main__':
tractor.run(main, debug_mode=True)

View File

@@ -3,3 +3,4 @@ pytest-trio
pdbpp
mypy
trio_typing
+pexpect

View File

@@ -39,7 +39,7 @@ setup(
    ],
    install_requires=[
        'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt',
-        'trio_typing'
+        'trio_typing', 'pdbpp',
    ],
    tests_require=['pytest'],
    python_requires=">=3.7",

View File

@@ -39,6 +39,16 @@ no_windows = pytest.mark.skipif(
)


+def repodir():
+    """Return the abspath to the repo directory.
+    """
+    dirname = os.path.dirname
+    dirpath = os.path.abspath(
+        dirname(dirname(os.path.realpath(__file__)))
+    )
+    return dirpath
+
+
def pytest_addoption(parser):
    parser.addoption(
        "--ll", action="store", dest='loglevel',

View File

@@ -413,3 +413,38 @@ def test_cancel_via_SIGINT_other_task(
    with pytest.raises(KeyboardInterrupt):
        tractor.run(main)


+async def spin_for(period=3):
+    "Sync sleep."
+    time.sleep(period)
+
+
+async def spawn():
+    async with tractor.open_nursery() as tn:
+        portal = await tn.run_in_actor('sleeper', spin_for)
+
+
+@no_windows
+def test_cancel_while_childs_child_in_sync_sleep(
+    loglevel,
+    start_method,
+    spawn_backend,
+):
+    """Verify that a child cancelled while executing sync code is torn
+    down even when that cancellation is triggered by the parent
+    2 nurseries "up".
+    """
+    if start_method == 'forkserver':
+        pytest.skip("Forksever sux hard at resuming from sync sleep...")
+
+    async def main():
+        with trio.fail_after(2):
+            async with tractor.open_nursery() as tn:
+                portal = await tn.run_in_actor('spawn', spawn)
+                await trio.sleep(1)
+                assert 0
+
+    with pytest.raises(AssertionError):
+        tractor.run(main)

View File

@ -0,0 +1,366 @@
"""
That native debug better work!
All these tests can be understood (somewhat) by running the equivalent
`examples/debugging/` scripts manually.
TODO: None of these tests have been run successfully on windows yet.
"""
from os import path
import pytest
import pexpect
from conftest import repodir
# TODO: The next great debugger audit could be done by you!
# - recurrent entry to breakpoint() from single actor *after* and an
# error in another task?
# - root error before child errors
# - root error after child errors
# - root error before child breakpoint
# - root error after child breakpoint
# - recurrent root errors
def examples_dir():
"""Return the abspath to the examples directory.
"""
return path.join(repodir(), 'examples', 'debugging/')
def mk_cmd(ex_name: str) -> str:
"""Generate a command suitable to pass to ``pexpect.spawn()``.
"""
return ' '.join(
['python',
path.join(examples_dir(), f'{ex_name}.py')]
)
@pytest.fixture
def spawn(
testdir,
arb_addr,
) -> 'pexpect.spawn':
def _spawn(cmd):
return testdir.spawn(
cmd=mk_cmd(cmd),
expect_timeout=3,
)
return _spawn
@pytest.mark.parametrize(
'user_in_out',
[
('c', 'AssertionError'),
('q', 'AssertionError'),
],
ids=lambda item: f'{item[0]} -> {item[1]}',
)
def test_root_actor_error(spawn, user_in_out):
"""Demonstrate crash handler entering pdbpp from basic error in root actor.
"""
user_input, expect_err_str = user_in_out
child = spawn('root_actor_error')
# scan for the pdbpp prompt
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
# make sure expected logging and error arrives
assert "Attaching to pdb in crashed actor: ('arbiter'" in before
assert 'AssertionError' in before
# send user command
child.sendline(user_input)
# process should exit
child.expect(pexpect.EOF)
assert expect_err_str in str(child.before)
@pytest.mark.parametrize(
'user_in_out',
[
('c', None),
('q', 'bdb.BdbQuit'),
],
ids=lambda item: f'{item[0]} -> {item[1]}',
)
def test_root_actor_bp(spawn, user_in_out):
"""Demonstrate breakpoint from in root actor.
"""
user_input, expect_err_str = user_in_out
child = spawn('root_actor_breakpoint')
# scan for the pdbpp prompt
child.expect(r"\(Pdb\+\+\)")
assert 'Error' not in str(child.before)
# send user command
child.sendline(user_input)
child.expect('\r\n')
# process should exit
child.expect(pexpect.EOF)
if expect_err_str is None:
assert 'Error' not in str(child.before)
else:
assert expect_err_str in str(child.before)
def test_root_actor_bp_forever(spawn):
"Re-enter a breakpoint from the root actor-task."
child = spawn('root_actor_breakpoint_forever')
# do some "next" commands to demonstrate recurrent breakpoint
# entries
for _ in range(10):
child.sendline('next')
child.expect(r"\(Pdb\+\+\)")
# do one continue which should trigger a new task to lock the tty
child.sendline('continue')
child.expect(r"\(Pdb\+\+\)")
# XXX: this previously caused a bug!
child.sendline('n')
child.expect(r"\(Pdb\+\+\)")
child.sendline('n')
child.expect(r"\(Pdb\+\+\)")
def test_subactor_error(spawn):
"Single subactor raising an error"
child = spawn('subactor_error')
# scan for the pdbpp prompt
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error'" in before
# send user command
# (in this case it's the same for 'continue' vs. 'quit')
child.sendline('continue')
# the debugger should enter a second time in the nursery
# creating actor
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
# root actor gets debugger engaged
assert "Attaching to pdb in crashed actor: ('arbiter'" in before
# error is a remote error propagated from the subactor
assert "RemoteActorError: ('name_error'" in before
child.sendline('c')
child.expect('\r\n')
# process should exit
child.expect(pexpect.EOF)
def test_subactor_breakpoint(spawn):
"Single subactor with an infinite breakpoint loop"
child = spawn('subactor_breakpoint')
# scan for the pdbpp prompt
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
# do some "next" commands to demonstrate recurrent breakpoint
# entries
for _ in range(10):
child.sendline('next')
child.expect(r"\(Pdb\+\+\)")
# now run some "continues" to show re-entries
for _ in range(5):
child.sendline('continue')
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
# finally quit the loop
child.sendline('q')
# child process should exit but parent will capture pdb.BdbQuit
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "RemoteActorError: ('breakpoint_forever'" in before
assert 'bdb.BdbQuit' in before
# quit the parent
child.sendline('c')
# process should exit
child.expect(pexpect.EOF)
before = str(child.before.decode())
assert "RemoteActorError: ('breakpoint_forever'" in before
assert 'bdb.BdbQuit' in before
def test_multi_subactors(spawn):
"""Multiple subactors, both erroring and breakpointing as well as
a nested subactor erroring.
"""
child = spawn(r'multi_subactors')
# scan for the pdbpp prompt
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "Attaching pdb to actor: ('bp_forever'" in before
# do some "next" commands to demonstrate recurrent breakpoint
# entries
for _ in range(10):
child.sendline('next')
child.expect(r"\(Pdb\+\+\)")
# continue to next error
child.sendline('c')
# first name_error failure
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "NameError" in before
# continue again
child.sendline('c')
# 2nd name_error failure
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "NameError" in before
# breakpoint loop should re-engage
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "Attaching pdb to actor: ('bp_forever'" in before
# now run some "continues" to show re-entries
for _ in range(5):
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
# quit the loop and expect parent to attach
child.sendline('q')
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('arbiter'" in before
assert "RemoteActorError: ('bp_forever'" in before
assert 'bdb.BdbQuit' in before
# process should exit
child.sendline('c')
child.expect(pexpect.EOF)
before = str(child.before.decode())
assert "RemoteActorError: ('bp_forever'" in before
assert 'bdb.BdbQuit' in before
def test_multi_subactors_root_errors(spawn):
"""Multiple subactors, both erroring and breakpointing as well as
a nested subactor erroring.
"""
child = spawn('multi_subactor_root_errors')
# scan for the pdbpp prompt
child.expect(r"\(Pdb\+\+\)")
# at most one subactor should attach before the root is cancelled
before = str(child.before.decode())
assert "NameError: name 'doggypants' is not defined" in before
# continue again
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
# should now get attached in root with assert error
before = str(child.before.decode())
# should have come just after priot prompt
assert "Cancelling nursery in ('spawn_error'," in before
assert "Attaching to pdb in crashed actor: ('arbiter'" in before
assert "AssertionError" in before
# continue again
child.sendline('c')
child.expect(pexpect.EOF)
before = str(child.before.decode())
assert "AssertionError" in before
def test_multi_nested_subactors_error_through_nurseries(spawn):
"""Verify deeply nested actors that error trigger debugger entries
at each actor nurserly (level) all the way up the tree.
"""
# NOTE: previously, inside this script was a a bug where if the
# parent errors before a 2-levels-lower actor has released the lock,
# the parent tries to cancel it but it's stuck in the debugger?
# A test (below) has now been added to explicitly verify this is
# fixed.
child = spawn('multi_nested_subactors_error_up_through_nurseries')
for _ in range(12):
child.expect(r"\(Pdb\+\+\)")
child.sendline('c')
child.expect(pexpect.EOF)
before = str(child.before.decode())
assert "NameError" in before
def test_root_nursery_cancels_before_child_releases_tty_lock(spawn):
"""Test that when the root sends a cancel message before a nested
child has unblocked (which can happen when it has the tty lock and
is engaged in pdb) it is indeed cancelled after exiting the debugger.
"""
child = spawn('root_cancelled_but_child_is_in_tty_lock')
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "NameError: name 'doggypants' is not defined" in before
assert "tractor._exceptions.RemoteActorError: ('name_error'" not in before
child.sendline('c')
for _ in range(4):
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "NameError: name 'doggypants' is not defined" in before
child.sendline('c')
child.expect(pexpect.EOF)
before = str(child.before.decode())
assert "tractor._exceptions.RemoteActorError: ('spawner0'" in before
assert "tractor._exceptions.RemoteActorError: ('name_error'" in before
assert "NameError: name 'doggypants' is not defined" in before

View File

@@ -11,15 +11,7 @@ import shutil
import pytest

+from conftest import repodir

-def repodir():
-    """Return the abspath to the repo directory.
-    """
-    dirname = os.path.dirname
-    dirpath = os.path.abspath(
-        dirname(dirname(os.path.realpath(__file__)))
-    )
-    return dirpath


def examples_dir():
@@ -85,15 +77,22 @@ def run_example_in_subproc(loglevel, testdir, arb_addr):

@pytest.mark.parametrize(
    'example_script',
-    [f for f in os.listdir(examples_dir()) if '__' not in f],
+    [
+        f for f in os.listdir(examples_dir())
+        if (
+            ('__' not in f) and
+            ('debugging' not in f)
+        )
+    ],
)
def test_example(run_example_in_subproc, example_script):
    """Load and run scripts from this repo's ``examples/`` dir as a user
    would copy and pasing them into their editor.

-    On windows a little more "finessing" is done to make ``multiprocessing`` play nice:
-    we copy the ``__main__.py`` into the test directory and invoke the script as a module
-    with ``python -m test_example``.
+    On windows a little more "finessing" is done to make
+    ``multiprocessing`` play nice: we copy the ``__main__.py`` into the
+    test directory and invoke the script as a module with ``python -m
+    test_example``.
    """
    ex_file = os.path.join(examples_dir(), example_script)
    with open(ex_file, 'r') as ex:

View File

@@ -63,7 +63,7 @@ def test_rpc_errors(arb_addr, to_call, testdir):
        # module should raise a ModuleNotFoundError at import
        testdir.makefile('.py', tmp_mod=funcname)

-        # no need to exposed module to the subactor
+        # no need to expose module to the subactor
        subactor_exposed_mods = exposed_mods
        exposed_mods = []
        func_defined = False
@@ -95,7 +95,7 @@ def test_rpc_errors(arb_addr, to_call, testdir):
    tractor.run(
        main,
        arbiter_addr=arb_addr,
-        rpc_module_paths=exposed_mods,
+        rpc_module_paths=exposed_mods.copy(),
    )

    # handle both parameterized cases

View File

@@ -203,9 +203,8 @@ async def cancel_after(wait):

@pytest.fixture(scope='module')
def time_quad_ex(arb_addr, ci_env, spawn_backend):
-    if ci_env and spawn_backend == 'mp' and (platform.system() != 'Windows'):
-        """no idea, but the travis and github actions, mp *nix runs are
-        flaking out here often
+    if spawn_backend == 'mp':
+        """no idea but the mp *nix runs are flaking out here often...
        """
        pytest.skip("Test is too flaky on mp in CI")

View File

@@ -17,12 +17,16 @@ from ._discovery import get_arbiter, find_actor, wait_for_actor
from ._actor import Actor, _start_actor, Arbiter
from ._trionics import open_nursery
from ._state import current_actor
+from . import _state
from ._exceptions import RemoteActorError, ModuleNotExposed
+from ._debug import breakpoint, post_mortem
from . import msg
from . import _spawn


__all__ = [
+    'breakpoint',
+    'post_mortem',
    'current_actor',
    'find_actor',
    'get_arbiter',
@@ -46,16 +50,34 @@ _default_arbiter_port = 1616

async def _main(
    async_fn: typing.Callable[..., typing.Awaitable],
    args: Tuple,
-    kwargs: typing.Dict[str, typing.Any],
    arbiter_addr: Tuple[str, int],
    name: Optional[str] = None,
+    start_method: Optional[str] = None,
+    debug_mode: bool = False,
+    **kwargs,
) -> typing.Any:
    """Async entry point for ``tractor``.
    """
    logger = log.get_logger('tractor')

+    if start_method is not None:
+        _spawn.try_set_start_method(start_method)
+
+    if debug_mode and _spawn._spawn_method == 'trio':
+        _state._runtime_vars['_debug_mode'] = True
+        # expose internal debug module to every actor allowing
+        # for use of ``await tractor.breakpoint()``
+        kwargs.setdefault('rpc_module_paths', []).append('tractor._debug')
+
+    elif debug_mode:
+        raise RuntimeError("Debug mode is only supported for the `trio` backend!")
+
    main = partial(async_fn, *args)
    arbiter_addr = (host, port) = arbiter_addr or (
-        _default_arbiter_host, _default_arbiter_port)
+        _default_arbiter_host,
+        _default_arbiter_port
+    )
    loglevel = kwargs.get('loglevel', log.get_loglevel())
    if loglevel is not None:
        log._default_loglevel = loglevel
@@ -98,20 +120,40 @@ def run(
    *args,
    name: Optional[str] = None,
    arbiter_addr: Tuple[str, int] = (
-        _default_arbiter_host, _default_arbiter_port),
+        _default_arbiter_host,
+        _default_arbiter_port,
+    ),
    # either the `multiprocessing` start method:
    # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
-    # OR `trio_run_in_process` (the new default).
+    # OR `trio` (the new default).
    start_method: Optional[str] = None,
+    debug_mode: bool = False,
    **kwargs,
) -> Any:
    """Run a trio-actor async function in process.

    This is tractor's main entry and the start point for any async actor.
    """
-    if start_method is not None:
-        _spawn.try_set_start_method(start_method)
-    return trio.run(_main, async_fn, args, kwargs, arbiter_addr, name)
+    # mark top most level process as root actor
+    _state._runtime_vars['_is_root'] = True
+
+    return trio.run(
+        partial(
+            # our entry
+            _main,
+            # user entry point
+            async_fn,
+            args,
+            # global kwargs
+            arbiter_addr=arbiter_addr,
+            name=name,
+            start_method=start_method,
+            debug_mode=debug_mode,
+            **kwargs,
+        )
+    )


def run_daemon(

View File

@@ -13,6 +13,7 @@ from typing import Dict, List, Tuple, Any, Optional
from types import ModuleType
import sys
import os
+from contextlib import ExitStack

import trio  # type: ignore
from trio_typing import TaskStatus
@@ -26,6 +27,7 @@ from ._exceptions import (
    unpack_error,
    ModuleNotExposed
)
+from . import _debug
from ._discovery import get_arbiter
from ._portal import Portal
from . import _state
@@ -125,8 +127,11 @@ async def _invoke(
            task_status.started(cs)
            await chan.send({'return': await coro, 'cid': cid})
    except (Exception, trio.MultiError) as err:
+        # NOTE: don't enter debug mode recursively after quitting pdb
+        log.exception("Actor crashed:")
+        await _debug._maybe_enter_pm(err)
+
        # always ship errors back to caller
-        log.exception("Actor errored:")
        err_msg = pack_error(err)
        err_msg['cid'] = cid
        try:
@@ -146,8 +151,8 @@ async def _invoke(
            # If we're cancelled before the task returns then the
            # cancel scope will not have been inserted yet
            log.warn(
-                f"Task {func} was likely cancelled before it was started")
+                f"Task {func} likely errored or cancelled before it started")
+    finally:
        if not actor._rpc_tasks:
            log.info("All RPC tasks have completed")
            actor._ongoing_rpc_tasks.set()
@@ -171,13 +176,16 @@ class Actor:
    _root_n: Optional[trio.Nursery] = None
    _service_n: Optional[trio.Nursery] = None
    _server_n: Optional[trio.Nursery] = None
+    _lifetime_stack: ExitStack = ExitStack()

    # Information about `__main__` from parent
    _parent_main_data: Dict[str, str]
+    _parent_chan_cs: Optional[trio.CancelScope] = None

    def __init__(
        self,
        name: str,
+        *,
        rpc_module_paths: List[str] = [],
        statespace: Optional[Dict[str, Any]] = None,
        uid: str = None,
@@ -191,12 +199,18 @@ class Actor:
        self.name = name
        self.uid = (name, uid or str(uuid.uuid4()))

+        self._cancel_complete = trio.Event()
+        self._cancel_called: bool = False
+
        # retreive and store parent `__main__` data which
        # will be passed to children
        self._parent_main_data = _mp_fixup_main._mp_figure_out_main()

+        # always include debugging tools module
+        rpc_module_paths.append('tractor._debug')
+
        mods = {}
-        for name in rpc_module_paths or ():
+        for name in rpc_module_paths:
            mod = importlib.import_module(name)
            mods[name] = _get_mod_abspath(mod)
@@ -237,6 +251,7 @@ class Actor:
        self._parent_chan: Optional[Channel] = None
        self._forkserver_info: Optional[
            Tuple[Any, Any, Any, Any, Any]] = None
+        self._actoruid2nursery: Dict[str, 'ActorNursery'] = {}  # type: ignore

    async def wait_for_peer(
        self, uid: Tuple[str, str]
@@ -415,7 +430,6 @@ class Actor:
    async def _process_messages(
        self,
        chan: Channel,
-        treat_as_gen: bool = False,
        shield: bool = False,
        task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
    ) -> None:
@@ -519,7 +533,10 @@ class Actor:
                    else:
                        # self.cancel() was called so kill this msg loop
                        # and break out into ``_async_main()``
-                        log.warning(f"{self.uid} was remotely cancelled")
+                        log.warning(
+                            f"{self.uid} was remotely cancelled; "
+                            "waiting on cancellation completion..")
+                        await self._cancel_complete.wait()
                        loop_cs.cancel()
                        break
@@ -527,7 +544,10 @@ class Actor:
                        f"Waiting on next msg for {chan} from {chan.uid}")
                else:
                    # channel disconnect
-                    log.debug(f"{chan} from {chan.uid} disconnected")
+                    log.debug(
+                        f"{chan} for {chan.uid} disconnected, cancelling tasks"
+                    )
+                    self.cancel_rpc_tasks(chan)

        except trio.ClosedResourceError:
            log.error(f"{chan} form {chan.uid} broke")
@@ -549,7 +569,7 @@ class Actor:
            f"Exiting msg loop for {chan} from {chan.uid} "
            f"with last msg:\n{msg}")

-    async def _chan_to_parent(
+    async def _from_parent(
        self,
        parent_addr: Optional[Tuple[str, int]],
    ) -> Tuple[Channel, Optional[Tuple[str, int]]]:
@@ -578,6 +598,10 @@ class Actor:
                parent_data.pop('bind_host'),
                parent_data.pop('bind_port'),
            )
+            rvs = parent_data.pop('_runtime_vars')
+            rvs['_is_root'] = False
+            _state._runtime_vars.update(rvs)
+
            for attr, value in parent_data.items():
                setattr(self, attr, value)
@@ -615,13 +639,13 @@ class Actor:
            # establish primary connection with immediate parent
            self._parent_chan = None
            if parent_addr is not None:
-                self._parent_chan, accept_addr_from_rent = await self._chan_to_parent(
+                self._parent_chan, accept_addr_rent = await self._from_parent(
                    parent_addr)

                # either it's passed in because we're not a child
                # or because we're running in mp mode
-                if accept_addr_from_rent is not None:
-                    accept_addr = accept_addr_from_rent
+                if accept_addr_rent is not None:
+                    accept_addr = accept_addr_rent

            # load exposed/allowed RPC modules
            # XXX: do this **after** establishing a channel to the parent
@@ -660,6 +684,9 @@ class Actor:
                    accept_port=port
                )
            )
+            accept_addr = self.accept_addr
+            if _state._runtime_vars['_is_root']:
+                _state._runtime_vars['_root_mailbox'] = accept_addr

            # Register with the arbiter if we're told its addr
            log.debug(f"Registering {self} for role `{self.name}`")
@@ -670,7 +697,7 @@ class Actor:
                        'self',
                        'register_actor',
                        uid=self.uid,
-                        sockaddr=self.accept_addr,
+                        sockaddr=accept_addr,
                    )
                registered_with_arbiter = True
@@ -696,7 +723,7 @@ class Actor:
            # Blocks here as expected until the root nursery is
            # killed (i.e. this actor is cancelled or signalled by the parent)

-        except (trio.MultiError, Exception) as err:
+        except Exception as err:
            if not registered_with_arbiter:
                # TODO: I guess we could try to connect back
                # to the parent through a channel and engage a debugger
@@ -728,12 +755,17 @@ class Actor:
        finally:
            log.info("Root nursery complete")

+            # tear down all lifetime contexts
+            # api idea: ``tractor.open_context()``
+            log.warn("Closing all actor lifetime contexts")
+            self._lifetime_stack.close()
+
            # Unregister actor from the arbiter
            if registered_with_arbiter and (
                self._arb_addr is not None
            ):
                failed = False
-                with trio.move_on_after(5) as cs:
+                with trio.move_on_after(0.5) as cs:
                    cs.shield = True
                    try:
                        async with get_arbiter(*self._arb_addr) as arb_portal:
@@ -807,19 +839,32 @@ class Actor:
          spawning new rpc tasks
        - return control the parent channel message loop
        """
+        log.warning(f"{self.uid} is trying to cancel")
+        self._cancel_called = True
+
        # cancel all ongoing rpc tasks
        with trio.CancelScope(shield=True):
+
+            # kill any debugger request task to avoid deadlock
+            # with the root actor in this tree
+            dbcs = _debug._debugger_request_cs
+            if dbcs is not None:
+                log.debug("Cancelling active debugger request")
+                dbcs.cancel()
+
            # kill all ongoing tasks
            await self.cancel_rpc_tasks()
+
+            # cancel all rpc tasks permanently
+            if self._service_n:
+                self._service_n.cancel_scope.cancel()
+
            # stop channel server
            self.cancel_server()
            await self._server_down.wait()

-        # rekt all channel loops
-        if self._service_n:
-            self._service_n.cancel_scope.cancel()
+        log.warning(f"{self.uid} was sucessfullly cancelled")
+        self._cancel_complete.set()
        return True

    # XXX: hard kill logic if needed?
@@ -859,20 +904,31 @@ class Actor:
            scope.cancel()

            # wait for _invoke to mark the task complete
+            log.debug(
+                f"Waiting on task to cancel:\ncid: {cid}\nfunc: {func}\n"
+                f"peer: {chan.uid}\n")
            await is_complete.wait()
            log.debug(
                f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n"
                f"peer: {chan.uid}\n")

-    async def cancel_rpc_tasks(self) -> None:
+    async def cancel_rpc_tasks(
+        self,
+        only_chan: Optional[Channel] = None,
+    ) -> None:
        """Cancel all existing RPC responder tasks using the cancel scope
        registered for each.
        """
        tasks = self._rpc_tasks
        log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
        for (chan, cid) in tasks.copy():
+            if only_chan is not None:
+                if only_chan != chan:
+                    continue
+
            # TODO: this should really done in a nursery batch
            await self._cancel_task(cid, chan)

        log.info(
            f"Waiting for remaining rpc tasks to complete {tasks}")
        await self._ongoing_rpc_tasks.wait()
@@ -1024,7 +1080,17 @@ async def _start_actor(
            parent_addr=None
        )
    )
+    try:
-    result = await main()
+        result = await main()
+    except (Exception, trio.MultiError) as err:
+        try:
+            log.exception("Actor crashed:")
+            await _debug._maybe_enter_pm(err)
+            raise
+        finally:
+            await actor.cancel()

    # XXX: the actor is cancelled when this context is complete
    # given that there are no more active peer channels connected

tractor/_debug.py (new file, mode 100644, 322 lines)
View File

@ -0,0 +1,322 @@
"""
Multi-core debugging for da peeps!
"""
import bdb
import sys
from functools import partial
from contextlib import asynccontextmanager
from typing import Awaitable, Tuple, Optional, Callable, AsyncIterator
# import signal
from async_generator import aclosing
import tractor
import trio
from trio.testing import wait_all_tasks_blocked
from .log import get_logger
from . import _state
from ._discovery import get_root
from ._state import is_root_process
try:
# wtf: only exported when installed in dev mode?
import pdbpp
except ImportError:
# pdbpp is installed in regular mode...it monkey patches stuff
import pdb
assert pdb.xpm, "pdbpp is not installed?" # type: ignore
pdbpp = pdb
log = get_logger(__name__)
__all__ = ['breakpoint', 'post_mortem']
# placeholder for function to set a ``trio.Event`` on debugger exit
_pdb_release_hook: Optional[Callable] = None
# actor-wide variable pointing to current task name using debugger
_in_debug = False
# lock in root actor preventing multi-access to local tty
_debug_lock = trio.StrictFIFOLock()
# XXX: set by the current task waiting on the root tty lock
# and must be cancelled if this actor is cancelled via message
# otherwise deadlocks with the parent actor may ensure
_debugger_request_cs: Optional[trio.CancelScope] = None
class TractorConfig(pdbpp.DefaultConfig):
"""Custom ``pdbpp`` goodness.
"""
# sticky_by_default = True
class PdbwTeardown(pdbpp.Pdb):
"""Add teardown hooks to the regular ``pdbpp.Pdb``.
"""
# override the pdbpp config with our coolio one
DefaultConfig = TractorConfig
# TODO: figure out how to dissallow recursive .set_trace() entry
# since that'll cause deadlock for us.
def set_continue(self):
global _in_debug
try:
super().set_continue()
finally:
_in_debug = False
_pdb_release_hook()
def set_quit(self):
global _in_debug
try:
super().set_quit()
finally:
_in_debug = False
_pdb_release_hook()
# TODO: will be needed whenever we get to true remote debugging.
# XXX see https://github.com/goodboy/tractor/issues/130
# # TODO: is there some way to determine this programatically?
# _pdb_exit_patterns = tuple(
# str.encode(patt + "\n") for patt in (
# 'c', 'cont', 'continue', 'q', 'quit')
# )
# def subactoruid2proc(
# actor: 'Actor', # noqa
# uid: Tuple[str, str]
# ) -> trio.Process:
# n = actor._actoruid2nursery[uid]
# _, proc, _ = n._children[uid]
# return proc
# async def hijack_stdin():
# log.info(f"Hijacking stdin from {actor.uid}")
# trap std in and relay to subproc
# async_stdin = trio.wrap_file(sys.stdin)
# async with aclosing(async_stdin):
# async for msg in async_stdin:
# log.trace(f"Stdin input:\n{msg}")
# # encode to bytes
# bmsg = str.encode(msg)
# # relay bytes to subproc over pipe
# # await proc.stdin.send_all(bmsg)
# if bmsg in _pdb_exit_patterns:
# log.info("Closing stdin hijack")
# break
@asynccontextmanager
async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
"""Acquire a actor local FIFO lock meant to mutex entry to a local
debugger entry point to avoid tty clobbering by multiple processes.
"""
task_name = trio.lowlevel.current_task()
try:
log.error(f"TTY BEING ACQUIRED by {task_name}:{uid}")
await _debug_lock.acquire()
log.error(f"TTY lock acquired by {task_name}:{uid}")
yield
finally:
_debug_lock.release()
log.error(f"TTY lock released by {task_name}:{uid}")
def handler(signum, frame):
"""Block SIGINT while in debug to avoid deadlocks with cancellation.
"""
print(
"tractor ignores SIGINT while in debug mode\n"
"If you have a special need for it please open an issue.\n"
)
# don't allow those stdlib mofos to mess with sigint handler
pdbpp.pdb.Pdb.sigint_handler = handler
# @contextmanager
# def _disable_sigint():
# try:
# # disable sigint handling while in debug
# prior_handler = signal.signal(signal.SIGINT, handler)
# yield
# finally:
# # restore SIGINT handling
# signal.signal(signal.SIGINT, prior_handler)
async def _hijack_stdin_relay_to_child(
subactor_uid: Tuple[str, str]
) -> AsyncIterator[str]:
# TODO: when we get to true remote debugging
# this will deliver stdin data
log.warning(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
async with _acquire_debug_lock(subactor_uid):
log.warning(f"Actor {subactor_uid} ACQUIRED stdin hijack lock")
# with _disable_sigint():
# indicate to child that we've locked stdio
yield 'Locked'
# wait for cancellation of stream by child
# indicating debugger is dis-engaged
await trio.sleep_forever()
log.debug(f"Actor {subactor_uid} RELEASED stdin hijack lock")
# XXX: We only make this sync in case someone wants to
# overload the ``breakpoint()`` built-in.
def _breakpoint(debug_func) -> Awaitable[None]:
"""``tractor`` breakpoint entry for engaging pdb machinery
in subactors.
"""
actor = tractor.current_actor()
do_unlock = trio.Event()
async def wait_for_parent_stdin_hijack(
task_status=trio.TASK_STATUS_IGNORED
):
global _debugger_request_cs
with trio.CancelScope() as cs:
_debugger_request_cs = cs
try:
async with get_root() as portal:
with trio.fail_after(.5):
agen = await portal.run(
'tractor._debug',
'_hijack_stdin_relay_to_child',
subactor_uid=actor.uid,
)
async with aclosing(agen):
# block until first yield above
async for val in agen:
assert val == 'Locked'
task_status.started()
# with trio.CancelScope(shield=True):
await do_unlock.wait()
# trigger cancellation of remote stream
break
finally:
log.debug(f"Exiting debugger for actor {actor}")
global _in_debug
_in_debug = False
log.debug(f"Child {actor} released parent stdio lock")
async def _bp():
"""Async breakpoint which schedules a parent stdio lock, and once complete
enters the ``pdbpp`` debugging console.
"""
task_name = trio.lowlevel.current_task().name
global _in_debug
# TODO: need a more robust check for the "root" actor
if actor._parent_chan and not is_root_process():
if _in_debug:
if _in_debug == task_name:
# this task already has the lock and is
# likely recurrently entering a breakpoint
return
# if **this** actor is already in debug mode block here
# waiting for the control to be released - this allows
# support for recursive entries to `tractor.breakpoint()`
log.warning(
f"Actor {actor.uid} already has a debug lock, waiting...")
await do_unlock.wait()
await trio.sleep(0.1)
# assign unlock callback for debugger teardown hooks
global _pdb_release_hook
_pdb_release_hook = do_unlock.set
# mark local actor as "in debug mode" to avoid recurrent
# entries/requests to the root process
_in_debug = task_name
# this **must** be awaited by the caller and is done using the
# root nursery so that the debugger can continue to run without
# being restricted by the scope of a new task nursery.
await actor._service_n.start(wait_for_parent_stdin_hijack)
elif is_root_process():
# we also wait in the root-parent for any child that
# may have the tty locked prior
if _debug_lock.locked(): # root process already has it; ignore
return
await _debug_lock.acquire()
_pdb_release_hook = _debug_lock.release
# block here one (at the appropriate frame *up* where
# ``breakpoint()`` was awaited and begin handling stdio
log.debug("Entering the synchronous world of pdb")
debug_func(actor)
# user code **must** await this!
return _bp()
def _set_trace(actor):
log.critical(f"\nAttaching pdb to actor: {actor.uid}\n")
PdbwTeardown().set_trace(
# start 2 levels up in user code
frame=sys._getframe().f_back.f_back,
)
breakpoint = partial(
_breakpoint,
_set_trace,
)
def _post_mortem(actor):
log.critical(f"\nAttaching to pdb in crashed actor: {actor.uid}\n")
# custom Pdb post-mortem entry
pdbpp.xpm(Pdb=PdbwTeardown)
post_mortem = partial(
_breakpoint,
_post_mortem,
)
async def _maybe_enter_pm(err):
if (
_state.debug_mode()
and not isinstance(err, bdb.BdbQuit)
# XXX: if the error is the likely result of runtime-wide
# cancellation, we don't want to enter the debugger since
# there's races between when the parent actor has killed all
# comms and when the child tries to contact said parent to
# acquire the tty lock.
# Really we just want to mostly avoid catching KBIs here so there
# might be a simpler check we can do?
and trio.MultiError.filter(
lambda exc: exc if not isinstance(exc, trio.Cancelled) else None,
err,
)
):
log.warning("Actor crashed, entering debug mode")
await post_mortem()
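As a usage note (not part of this commit's code), both exported entry points return awaitables; a manual post-mortem entry from user task code would look roughly like the ``_maybe_enter_pm()`` helper above:

import tractor


async def main():
    try:
        assert 0
    except AssertionError:
        # like ``tractor.breakpoint()`` this schedules the root tty
        # lock request and then drops into a pdb++ post-mortem REPL
        await tractor.post_mortem()


if __name__ == '__main__':
    tractor.run(main, debug_mode=True)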

View File

@@ -11,7 +11,7 @@ from ._portal import (
    open_portal,
    LocalPortal,
)
-from ._state import current_actor
+from ._state import current_actor, _runtime_vars


@asynccontextmanager
@@ -37,6 +37,16 @@ async def get_arbiter(
        yield arb_portal


+@asynccontextmanager
+async def get_root(
+    **kwargs,
+) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]:
+    host, port = _runtime_vars['_root_mailbox']
+    async with _connect_chan(host, port) as chan:
+        async with open_portal(chan, **kwargs) as portal:
+            yield portal
+
+
@asynccontextmanager
async def find_actor(
    name: str,

View File

@@ -3,6 +3,7 @@ Process entry points.
"""
from functools import partial
from typing import Tuple, Any
+import signal

import trio  # type: ignore
@@ -57,6 +58,13 @@ def _trio_main(
) -> None:
    """Entry point for a `trio_run_in_process` subactor.
    """
+    # Disable sigint handling in children;
+    # we don't need it thanks to our cancellation machinery.
+    signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+    # TODO: make a global func to set this or is it too hacky?
+    # os.environ['PYTHONBREAKPOINT'] = 'tractor._debug.breakpoint'
+
    if actor.loglevel is not None:
        log.info(
            f"Setting loglevel for {actor.uid} to {actor.loglevel}")

View File

@@ -278,7 +278,8 @@ class Portal:
        try:
            # send cancel cmd - might not get response
            # XXX: sure would be nice to make this work with a proper shield
-            # with trio.CancelScope(shield=True):
+            # with trio.CancelScope() as cancel_scope:
+            # with trio.CancelScope(shield=True) as cancel_scope:
            with trio.move_on_after(0.5) as cancel_scope:
                cancel_scope.shield = True
                await self.run('self', 'cancel')

View File

@@ -23,7 +23,7 @@ from multiprocessing import forkserver  # type: ignore
from typing import Tuple

from . import _forkserver_override
-from ._state import current_actor
+from ._state import current_actor, is_main_process
from .log import get_logger
from ._portal import Portal
from ._actor import Actor, ActorFailure
@@ -87,12 +87,6 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
    return _ctx


-def is_main_process() -> bool:
-    """Bool determining if this actor is running in the top-most process.
-    """
-    return mp.current_process().name == 'MainProcess'
-
-
async def exhaust_portal(
    portal: Portal,
    actor: Actor
@@ -118,6 +112,11 @@ async def exhaust_portal(
    except (Exception, trio.MultiError) as err:
        # we reraise in the parent task via a ``trio.MultiError``
        return err
+    except trio.Cancelled as err:
+        # lol, of course we need this too ;P
+        # TODO: merge with above?
+        log.warning(f"Cancelled result waiter for {portal.actor.uid}")
+        return err
    else:
        log.debug(f"Returning final result: {final}")
        return final
@@ -194,8 +193,17 @@ async def spawn_subactor(
        # the outer scope since no actor zombies are
        # ever allowed. This ``__aexit__()`` also shields
        # internally.
+        log.debug(f"Attempting to kill {proc}")
+
+        # NOTE: this timeout effectively does nothing right now since
+        # we are shielding the ``.wait()`` inside ``new_proc()`` which
+        # will pretty much never release until the process exits.
+        with trio.move_on_after(3) as cs:
            async with proc:
                log.debug(f"Terminating {proc}")
+        if cs.cancelled_caught:
+            log.critical(f"HARD KILLING {proc}")
+            proc.kill()


async def new_proc(
@@ -206,6 +214,8 @@ async def new_proc(
    # passed through to actor main
    bind_addr: Tuple[str, int],
    parent_addr: Tuple[str, int],
+    _runtime_vars: Dict[str, Any],  # serialized and sent to _child
+    *,
    use_trio_run_in_process: bool = False,
    task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
@@ -241,9 +251,14 @@ async def new_proc(
                "statespace": subactor.statespace,
                "_arb_addr": subactor._arb_addr,
                "bind_host": bind_addr[0],
-                "bind_port": bind_addr[1]
+                "bind_port": bind_addr[1],
+                "_runtime_vars": _runtime_vars,
            })

+            # track subactor in current nursery
+            curr_actor = current_actor()
+            curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
+
            # resume caller at next checkpoint now that child is up
            task_status.started(portal)
@@ -353,6 +368,8 @@ async def new_proc(
                    await proc_waiter(proc)
                proc.join()

+        # This is again common logic for all backends:
        log.debug(f"Joined {proc}")
        # pop child entry to indicate we are no longer managing this subactor
        subactor, proc, portal = actor_nursery._children.pop(subactor.uid)

View File

@@ -1,12 +1,18 @@
"""
Per process state
"""
-from typing import Optional
+from typing import Optional, Dict, Any
from collections import Mapping
+import multiprocessing as mp

import trio

_current_actor: Optional['Actor'] = None  # type: ignore
+_runtime_vars: Dict[str, Any] = {
+    '_debug_mode': False,
+    '_is_root': False,
+    '_root_mailbox': (None, None)
+}


def current_actor() -> 'Actor':  # type: ignore
@@ -36,3 +42,20 @@ class ActorContextInfo(Mapping):
        except RuntimeError:
            # no local actor/task context initialized yet
            return f'no {key} context'
+
+
+def is_main_process() -> bool:
+    """Bool determining if this actor is running in the top-most process.
+    """
+    return mp.current_process().name == 'MainProcess'
+
+
+def debug_mode() -> bool:
+    """Bool determining if "debug mode" is on which enables
+    remote subactor pdb entry on crashes.
+    """
+    return bool(_runtime_vars['_debug_mode'])
+
+
+def is_root_process() -> bool:
+    return _runtime_vars['_is_root']

View File

@@ -13,10 +13,11 @@ from ._state import current_actor
from .log import get_logger, get_loglevel
from ._actor import Actor
from ._portal import Portal
+from . import _state
from . import _spawn


-log = get_logger('tractor')
+log = get_logger(__name__)

_default_bind_addr: Tuple[str, int] = ('127.0.0.1', 0)
@@ -58,6 +59,10 @@ class ActorNursery:
    ) -> Portal:
        loglevel = loglevel or self._actor.loglevel or get_loglevel()

+        # configure and pass runtime state
+        _rtv = _state._runtime_vars.copy()
+        _rtv['_is_root'] = False
+
        subactor = Actor(
            name,
            # modules allowed to invoked funcs from
@@ -83,6 +88,7 @@ class ActorNursery:
                self.errors,
                bind_addr,
                parent_addr,
+                _rtv,  # run time vars
            )
        )
@@ -131,19 +137,14 @@ class ActorNursery:
        If ``hard_killl`` is set to ``True`` then kill the processes
        directly without any far end graceful ``trio`` cancellation.
        """
-        def do_hard_kill(proc):
-            log.warning(f"Hard killing subactors {self._children}")
-            proc.terminate()
-            # XXX: below doesn't seem to work?
-            # send KeyBoardInterrupt (trio abort signal) to sub-actors
-            # os.kill(proc.pid, signal.SIGINT)
+        self.cancelled = True

-        log.debug("Cancelling nursery")
+        log.warning(f"Cancelling nursery in {self._actor.uid}")
        with trio.move_on_after(3) as cs:
            async with trio.open_nursery() as nursery:
                for subactor, proc, portal in self._children.values():
                    if hard_kill:
-                        do_hard_kill(proc)
+                        proc.terminate()
                    else:
                        if portal is None:  # actor hasn't fully spawned yet
                            event = self._actor._peer_connected[subactor.uid]
@@ -163,7 +164,7 @@ class ActorNursery:
                            if chan:
                                portal = Portal(chan)
                            else:  # there's no other choice left
-                                do_hard_kill(proc)
+                                proc.terminate()

                    # spawn cancel tasks for each sub-actor
                    assert portal
@@ -172,13 +173,13 @@ class ActorNursery:
        # if we cancelled the cancel (we hung cancelling remote actors)
        # then hard kill all sub-processes
        if cs.cancelled_caught:
-            log.error(f"Failed to gracefully cancel {self}, hard killing!")
-            async with trio.open_nursery():
+            log.error(
+                f"Failed to cancel {self}\nHard killing process tree!")
            for subactor, proc, portal in self._children.values():
-                    nursery.start_soon(do_hard_kill, proc)
+                log.warning(f"Hard killing process {proc}")
+                proc.terminate()

        # mark ourselves as having (tried to have) cancelled all subactors
-        self.cancelled = True
        self._join_procs.set()
@@ -229,7 +230,7 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
            # after we yield upwards
            yield anursery
            log.debug(
-                f"Waiting on subactors {anursery._children}"
+                f"Waiting on subactors {anursery._children} "
                "to complete"
            )
        except BaseException as err:
@@ -273,6 +274,7 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
            # ria_nursery scope end

+        # XXX: do we need a `trio.Cancelled` catch here as well?
        except (Exception, trio.MultiError) as err:
            # If actor-local error was raised while waiting on
            # ".run_in_actor()" actors then we also want to cancel all
@@ -294,6 +296,8 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
            if anursery._children:
                with trio.CancelScope(shield=True):
                    await anursery.cancel()
+
+            # use `MultiError` as needed
            if len(errors) > 1:
                raise trio.MultiError(tuple(errors.values()))
            else: