Merge pull request #165 from goodboy/signint_saviour

Ignore SIGINT when in a debugger REPL
lifetime_stack_tests
goodboy 2022-08-03 09:26:54 -04:00 committed by GitHub
commit 641ed7a32a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1282 additions and 454 deletions

View File

@ -73,8 +73,11 @@ jobs:
- name: Install dependencies - name: Install dependencies
run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
- name: List dependencies
run: pip freeze
- name: Run tests - name: Run tests
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx
# We skip 3.10 on windows for now due to # We skip 3.10 on windows for now due to
# https://github.com/pytest-dev/pytest/issues/8733 # https://github.com/pytest-dev/pytest/issues/8733
@ -110,5 +113,8 @@ jobs:
- name: Install dependencies - name: Install dependencies
run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
- name: List dependencies
run: pip freeze
- name: Run tests - name: Run tests
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs --full-trace

View File

@ -3,13 +3,14 @@
|gh_actions| |gh_actions|
|docs| |docs|
``tractor`` is a `structured concurrent`_, multi-processing_ runtime built on trio_. ``tractor`` is a `structured concurrent`_, multi-processing_ runtime
built on trio_.
Fundamentally ``tractor`` gives you parallelism via ``trio``-"*actors*": Fundamentally ``tractor`` gives you parallelism via ``trio``-"*actors*":
our nurseries_ let you spawn new Python processes which each run a ``trio`` our nurseries_ let you spawn new Python processes which each run a ``trio``
scheduled runtime - a call to ``trio.run()``. scheduled runtime - a call to ``trio.run()``.
We believe the system adhere's to the `3 axioms`_ of an "`actor model`_" We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
but likely *does not* look like what *you* probably think an "actor but likely *does not* look like what *you* probably think an "actor
model" looks like, and that's *intentional*. model" looks like, and that's *intentional*.
@ -577,13 +578,13 @@ say hi, please feel free to reach us in our `matrix channel`_. If
matrix seems too hip, we're also mostly all in the the `trio gitter matrix seems too hip, we're also mostly all in the the `trio gitter
channel`_! channel`_!
.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
.. _multi-processing: https://en.wikipedia.org/wiki/Multiprocessing
.. _trio: https://github.com/python-trio/trio
.. _nurseries: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/#nurseries-a-structured-replacement-for-go-statements .. _nurseries: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/#nurseries-a-structured-replacement-for-go-statements
.. _actor model: https://en.wikipedia.org/wiki/Actor_model .. _actor model: https://en.wikipedia.org/wiki/Actor_model
.. _trio: https://github.com/python-trio/trio
.. _multi-processing: https://en.wikipedia.org/wiki/Multiprocessing
.. _trionic: https://trio.readthedocs.io/en/latest/design.html#high-level-design-principles .. _trionic: https://trio.readthedocs.io/en/latest/design.html#high-level-design-principles
.. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich .. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich
.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
.. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s .. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s
.. .. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts .. .. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts
.. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s .. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s

View File

View File

@ -0,0 +1,40 @@
import trio
import tractor
@tractor.context
async def just_sleep(
ctx: tractor.Context,
**kwargs,
) -> None:
'''
Start and sleep.
'''
await ctx.started()
await trio.sleep_forever()
async def main() -> None:
async with tractor.open_nursery(
debug_mode=True,
) as n:
portal = await n.start_actor(
'ctx_child',
# XXX: we don't enable the current module in order
# to trigger `ModuleNotFound`.
enable_modules=[],
)
async with portal.open_context(
just_sleep, # taken from pytest parameterization
) as (ctx, sent):
raise KeyboardInterrupt
if __name__ == '__main__':
trio.run(main)

View File

@ -0,0 +1,50 @@
import tractor
import trio
async def gen():
yield 'yo'
await tractor.breakpoint()
yield 'yo'
await tractor.breakpoint()
@tractor.context
async def just_bp(
ctx: tractor.Context,
) -> None:
await ctx.started()
await tractor.breakpoint()
# TODO: bps and errors in this call..
async for val in gen():
print(val)
# await trio.sleep(0.5)
# prematurely destroy the connection
await ctx.chan.aclose()
# THIS CAUSES AN UNRECOVERABLE HANG
# without latest ``pdbpp``:
assert 0
async def main():
async with tractor.open_nursery(
debug_mode=True,
) as n:
p = await n.start_actor(
'bp_boi',
enable_modules=[__name__],
)
async with p.open_context(
just_bp,
) as (ctx, first):
await trio.sleep_forever()
if __name__ == '__main__':
trio.run(main)

View File

@ -0,0 +1,49 @@
import trio
import click
import tractor
import pydantic
# from multiprocessing import shared_memory
@tractor.context
async def just_sleep(
ctx: tractor.Context,
**kwargs,
) -> None:
'''
Test a small ping-pong 2-way streaming server.
'''
await ctx.started()
await trio.sleep_forever()
async def main() -> None:
proc = await trio.open_process( (
'python',
'-c',
'import trio; trio.run(trio.sleep_forever)',
))
await proc.wait()
# await trio.sleep_forever()
# async with tractor.open_nursery() as n:
# portal = await n.start_actor(
# 'rpc_server',
# enable_modules=[__name__],
# )
# async with portal.open_context(
# just_sleep, # taken from pytest parameterization
# ) as (ctx, sent):
# await trio.sleep_forever()
if __name__ == '__main__':
import time
# time.sleep(999)
trio.run(main)

View File

@ -0,0 +1,36 @@
Add SIGINT protection to our `pdbpp` based debugger subystem such that
for (single-depth) actor trees in debug mode we ignore interrupts in any
actor currently holding the TTY lock thus avoiding clobbering IPC
connections and/or task and process state when working in the REPL.
As a big note currently so called "nested" actor trees (trees with
actors having more then one parent/ancestor) are not fully supported
since we don't yet have a mechanism to relay the debug mode knowledge
"up" the actor tree (for eg. when handling a crash in a leaf actor).
As such currently there is a set of tests and known scenarios which will
result in process cloberring by the zombie repaing machinery and these
have been documented in https://github.com/goodboy/tractor/issues/320.
The implementation details include:
- utilizing a custom SIGINT handler which we apply whenever an actor's
runtime enters the debug machinery, which we also make sure the
stdlib's `pdb` configuration doesn't override (which it does by
default without special instance config).
- litter the runtime with `maybe_wait_for_debugger()` mostly in spots
where the root actor should block before doing embedded nursery
teardown ops which both cancel potential-children-in-deubg as well
as eventually trigger zombie reaping machinery.
- hardening of the TTY locking semantics/API both in terms of IPC
terminations and cancellation and lock release determinism from
sync debugger instance methods.
- factoring of locking infrastructure into a new `._debug.Lock` global
which encapsulates all details of the ``trio`` sync primitives and
task/actor uid management and tracking.
We also add `ctrl-c` cases throughout the test suite though these are
disabled for py3.9 (`pdbpp` UX differences that don't seem worth
compensating for, especially since this will be our last 3.9 supported
release) and there are a slew of marked cases that aren't expected to
work in CI more generally (as mentioned in the "nested" tree note
above) despite seemingly working when run manually on linux.

View File

@ -1,5 +1,6 @@
pytest pytest
pytest-trio pytest-trio
pytest-timeout
pdbpp pdbpp
mypy<0.920 mypy<0.920
trio_typing<0.7.0 trio_typing<0.7.0

View File

@ -43,7 +43,7 @@ setup(
install_requires=[ install_requires=[
# trio related # trio related
'trio>0.8', 'trio >= 0.20',
'async_generator', 'async_generator',
'trio_typing', 'trio_typing',
@ -54,12 +54,23 @@ setup(
# tooling # tooling
'colorlog', 'colorlog',
'wrapt', 'wrapt',
'pdbpp',
# pip ref docs on these specs:
# https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples
# and pep:
# https://peps.python.org/pep-0440/#version-specifiers
'pdbpp <= 0.10.1; python_version < "3.10"',
# windows deps workaround for ``pdbpp`` # windows deps workaround for ``pdbpp``
# https://github.com/pdbpp/pdbpp/issues/498 # https://github.com/pdbpp/pdbpp/issues/498
# https://github.com/pdbpp/fancycompleter/issues/37 # https://github.com/pdbpp/fancycompleter/issues/37
'pyreadline3 ; platform_system == "Windows"', 'pyreadline3 ; platform_system == "Windows"',
# 3.10 has an outstanding unreleased issue and `pdbpp` itself
# pins to patched forks of its own dependencies as well..and
# we need a specific patch on master atm.
'pdbpp @ git+https://github.com/pdbpp/pdbpp@76c4be5#egg=pdbpp ; python_version > "3.9"', # noqa: E501
# serialization # serialization
'msgspec >= "0.4.0"' 'msgspec >= "0.4.0"'
@ -83,8 +94,8 @@ setup(
"License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)", "License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)",
"Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: 3 :: Only", "Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Intended Audience :: Science/Research", "Intended Audience :: Science/Research",
"Intended Audience :: Developers", "Intended Audience :: Developers",
"Topic :: System :: Distributed Computing", "Topic :: System :: Distributed Computing",

View File

@ -85,11 +85,14 @@ def spawn_backend(request):
return request.config.option.spawn_backend return request.config.option.spawn_backend
_ci_env: bool = os.environ.get('CI', False)
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def ci_env() -> bool: def ci_env() -> bool:
"""Detect CI envoirment. """Detect CI envoirment.
""" """
return os.environ.get('TRAVIS', False) or os.environ.get('CI', False) return _ci_env
@pytest.fixture(scope='session') @pytest.fixture(scope='session')

View File

@ -265,42 +265,44 @@ async def test_callee_closes_ctx_after_stream_open():
enable_modules=[__name__], enable_modules=[__name__],
) )
async with portal.open_context( with trio.fail_after(2):
close_ctx_immediately, async with portal.open_context(
close_ctx_immediately,
# flag to avoid waiting the final result # flag to avoid waiting the final result
# cancel_on_exit=True, # cancel_on_exit=True,
) as (ctx, sent): ) as (ctx, sent):
assert sent is None assert sent is None
with trio.fail_after(0.5): with trio.fail_after(0.5):
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
# should fall through since ``StopAsyncIteration`` # should fall through since ``StopAsyncIteration``
# should be raised through translation of # should be raised through translation of
# a ``trio.EndOfChannel`` by # a ``trio.EndOfChannel`` by
# ``trio.abc.ReceiveChannel.__anext__()`` # ``trio.abc.ReceiveChannel.__anext__()``
async for _ in stream: async for _ in stream:
assert 0 assert 0
else: else:
# verify stream is now closed # verify stream is now closed
try: try:
await stream.receive() await stream.receive()
except trio.EndOfChannel: except trio.EndOfChannel:
pass
# TODO: should be just raise the closed resource err
# directly here to enforce not allowing a re-open
# of a stream to the context (at least until a time of
# if/when we decide that's a good idea?)
try:
with trio.fail_after(0.5):
async with ctx.open_stream() as stream:
pass pass
except trio.ClosedResourceError:
# TODO: should be just raise the closed resource err
# directly here to enforce not allowing a re-open
# of a stream to the context (at least until a time of
# if/when we decide that's a good idea?)
try:
async with ctx.open_stream() as stream:
pass pass
except trio.ClosedResourceError:
pass
await portal.cancel_actor() await portal.cancel_actor()

View File

@ -1,5 +1,5 @@
""" """
That native debug better work! That "native" debug mode better work!
All these tests can be understood (somewhat) by running the equivalent All these tests can be understood (somewhat) by running the equivalent
`examples/debugging/` scripts manually. `examples/debugging/` scripts manually.
@ -10,15 +10,20 @@ TODO:
- wonder if any of it'll work on OS X? - wonder if any of it'll work on OS X?
""" """
import time
from os import path from os import path
from typing import Optional
import platform import platform
import sys
import time
import pytest import pytest
import pexpect import pexpect
from pexpect.exceptions import (
TIMEOUT,
EOF,
)
from conftest import repodir from conftest import repodir, _ci_env
# TODO: The next great debugger audit could be done by you! # TODO: The next great debugger audit could be done by you!
# - recurrent entry to breakpoint() from single actor *after* and an # - recurrent entry to breakpoint() from single actor *after* and an
@ -52,6 +57,24 @@ def mk_cmd(ex_name: str) -> str:
) )
# TODO: was trying to this xfail style but some weird bug i see in CI
# that's happening at collect time.. pretty soon gonna dump actions i'm
# thinkin...
# in CI we skip tests which >= depth 1 actor trees due to there
# still being an oustanding issue with relaying the debug-mode-state
# through intermediary parents.
has_nested_actors = pytest.mark.has_nested_actors
# .xfail(
# os.environ.get('CI', False),
# reason=(
# 'This test uses nested actors and fails in CI\n'
# 'The test seems to run fine locally but until we solve the '
# 'following issue this CI test will be xfail:\n'
# 'https://github.com/goodboy/tractor/issues/320'
# )
# )
@pytest.fixture @pytest.fixture
def spawn( def spawn(
start_method, start_method,
@ -73,6 +96,96 @@ def spawn(
return _spawn return _spawn
PROMPT = r"\(Pdb\+\+\)"
def expect(
child,
# prompt by default
patt: str = PROMPT,
**kwargs,
) -> None:
'''
Expect wrapper that prints last seen console
data before failing.
'''
try:
child.expect(
patt,
**kwargs,
)
except TIMEOUT:
before = str(child.before.decode())
print(before)
raise
def assert_before(
child,
patts: list[str],
) -> None:
before = str(child.before.decode())
for patt in patts:
try:
assert patt in before
except AssertionError:
print(before)
raise
@pytest.fixture(
params=[False, True],
ids='ctl-c={}'.format,
)
def ctlc(
request,
ci_env: bool,
) -> bool:
use_ctlc = request.param
if (
sys.version_info <= (3, 10)
and use_ctlc
):
# on 3.9 it seems the REPL UX
# is highly unreliable and frankly annoying
# to test for. It does work from manual testing
# but i just don't think it's wroth it to try
# and get this working especially since we want to
# be 3.10+ mega-asap.
pytest.skip('Py3.9 and `pdbpp` son no bueno..')
if ci_env:
node = request.node
markers = node.own_markers
for mark in markers:
if mark.name == 'has_nested_actors':
pytest.skip(
f'Test for {node} uses nested actors and fails in CI\n'
f'The test seems to run fine locally but until we solve'
'this issue this CI test will be xfail:\n'
'https://github.com/goodboy/tractor/issues/320'
)
if use_ctlc:
# XXX: disable pygments highlighting for auto-tests
# since some envs (like actions CI) will struggle
# the the added color-char encoding..
from tractor._debug import TractorConfig
TractorConfig.use_pygements = False
yield use_ctlc
@pytest.mark.parametrize( @pytest.mark.parametrize(
'user_in_out', 'user_in_out',
[ [
@ -89,7 +202,7 @@ def test_root_actor_error(spawn, user_in_out):
child = spawn('root_actor_error') child = spawn('root_actor_error')
# scan for the pdbpp prompt # scan for the pdbpp prompt
child.expect(r"\(Pdb\+\+\)") expect(child, PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
@ -101,7 +214,7 @@ def test_root_actor_error(spawn, user_in_out):
child.sendline(user_input) child.sendline(user_input)
# process should exit # process should exit
child.expect(pexpect.EOF) expect(child, EOF)
assert expect_err_str in str(child.before) assert expect_err_str in str(child.before)
@ -137,20 +250,68 @@ def test_root_actor_bp(spawn, user_in_out):
assert expect_err_str in str(child.before) assert expect_err_str in str(child.before)
def test_root_actor_bp_forever(spawn): def do_ctlc(
child,
count: int = 3,
delay: float = 0.1,
patt: Optional[str] = None,
# expect repl UX to reprint the prompt after every
# ctrl-c send.
# XXX: no idea but, in CI this never seems to work even on 3.10 so
# needs some further investigation potentially...
expect_prompt: bool = not _ci_env,
) -> None:
# make sure ctl-c sends don't do anything but repeat output
for _ in range(count):
time.sleep(delay)
child.sendcontrol('c')
# TODO: figure out why this makes CI fail..
# if you run this test manually it works just fine..
if expect_prompt:
before = str(child.before.decode())
time.sleep(delay)
child.expect(r"\(Pdb\+\+\)")
time.sleep(delay)
if patt:
# should see the last line on console
assert patt in before
def test_root_actor_bp_forever(
spawn,
ctlc: bool,
):
"Re-enter a breakpoint from the root actor-task." "Re-enter a breakpoint from the root actor-task."
child = spawn('root_actor_breakpoint_forever') child = spawn('root_actor_breakpoint_forever')
# do some "next" commands to demonstrate recurrent breakpoint # do some "next" commands to demonstrate recurrent breakpoint
# entries # entries
for _ in range(10): for _ in range(10):
child.sendline('next')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
# do one continue which should trigger a new task to lock the tty if ctlc:
do_ctlc(child)
child.sendline('next')
# do one continue which should trigger a
# new task to lock the tty
child.sendline('continue') child.sendline('continue')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
# seems that if we hit ctrl-c too fast the
# sigint guard machinery might not kick in..
time.sleep(0.001)
if ctlc:
do_ctlc(child)
# XXX: this previously caused a bug! # XXX: this previously caused a bug!
child.sendline('n') child.sendline('n')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
@ -158,10 +319,25 @@ def test_root_actor_bp_forever(spawn):
child.sendline('n') child.sendline('n')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
# quit out of the loop
child.sendline('q')
child.expect(pexpect.EOF)
def test_subactor_error(spawn):
"Single subactor raising an error"
@pytest.mark.parametrize(
'do_next',
(True, False),
ids='do_next={}'.format,
)
def test_subactor_error(
spawn,
ctlc: bool,
do_next: bool,
):
'''
Single subactor raising an error
'''
child = spawn('subactor_error') child = spawn('subactor_error')
# scan for the pdbpp prompt # scan for the pdbpp prompt
@ -170,23 +346,33 @@ def test_subactor_error(spawn):
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error'" in before assert "Attaching to pdb in crashed actor: ('name_error'" in before
# send user command if do_next:
# (in this case it's the same for 'continue' vs. 'quit') child.sendline('n')
child.sendline('continue')
# the debugger should enter a second time in the nursery else:
# creating actor # make sure ctl-c sends don't do anything but repeat output
if ctlc:
do_ctlc(
child,
)
# send user command and (in this case it's the same for 'continue'
# vs. 'quit') the debugger should enter a second time in the nursery
# creating actor
child.sendline('continue')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
# root actor gets debugger engaged # root actor gets debugger engaged
assert "Attaching to pdb in crashed actor: ('root'" in before assert "Attaching to pdb in crashed actor: ('root'" in before
# error is a remote error propagated from the subactor # error is a remote error propagated from the subactor
assert "RemoteActorError: ('name_error'" in before assert "RemoteActorError: ('name_error'" in before
# another round
if ctlc:
do_ctlc(child)
child.sendline('c') child.sendline('c')
child.expect('\r\n') child.expect('\r\n')
@ -194,7 +380,10 @@ def test_subactor_error(spawn):
child.expect(pexpect.EOF) child.expect(pexpect.EOF)
def test_subactor_breakpoint(spawn): def test_subactor_breakpoint(
spawn,
ctlc: bool,
):
"Single subactor with an infinite breakpoint loop" "Single subactor with an infinite breakpoint loop"
child = spawn('subactor_breakpoint') child = spawn('subactor_breakpoint')
@ -211,6 +400,9 @@ def test_subactor_breakpoint(spawn):
child.sendline('next') child.sendline('next')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
if ctlc:
do_ctlc(child)
# now run some "continues" to show re-entries # now run some "continues" to show re-entries
for _ in range(5): for _ in range(5):
child.sendline('continue') child.sendline('continue')
@ -218,6 +410,9 @@ def test_subactor_breakpoint(spawn):
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before assert "Attaching pdb to actor: ('breakpoint_forever'" in before
if ctlc:
do_ctlc(child)
# finally quit the loop # finally quit the loop
child.sendline('q') child.sendline('q')
@ -228,6 +423,9 @@ def test_subactor_breakpoint(spawn):
assert "RemoteActorError: ('breakpoint_forever'" in before assert "RemoteActorError: ('breakpoint_forever'" in before
assert 'bdb.BdbQuit' in before assert 'bdb.BdbQuit' in before
if ctlc:
do_ctlc(child)
# quit the parent # quit the parent
child.sendline('c') child.sendline('c')
@ -239,11 +437,16 @@ def test_subactor_breakpoint(spawn):
assert 'bdb.BdbQuit' in before assert 'bdb.BdbQuit' in before
def test_multi_subactors(spawn): @has_nested_actors
""" def test_multi_subactors(
Multiple subactors, both erroring and breakpointing as well as spawn,
a nested subactor erroring. ctlc: bool,
""" ):
'''
Multiple subactors, both erroring and
breakpointing as well as a nested subactor erroring.
'''
child = spawn(r'multi_subactors') child = spawn(r'multi_subactors')
# scan for the pdbpp prompt # scan for the pdbpp prompt
@ -252,12 +455,18 @@ def test_multi_subactors(spawn):
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before assert "Attaching pdb to actor: ('breakpoint_forever'" in before
if ctlc:
do_ctlc(child)
# do some "next" commands to demonstrate recurrent breakpoint # do some "next" commands to demonstrate recurrent breakpoint
# entries # entries
for _ in range(10): for _ in range(10):
child.sendline('next') child.sendline('next')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
if ctlc:
do_ctlc(child)
# continue to next error # continue to next error
child.sendline('c') child.sendline('c')
@ -267,14 +476,22 @@ def test_multi_subactors(spawn):
assert "Attaching to pdb in crashed actor: ('name_error'" in before assert "Attaching to pdb in crashed actor: ('name_error'" in before
assert "NameError" in before assert "NameError" in before
if ctlc:
do_ctlc(child)
# continue again # continue again
child.sendline('c') child.sendline('c')
# 2nd name_error failure # 2nd name_error failure
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error_1'" in before assert_before(child, [
assert "NameError" in before "Attaching to pdb in crashed actor: ('name_error_1'",
"NameError",
])
if ctlc:
do_ctlc(child)
# breakpoint loop should re-engage # breakpoint loop should re-engage
child.sendline('c') child.sendline('c')
@ -282,20 +499,33 @@ def test_multi_subactors(spawn):
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before assert "Attaching pdb to actor: ('breakpoint_forever'" in before
if ctlc:
do_ctlc(child)
# wait for spawn error to show up # wait for spawn error to show up
spawn_err = "Attaching to pdb in crashed actor: ('spawn_error'" spawn_err = "Attaching to pdb in crashed actor: ('spawn_error'"
while spawn_err not in before: start = time.time()
while (
spawn_err not in before
and (time.time() - start) < 3 # timeout eventually
):
child.sendline('c') child.sendline('c')
time.sleep(0.1) time.sleep(0.1)
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
if ctlc:
do_ctlc(child)
# 2nd depth nursery should trigger # 2nd depth nursery should trigger
# child.sendline('c') # (XXX: this below if guard is technically a hack that makes the
# child.expect(r"\(Pdb\+\+\)") # nested case seem to work locally on linux but ideally in the long
# before = str(child.before.decode()) # run this can be dropped.)
assert spawn_err in before if not ctlc:
assert "RemoteActorError: ('name_error_1'" in before assert_before(child, [
spawn_err,
"RemoteActorError: ('name_error_1'",
])
# now run some "continues" to show re-entries # now run some "continues" to show re-entries
for _ in range(5): for _ in range(5):
@ -306,31 +536,46 @@ def test_multi_subactors(spawn):
child.sendline('q') child.sendline('q')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
# debugger attaches to root
assert "Attaching to pdb in crashed actor: ('root'" in before assert_before(child, [
# expect a multierror with exceptions for each sub-actor # debugger attaches to root
assert "RemoteActorError: ('breakpoint_forever'" in before "Attaching to pdb in crashed actor: ('root'",
assert "RemoteActorError: ('name_error'" in before
assert "RemoteActorError: ('spawn_error'" in before # expect a multierror with exceptions for each sub-actor
assert "RemoteActorError: ('name_error_1'" in before "RemoteActorError: ('breakpoint_forever'",
assert 'bdb.BdbQuit' in before "RemoteActorError: ('name_error'",
"RemoteActorError: ('spawn_error'",
"RemoteActorError: ('name_error_1'",
'bdb.BdbQuit',
])
if ctlc:
do_ctlc(child)
# process should exit # process should exit
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(pexpect.EOF)
# repeat of previous multierror for final output # repeat of previous multierror for final output
before = str(child.before.decode()) assert_before(child, [
assert "RemoteActorError: ('breakpoint_forever'" in before "RemoteActorError: ('breakpoint_forever'",
assert "RemoteActorError: ('name_error'" in before "RemoteActorError: ('name_error'",
assert "RemoteActorError: ('spawn_error'" in before "RemoteActorError: ('spawn_error'",
assert "RemoteActorError: ('name_error_1'" in before "RemoteActorError: ('name_error_1'",
assert 'bdb.BdbQuit' in before 'bdb.BdbQuit',
])
def test_multi_daemon_subactors(spawn, loglevel): def test_multi_daemon_subactors(
"""Multiple daemon subactors, both erroring and breakpointing within a spawn,
loglevel: str,
ctlc: bool
):
'''
Multiple daemon subactors, both erroring and breakpointing within a
stream. stream.
"""
'''
child = spawn('multi_daemon_subactors') child = spawn('multi_daemon_subactors')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
@ -352,6 +597,9 @@ def test_multi_daemon_subactors(spawn, loglevel):
else: else:
raise ValueError("Neither log msg was found !?") raise ValueError("Neither log msg was found !?")
if ctlc:
do_ctlc(child)
# NOTE: previously since we did not have clobber prevention # NOTE: previously since we did not have clobber prevention
# in the root actor this final resume could result in the debugger # in the root actor this final resume could result in the debugger
# tearing down since both child actors would be cancelled and it was # tearing down since both child actors would be cancelled and it was
@ -371,7 +619,7 @@ def test_multi_daemon_subactors(spawn, loglevel):
# now the root actor won't clobber the bp_forever child # now the root actor won't clobber the bp_forever child
# during it's first access to the debug lock, but will instead # during it's first access to the debug lock, but will instead
# wait for the lock to release, by the edge triggered # wait for the lock to release, by the edge triggered
# ``_debug._no_remote_has_tty`` event before sending cancel messages # ``_debug.Lock.no_remote_has_tty`` event before sending cancel messages
# (via portals) to its underlings B) # (via portals) to its underlings B)
# at some point here there should have been some warning msg from # at some point here there should have been some warning msg from
@ -379,6 +627,9 @@ def test_multi_daemon_subactors(spawn, loglevel):
# it seems unreliable in testing here to gnab it: # it seems unreliable in testing here to gnab it:
# assert "in use by child ('bp_forever'," in before # assert "in use by child ('bp_forever'," in before
if ctlc:
do_ctlc(child)
# wait for final error in root # wait for final error in root
while True: while True:
@ -394,17 +645,24 @@ def test_multi_daemon_subactors(spawn, loglevel):
except AssertionError: except AssertionError:
assert bp_forever_msg in before assert bp_forever_msg in before
if ctlc:
do_ctlc(child)
try: try:
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(pexpect.EOF)
except pexpect.exceptions.TIMEOUT: except TIMEOUT:
# Failed to exit using continue..? # Failed to exit using continue..?
child.sendline('q') child.sendline('q')
child.expect(pexpect.EOF) child.expect(pexpect.EOF)
def test_multi_subactors_root_errors(spawn): @has_nested_actors
def test_multi_subactors_root_errors(
spawn,
ctlc: bool
):
''' '''
Multiple subactors, both erroring and breakpointing as well as Multiple subactors, both erroring and breakpointing as well as
a nested subactor erroring. a nested subactor erroring.
@ -419,33 +677,48 @@ def test_multi_subactors_root_errors(spawn):
before = str(child.before.decode()) before = str(child.before.decode())
assert "NameError: name 'doggypants' is not defined" in before assert "NameError: name 'doggypants' is not defined" in before
if ctlc:
do_ctlc(child)
# continue again to catch 2nd name error from # continue again to catch 2nd name error from
# actor 'name_error_1' (which is 2nd depth). # actor 'name_error_1' (which is 2nd depth).
child.sendline('c') child.sendline('c')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) assert_before(child, [
assert "Attaching to pdb in crashed actor: ('name_error_1'" in before "Attaching to pdb in crashed actor: ('name_error_1'",
assert "NameError" in before "NameError",
])
if ctlc:
do_ctlc(child)
child.sendline('c') child.sendline('c')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) assert_before(child, [
assert "Attaching to pdb in crashed actor: ('spawn_error'" in before "Attaching to pdb in crashed actor: ('spawn_error'",
# boxed error from previous step # boxed error from previous step
assert "RemoteActorError: ('name_error_1'" in before "RemoteActorError: ('name_error_1'",
assert "NameError" in before "NameError",
])
if ctlc:
do_ctlc(child)
child.sendline('c') child.sendline('c')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) assert_before(child, [
assert "Attaching to pdb in crashed actor: ('root'" in before "Attaching to pdb in crashed actor: ('root'",
# boxed error from first level failure # boxed error from previous step
assert "RemoteActorError: ('name_error'" in before "RemoteActorError: ('name_error'",
assert "NameError" in before "NameError",
])
# warnings assert we probably don't need # warnings assert we probably don't need
# assert "Cancelling nursery in ('spawn_error'," in before # assert "Cancelling nursery in ('spawn_error'," in before
if ctlc:
do_ctlc(child)
# continue again # continue again
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(pexpect.EOF)
@ -455,7 +728,14 @@ def test_multi_subactors_root_errors(spawn):
assert "AssertionError" in before assert "AssertionError" in before
def test_multi_nested_subactors_error_through_nurseries(spawn): @has_nested_actors
def test_multi_nested_subactors_error_through_nurseries(
spawn,
# TODO: address debugger issue for nested tree:
# https://github.com/goodboy/tractor/issues/320
# ctlc: bool,
):
"""Verify deeply nested actors that error trigger debugger entries """Verify deeply nested actors that error trigger debugger entries
at each actor nurserly (level) all the way up the tree. at each actor nurserly (level) all the way up the tree.
@ -476,7 +756,7 @@ def test_multi_nested_subactors_error_through_nurseries(spawn):
child.sendline('c') child.sendline('c')
time.sleep(0.1) time.sleep(0.1)
except pexpect.exceptions.EOF: except EOF:
# race conditions on how fast the continue is sent? # race conditions on how fast the continue is sent?
print(f"Failed early on {i}?") print(f"Failed early on {i}?")
@ -490,14 +770,19 @@ def test_multi_nested_subactors_error_through_nurseries(spawn):
assert "NameError" in before assert "NameError" in before
@pytest.mark.timeout(15)
@has_nested_actors
def test_root_nursery_cancels_before_child_releases_tty_lock( def test_root_nursery_cancels_before_child_releases_tty_lock(
spawn, spawn,
start_method start_method,
ctlc: bool,
): ):
"""Test that when the root sends a cancel message before a nested '''
child has unblocked (which can happen when it has the tty lock and Test that when the root sends a cancel message before a nested child
is engaged in pdb) it is indeed cancelled after exiting the debugger. has unblocked (which can happen when it has the tty lock and is
""" engaged in pdb) it is indeed cancelled after exiting the debugger.
'''
timed_out_early = False timed_out_early = False
child = spawn('root_cancelled_but_child_is_in_tty_lock') child = spawn('root_cancelled_but_child_is_in_tty_lock')
@ -509,6 +794,9 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
assert "tractor._exceptions.RemoteActorError: ('name_error'" not in before assert "tractor._exceptions.RemoteActorError: ('name_error'" not in before
time.sleep(0.5) time.sleep(0.5)
if ctlc:
do_ctlc(child)
child.sendline('c') child.sendline('c')
for i in range(4): for i in range(4):
@ -517,8 +805,8 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
except ( except (
pexpect.exceptions.EOF, EOF,
pexpect.exceptions.TIMEOUT, TIMEOUT,
): ):
# races all over.. # races all over..
@ -533,26 +821,37 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
before = str(child.before.decode()) before = str(child.before.decode())
assert "NameError: name 'doggypants' is not defined" in before assert "NameError: name 'doggypants' is not defined" in before
child.sendline('c') if ctlc:
do_ctlc(child)
while True: child.sendline('c')
time.sleep(0.1)
for i in range(3):
try: try:
child.expect(pexpect.EOF) child.expect(pexpect.EOF, timeout=0.5)
break break
except pexpect.exceptions.TIMEOUT: except TIMEOUT:
child.sendline('c') child.sendline('c')
time.sleep(0.1)
print('child was able to grab tty lock again?') print('child was able to grab tty lock again?')
else:
print('giving up on child releasing, sending `quit` cmd')
child.sendline('q')
expect(child, EOF)
if not timed_out_early: if not timed_out_early:
before = str(child.before.decode()) before = str(child.before.decode())
assert "tractor._exceptions.RemoteActorError: ('spawner0'" in before assert_before(child, [
assert "tractor._exceptions.RemoteActorError: ('name_error'" in before "tractor._exceptions.RemoteActorError: ('spawner0'",
assert "NameError: name 'doggypants' is not defined" in before "tractor._exceptions.RemoteActorError: ('name_error'",
"NameError: name 'doggypants' is not defined",
])
def test_root_cancels_child_context_during_startup( def test_root_cancels_child_context_during_startup(
spawn, spawn,
ctlc: bool,
): ):
'''Verify a fast fail in the root doesn't lock up the child reaping '''Verify a fast fail in the root doesn't lock up the child reaping
and all while using the new context api. and all while using the new context api.
@ -565,12 +864,16 @@ def test_root_cancels_child_context_during_startup(
before = str(child.before.decode()) before = str(child.before.decode())
assert "AssertionError" in before assert "AssertionError" in before
if ctlc:
do_ctlc(child)
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(pexpect.EOF)
def test_different_debug_mode_per_actor( def test_different_debug_mode_per_actor(
spawn, spawn,
ctlc: bool,
): ):
child = spawn('per_actor_debug') child = spawn('per_actor_debug')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
@ -580,6 +883,9 @@ def test_different_debug_mode_per_actor(
assert "Attaching to pdb in crashed actor: ('debugged_boi'" in before assert "Attaching to pdb in crashed actor: ('debugged_boi'" in before
assert "RuntimeError" in before assert "RuntimeError" in before
if ctlc:
do_ctlc(child)
child.sendline('c') child.sendline('c')
child.expect(pexpect.EOF) child.expect(pexpect.EOF)

View File

@ -81,11 +81,14 @@ def run_example_in_subproc(loglevel, testdir, arb_addr):
'example_script', 'example_script',
# walk yields: (dirpath, dirnames, filenames) # walk yields: (dirpath, dirnames, filenames)
[(p[0], f) for p in os.walk(examples_dir()) for f in p[2] [
(p[0], f) for p in os.walk(examples_dir()) for f in p[2]
if '__' not in f if '__' not in f
and f[0] != '_' and f[0] != '_'
and 'debugging' not in p[0]], and 'debugging' not in p[0]
and 'integration' not in p[0]
],
ids=lambda t: t[1], ids=lambda t: t[1],
) )
@ -113,9 +116,19 @@ def test_example(run_example_in_subproc, example_script):
# print(f'STDOUT: {out}') # print(f'STDOUT: {out}')
# if we get some gnarly output let's aggregate and raise # if we get some gnarly output let's aggregate and raise
errmsg = err.decode() if err:
errlines = errmsg.splitlines() errmsg = err.decode()
if err and 'Error' in errlines[-1]: errlines = errmsg.splitlines()
raise Exception(errmsg) last_error = errlines[-1]
if (
'Error' in last_error
# XXX: currently we print this to console, but maybe
# shouldn't eventually once we figure out what's
# a better way to be explicit about aio side
# cancels?
and 'asyncio.exceptions.CancelledError' not in last_error
):
raise Exception(errmsg)
assert proc.returncode == 0 assert proc.returncode == 0

View File

@ -150,13 +150,13 @@ def test_loglevel_propagated_to_subactor(
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
name='arbiter', name='arbiter',
loglevel=level,
start_method=start_method, start_method=start_method,
arbiter_addr=arb_addr, arbiter_addr=arb_addr,
) as tn: ) as tn:
await tn.run_in_actor( await tn.run_in_actor(
check_loglevel, check_loglevel,
loglevel=level,
level=level, level=level,
) )

View File

@ -26,8 +26,11 @@ import importlib
import importlib.util import importlib.util
import inspect import inspect
import uuid import uuid
import typing from typing import (
from typing import Any, Optional, Union Any, Optional,
Union, TYPE_CHECKING,
Callable,
)
from types import ModuleType from types import ModuleType
import sys import sys
import os import os
@ -57,6 +60,10 @@ from . import _state
from . import _mp_fixup_main from . import _mp_fixup_main
if TYPE_CHECKING:
from ._supervise import ActorNursery
log = get_logger('tractor') log = get_logger('tractor')
@ -65,7 +72,7 @@ async def _invoke(
actor: 'Actor', actor: 'Actor',
cid: str, cid: str,
chan: Channel, chan: Channel,
func: typing.Callable, func: Callable,
kwargs: dict[str, Any], kwargs: dict[str, Any],
is_rpc: bool = True, is_rpc: bool = True,
@ -80,9 +87,10 @@ async def _invoke(
''' '''
__tracebackhide__ = True __tracebackhide__ = True
treat_as_gen = False treat_as_gen: bool = False
failed_resp: bool = False
# possible a traceback (not sure what typing is for this..) # possibly a traceback (not sure what typing is for this..)
tb = None tb = None
cancel_scope = trio.CancelScope() cancel_scope = trio.CancelScope()
@ -183,7 +191,8 @@ async def _invoke(
ctx._scope_nursery = scope_nursery ctx._scope_nursery = scope_nursery
cs = scope_nursery.cancel_scope cs = scope_nursery.cancel_scope
task_status.started(cs) task_status.started(cs)
await chan.send({'return': await coro, 'cid': cid}) res = await coro
await chan.send({'return': res, 'cid': cid})
except trio.MultiError: except trio.MultiError:
# if a context error was set then likely # if a context error was set then likely
@ -197,10 +206,15 @@ async def _invoke(
# XXX: only pop the context tracking if # XXX: only pop the context tracking if
# a ``@tractor.context`` entrypoint was called # a ``@tractor.context`` entrypoint was called
assert chan.uid assert chan.uid
# don't pop the local context until we know the
# associated child isn't in debug any more
await _debug.maybe_wait_for_debugger()
ctx = actor._contexts.pop((chan.uid, cid)) ctx = actor._contexts.pop((chan.uid, cid))
if ctx: if ctx:
log.runtime( log.runtime(
f'Context entrypoint for {func} was terminated:\n{ctx}' f'Context entrypoint {func} was terminated:\n{ctx}'
) )
assert cs assert cs
@ -228,12 +242,29 @@ async def _invoke(
else: else:
# regular async function # regular async function
await chan.send({'functype': 'asyncfunc', 'cid': cid}) try:
await chan.send({'functype': 'asyncfunc', 'cid': cid})
except trio.BrokenResourceError:
failed_resp = True
if is_rpc:
raise
else:
log.warning(
f'Failed to respond to non-rpc request: {func}'
)
with cancel_scope as cs: with cancel_scope as cs:
task_status.started(cs) task_status.started(cs)
await chan.send({'return': await coro, 'cid': cid}) result = await coro
log.cancel(f'result: {result}')
if not failed_resp:
# only send result if we know IPC isn't down
await chan.send({'return': result, 'cid': cid})
except (Exception, trio.MultiError) as err: except (
Exception,
trio.MultiError
) as err:
if not is_multi_cancelled(err): if not is_multi_cancelled(err):
@ -267,7 +298,14 @@ async def _invoke(
try: try:
await chan.send(err_msg) await chan.send(err_msg)
except trio.ClosedResourceError: # TODO: tests for this scenario:
# - RPC caller closes connection before getting a response
# should **not** crash this actor..
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError,
):
# if we can't propagate the error that's a big boo boo # if we can't propagate the error that's a big boo boo
log.error( log.error(
f"Failed to ship error to caller @ {chan.uid} !?" f"Failed to ship error to caller @ {chan.uid} !?"
@ -316,7 +354,9 @@ async def try_ship_error_to_parent(
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
): ):
log.error( # in SC terms this is one of the worst things that can
# happen and creates the 2-general's dilemma.
log.critical(
f"Failed to ship error to parent " f"Failed to ship error to parent "
f"{channel.uid}, channel was closed" f"{channel.uid}, channel was closed"
) )
@ -424,7 +464,7 @@ class Actor:
# (chan, cid) -> (cancel_scope, func) # (chan, cid) -> (cancel_scope, func)
self._rpc_tasks: dict[ self._rpc_tasks: dict[
tuple[Channel, str], tuple[Channel, str],
tuple[trio.CancelScope, typing.Callable, trio.Event] tuple[trio.CancelScope, Callable, trio.Event]
] = {} ] = {}
# map {actor uids -> Context} # map {actor uids -> Context}
@ -491,13 +531,20 @@ class Actor:
mne = ModuleNotExposed(*err.args) mne = ModuleNotExposed(*err.args)
if ns == '__main__': if ns == '__main__':
msg = ( modpath = '__name__'
"\n\nMake sure you exposed the current module using:\n\n" else:
"ActorNursery.start_actor(<name>, enable_modules=" modpath = f"'{ns}'"
"[__name__])"
)
mne.msg += msg msg = (
"\n\nMake sure you exposed the target module, `{ns}`, "
"using:\n"
"ActorNursery.start_actor(<name>, enable_modules=[{mod}])"
).format(
ns=ns,
mod=modpath,
)
mne.msg += msg
raise mne raise mne
@ -513,6 +560,7 @@ class Actor:
self._no_more_peers = trio.Event() # unset self._no_more_peers = trio.Event() # unset
chan = Channel.from_stream(stream) chan = Channel.from_stream(stream)
uid: Optional[tuple[str, str]] = chan.uid
log.runtime(f"New connection to us {chan}") log.runtime(f"New connection to us {chan}")
# send/receive initial handshake response # send/receive initial handshake response
@ -560,39 +608,51 @@ class Actor:
# append new channel # append new channel
self._peers[uid].append(chan) self._peers[uid].append(chan)
local_nursery: Optional[ActorNursery] = None # noqa
disconnected: bool = False
# Begin channel management - respond to remote requests and # Begin channel management - respond to remote requests and
# process received reponses. # process received reponses.
try: try:
await self._process_messages(chan) disconnected = await self._process_messages(chan)
except trio.Cancelled: except (
trio.Cancelled,
):
log.cancel(f"Msg loop was cancelled for {chan}") log.cancel(f"Msg loop was cancelled for {chan}")
raise raise
finally: finally:
local_nursery = self._actoruid2nursery.get(uid, local_nursery)
# This is set in ``Portal.cancel_actor()``. So if # This is set in ``Portal.cancel_actor()``. So if
# the peer was cancelled we try to wait for them # the peer was cancelled we try to wait for them
# to tear down their side of the connection before # to tear down their side of the connection before
# moving on with closing our own side. # moving on with closing our own side.
local_nursery = self._actoruid2nursery.get(chan.uid)
if ( if (
local_nursery local_nursery
): ):
log.cancel(f"Waiting on cancel request to peer {chan.uid}") log.cancel(f"Waiting on cancel request to peer {chan.uid}")
# XXX: this is a soft wait on the channel (and its # XXX: this is a soft wait on the channel (and its
# underlying transport protocol) to close from the remote # underlying transport protocol) to close from the
# peer side since we presume that any channel which # remote peer side since we presume that any channel
# is mapped to a sub-actor (i.e. it's managed by # which is mapped to a sub-actor (i.e. it's managed by
# one of our local nurseries) # one of our local nurseries) has a message is sent to
# message is sent to the peer likely by this actor which is # the peer likely by this actor (which is now in
# now in a cancelled condition) when the local runtime here # a cancelled condition) when the local runtime here is
# is now cancelled while (presumably) in the middle of msg # now cancelled while (presumably) in the middle of msg
# loop processing. # loop processing.
with trio.move_on_after(0.5) as cs: with trio.move_on_after(0.5) as cs:
cs.shield = True cs.shield = True
# Attempt to wait for the far end to close the channel # Attempt to wait for the far end to close the channel
# and bail after timeout (2-generals on closure). # and bail after timeout (2-generals on closure).
assert chan.msgstream assert chan.msgstream
log.runtime(
f'Draining lingering msgs from stream {chan.msgstream}'
)
async for msg in chan.msgstream.drain(): async for msg in chan.msgstream.drain():
# try to deliver any lingering msgs # try to deliver any lingering msgs
# before we destroy the channel. # before we destroy the channel.
@ -609,6 +669,21 @@ class Actor:
await local_nursery.exited.wait() await local_nursery.exited.wait()
if disconnected:
# if the transport died and this actor is still
# registered within a local nursery, we report that the
# IPC layer may have failed unexpectedly since it may be
# the cause of other downstream errors.
entry = local_nursery._children.get(uid)
if entry:
_, proc, _ = entry
poll = getattr(proc, 'poll', None)
if poll and poll() is None:
log.cancel(
f'Actor {uid} IPC broke but proc is alive?'
)
# ``Channel`` teardown and closure sequence # ``Channel`` teardown and closure sequence
# Drop ref to channel so it can be gc-ed and disconnected # Drop ref to channel so it can be gc-ed and disconnected
@ -618,7 +693,7 @@ class Actor:
if not chans: if not chans:
log.runtime(f"No more channels for {chan.uid}") log.runtime(f"No more channels for {chan.uid}")
self._peers.pop(chan.uid, None) self._peers.pop(uid, None)
# for (uid, cid) in self._contexts.copy(): # for (uid, cid) in self._contexts.copy():
# if chan.uid == uid: # if chan.uid == uid:
@ -626,11 +701,13 @@ class Actor:
log.runtime(f"Peers is {self._peers}") log.runtime(f"Peers is {self._peers}")
if not self._peers: # no more channels connected # No more channels to other actors (at all) registered
# as connected.
if not self._peers:
log.runtime("Signalling no more peer channels") log.runtime("Signalling no more peer channels")
self._no_more_peers.set() self._no_more_peers.set()
# # XXX: is this necessary (GC should do it?) # XXX: is this necessary (GC should do it)?
if chan.connected(): if chan.connected():
# if the channel is still connected it may mean the far # if the channel is still connected it may mean the far
# end has not closed and we may have gotten here due to # end has not closed and we may have gotten here due to
@ -647,7 +724,7 @@ class Actor:
# await chan.aclose() # await chan.aclose()
except trio.BrokenResourceError: except trio.BrokenResourceError:
log.warning(f"Channel for {chan.uid} was already closed") log.runtime(f"Channel {chan.uid} was already closed")
async def _push_result( async def _push_result(
self, self,
@ -665,8 +742,8 @@ class Actor:
ctx = self._contexts[(uid, cid)] ctx = self._contexts[(uid, cid)]
except KeyError: except KeyError:
log.warning( log.warning(
f'Ignoring msg from [no-longer/un]known context with {uid}:' f'Ignoring msg from [no-longer/un]known context {uid}:'
f'\n{msg}') f'\n{msg}')
return return
send_chan = ctx._send_chan send_chan = ctx._send_chan
@ -813,7 +890,7 @@ class Actor:
shield: bool = False, shield: bool = False,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> bool:
''' '''
Process messages for the channel async-RPC style. Process messages for the channel async-RPC style.
@ -839,7 +916,7 @@ class Actor:
if msg is None: # loop terminate sentinel if msg is None: # loop terminate sentinel
log.cancel( log.cancel(
f"Channerl to {chan.uid} terminated?\n" f"Channel to {chan.uid} terminated?\n"
"Cancelling all associated tasks..") "Cancelling all associated tasks..")
for (channel, cid) in self._rpc_tasks.copy(): for (channel, cid) in self._rpc_tasks.copy():
@ -881,14 +958,16 @@ class Actor:
log.runtime( log.runtime(
f"Processing request from {actorid}\n" f"Processing request from {actorid}\n"
f"{ns}.{funcname}({kwargs})") f"{ns}.{funcname}({kwargs})")
if ns == 'self': if ns == 'self':
func = getattr(self, funcname) func = getattr(self, funcname)
if funcname == 'cancel': if funcname == 'cancel':
# don't start entire actor runtime # don't start entire actor runtime
# cancellation if this actor is in debug # cancellation if this actor is in debug
# mode # mode
pdb_complete = _debug._local_pdb_complete pdb_complete = _debug.Lock.local_pdb_complete
if pdb_complete: if pdb_complete:
await pdb_complete.wait() await pdb_complete.wait()
@ -919,12 +998,22 @@ class Actor:
# ``_async_main()`` # ``_async_main()``
kwargs['chan'] = chan kwargs['chan'] = chan
log.cancel( log.cancel(
f'{self.uid} was remotely cancelled by\n' f'Remote request to cancel task\n'
f'{chan.uid}!' f'remote actor: {chan.uid}\n'
) f'task: {cid}'
await _invoke(
self, cid, chan, func, kwargs, is_rpc=False
) )
try:
await _invoke(
self,
cid,
chan,
func,
kwargs,
is_rpc=False,
)
except BaseException:
log.exception("failed to cancel task?")
continue continue
else: else:
# complain to client about restricted modules # complain to client about restricted modules
@ -986,6 +1075,9 @@ class Actor:
# up. # up.
log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}') log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}')
# transport **was** disconnected
return True
except (Exception, trio.MultiError) as err: except (Exception, trio.MultiError) as err:
if nursery_cancelled_before_task: if nursery_cancelled_before_task:
sn = self._service_n sn = self._service_n
@ -1010,6 +1102,9 @@ class Actor:
f"Exiting msg loop for {chan} from {chan.uid} " f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}") f"with last msg:\n{msg}")
# transport **was not** disconnected
return False
async def _from_parent( async def _from_parent(
self, self,
parent_addr: Optional[tuple[str, int]], parent_addr: Optional[tuple[str, int]],
@ -1323,7 +1418,7 @@ class Actor:
# kill any debugger request task to avoid deadlock # kill any debugger request task to avoid deadlock
# with the root actor in this tree # with the root actor in this tree
dbcs = _debug._debugger_request_cs dbcs = _debug.Lock._debugger_request_cs
if dbcs is not None: if dbcs is not None:
log.cancel("Cancelling active debugger request") log.cancel("Cancelling active debugger request")
dbcs.cancel() dbcs.cancel()
@ -1356,12 +1451,14 @@ class Actor:
# n.cancel_scope.cancel() # n.cancel_scope.cancel()
async def _cancel_task(self, cid, chan): async def _cancel_task(self, cid, chan):
"""Cancel a local task by call-id / channel. '''
Cancel a local task by call-id / channel.
Note this method will be treated as a streaming function Note this method will be treated as a streaming function
by remote actor-callers due to the declaration of ``ctx`` by remote actor-callers due to the declaration of ``ctx``
in the signature (for now). in the signature (for now).
"""
'''
# right now this is only implicitly called by # right now this is only implicitly called by
# streaming IPC but it should be called # streaming IPC but it should be called
# to cancel any remotely spawned task # to cancel any remotely spawned task

View File

@ -18,8 +18,10 @@
Multi-core debugging for da peeps! Multi-core debugging for da peeps!
""" """
from __future__ import annotations
import bdb import bdb
import sys import sys
import signal
from functools import partial from functools import partial
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from typing import ( from typing import (
@ -29,16 +31,18 @@ from typing import (
AsyncIterator, AsyncIterator,
AsyncGenerator, AsyncGenerator,
) )
from types import FrameType
import tractor import tractor
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from .log import get_logger from .log import get_logger
from . import _state
from ._discovery import get_root from ._discovery import get_root
from ._state import is_root_process, debug_mode from ._state import is_root_process, debug_mode
from ._exceptions import is_multi_cancelled from ._exceptions import is_multi_cancelled
from ._ipc import Channel
try: try:
# wtf: only exported when installed in dev mode? # wtf: only exported when installed in dev mode?
@ -46,7 +50,8 @@ try:
except ImportError: except ImportError:
# pdbpp is installed in regular mode...it monkey patches stuff # pdbpp is installed in regular mode...it monkey patches stuff
import pdb import pdb
assert pdb.xpm, "pdbpp is not installed?" # type: ignore xpm = getattr(pdb, 'xpm', None)
assert xpm, "pdbpp is not installed?" # type: ignore
pdbpp = pdb pdbpp = pdb
log = get_logger(__name__) log = get_logger(__name__)
@ -55,102 +60,122 @@ log = get_logger(__name__)
__all__ = ['breakpoint', 'post_mortem'] __all__ = ['breakpoint', 'post_mortem']
# TODO: wrap all these in a static global class: ``DebugLock`` maybe? class Lock:
'''
Actor global debug lock state.
# placeholder for function to set a ``trio.Event`` on debugger exit Mostly to avoid a lot of ``global`` declarations for now XD.
_pdb_release_hook: Optional[Callable] = None
# actor-wide variable pointing to current task name using debugger '''
_local_task_in_debug: Optional[str] = None # placeholder for function to set a ``trio.Event`` on debugger exit
# pdb_release_hook: Optional[Callable] = None
# actor tree-wide actor uid that supposedly has the tty lock # actor-wide variable pointing to current task name using debugger
_global_actor_in_debug: Optional[Tuple[str, str]] = None local_task_in_debug: Optional[str] = None
# lock in root actor preventing multi-access to local tty # actor tree-wide actor uid that supposedly has the tty lock
_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() global_actor_in_debug: Optional[Tuple[str, str]] = None
_local_pdb_complete: Optional[trio.Event] = None
_no_remote_has_tty: Optional[trio.Event] = None
# XXX: set by the current task waiting on the root tty lock local_pdb_complete: Optional[trio.Event] = None
# and must be cancelled if this actor is cancelled via message no_remote_has_tty: Optional[trio.Event] = None
# otherwise deadlocks with the parent actor may ensure
_debugger_request_cs: Optional[trio.CancelScope] = None # lock in root actor preventing multi-access to local tty
_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
# XXX: set by the current task waiting on the root tty lock
# and must be cancelled if this actor is cancelled via message
# otherwise deadlocks with the parent actor may ensure
_debugger_request_cs: Optional[trio.CancelScope] = None
_orig_sigint_handler: Optional[Callable] = None
@classmethod
def shield_sigint(cls):
cls._orig_sigint_handler = signal.signal(
signal.SIGINT,
shield_sigint,
)
@classmethod
def unshield_sigint(cls):
if cls._orig_sigint_handler is not None:
# restore original sigint handler
signal.signal(
signal.SIGINT,
cls._orig_sigint_handler
)
cls._orig_sigint_handler = None
@classmethod
def release(cls):
try:
cls._debug_lock.release()
except RuntimeError:
# uhhh makes no sense but been seeing the non-owner
# release error even though this is definitely the task
# that locked?
owner = cls._debug_lock.statistics().owner
if owner:
raise
# actor-local state, irrelevant for non-root.
cls.global_actor_in_debug = None
cls.local_task_in_debug = None
try:
# sometimes the ``trio`` might already be terminated in
# which case this call will raise.
cls.local_pdb_complete.set()
finally:
# restore original sigint handler
cls.unshield_sigint()
class TractorConfig(pdbpp.DefaultConfig): class TractorConfig(pdbpp.DefaultConfig):
"""Custom ``pdbpp`` goodness. '''
""" Custom ``pdbpp`` goodness.
'''
# use_pygments = True
# sticky_by_default = True # sticky_by_default = True
enable_hidden_frames = False
class PdbwTeardown(pdbpp.Pdb): class MultiActorPdb(pdbpp.Pdb):
"""Add teardown hooks to the regular ``pdbpp.Pdb``. '''
""" Add teardown hooks to the regular ``pdbpp.Pdb``.
'''
# override the pdbpp config with our coolio one # override the pdbpp config with our coolio one
DefaultConfig = TractorConfig DefaultConfig = TractorConfig
# def preloop(self):
# print('IN PRELOOP')
# super().preloop()
# TODO: figure out how to disallow recursive .set_trace() entry # TODO: figure out how to disallow recursive .set_trace() entry
# since that'll cause deadlock for us. # since that'll cause deadlock for us.
def set_continue(self): def set_continue(self):
try: try:
super().set_continue() super().set_continue()
finally: finally:
global _local_task_in_debug Lock.release()
_local_task_in_debug = None
_pdb_release_hook()
def set_quit(self): def set_quit(self):
try: try:
super().set_quit() super().set_quit()
finally: finally:
global _local_task_in_debug Lock.release()
_local_task_in_debug = None
_pdb_release_hook()
# TODO: will be needed whenever we get to true remote debugging.
# XXX see https://github.com/goodboy/tractor/issues/130
# # TODO: is there some way to determine this programatically?
# _pdb_exit_patterns = tuple(
# str.encode(patt + "\n") for patt in (
# 'c', 'cont', 'continue', 'q', 'quit')
# )
# def subactoruid2proc(
# actor: 'Actor', # noqa
# uid: Tuple[str, str]
# ) -> trio.Process:
# n = actor._actoruid2nursery[uid]
# _, proc, _ = n._children[uid]
# return proc
# async def hijack_stdin():
# log.info(f"Hijacking stdin from {actor.uid}")
# trap std in and relay to subproc
# async_stdin = trio.wrap_file(sys.stdin)
# async with aclosing(async_stdin):
# async for msg in async_stdin:
# log.runtime(f"Stdin input:\n{msg}")
# # encode to bytes
# bmsg = str.encode(msg)
# # relay bytes to subproc over pipe
# # await proc.stdin.send_all(bmsg)
# if bmsg in _pdb_exit_patterns:
# log.info("Closing stdin hijack")
# break
@acm @acm
async def _acquire_debug_lock( async def _acquire_debug_lock_from_root_task(
uid: Tuple[str, str] uid: Tuple[str, str]
) -> AsyncIterator[trio.StrictFIFOLock]: ) -> AsyncIterator[trio.StrictFIFOLock]:
'''Acquire a root-actor local FIFO lock which tracks mutex access of '''
Acquire a root-actor local FIFO lock which tracks mutex access of
the process tree's global debugger breakpoint. the process tree's global debugger breakpoint.
This lock avoids tty clobbering (by preventing multiple processes This lock avoids tty clobbering (by preventing multiple processes
@ -158,94 +183,86 @@ async def _acquire_debug_lock(
to the ``pdb`` repl. to the ``pdb`` repl.
''' '''
global _debug_lock, _global_actor_in_debug, _no_remote_has_tty
task_name = trio.lowlevel.current_task().name task_name = trio.lowlevel.current_task().name
log.debug( log.runtime(
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}" f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}"
) )
we_acquired = False we_acquired = False
if _no_remote_has_tty is None:
# mark the tty lock as being in use so that the runtime
# can try to avoid clobbering any connection from a child
# that's currently relying on it.
_no_remote_has_tty = trio.Event()
try: try:
log.debug( log.runtime(
f"entering lock checkpoint, remote task: {task_name}:{uid}" f"entering lock checkpoint, remote task: {task_name}:{uid}"
) )
we_acquired = True we_acquired = True
await _debug_lock.acquire() await Lock._debug_lock.acquire()
_global_actor_in_debug = uid if Lock.no_remote_has_tty is None:
log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}") # mark the tty lock as being in use so that the runtime
# can try to avoid clobbering any connection from a child
# that's currently relying on it.
Lock.no_remote_has_tty = trio.Event()
Lock.global_actor_in_debug = uid
log.runtime(f"TTY lock acquired, remote task: {task_name}:{uid}")
# NOTE: critical section: this yield is unshielded! # NOTE: critical section: this yield is unshielded!
# IF we received a cancel during the shielded lock entry of some # IF we received a cancel during the shielded lock entry of some
# next-in-queue requesting task, then the resumption here will # next-in-queue requesting task, then the resumption here will
# result in that ``trio.Cancelled`` being raised to our caller # result in that ``trio.Cancelled`` being raised to our caller
# (likely from ``_hijack_stdin_for_child()`` below)! In # (likely from ``lock_tty_for_child()`` below)! In
# this case the ``finally:`` below should trigger and the # this case the ``finally:`` below should trigger and the
# surrounding caller side context should cancel normally # surrounding caller side context should cancel normally
# relaying back to the caller. # relaying back to the caller.
yield _debug_lock yield Lock._debug_lock
finally: finally:
# if _global_actor_in_debug == uid: if (
if we_acquired and _debug_lock.locked(): we_acquired
_debug_lock.release() and Lock._debug_lock.locked()
):
Lock._debug_lock.release()
# IFF there are no more requesting tasks queued up fire, the # IFF there are no more requesting tasks queued up fire, the
# "tty-unlocked" event thereby alerting any monitors of the lock that # "tty-unlocked" event thereby alerting any monitors of the lock that
# we are now back in the "tty unlocked" state. This is basically # we are now back in the "tty unlocked" state. This is basically
# and edge triggered signal around an empty queue of sub-actor # and edge triggered signal around an empty queue of sub-actor
# tasks that may have tried to acquire the lock. # tasks that may have tried to acquire the lock.
stats = _debug_lock.statistics() stats = Lock._debug_lock.statistics()
if ( if (
not stats.owner not stats.owner
): ):
log.debug(f"No more tasks waiting on tty lock! says {uid}") log.runtime(f"No more tasks waiting on tty lock! says {uid}")
_no_remote_has_tty.set() if Lock.no_remote_has_tty is not None:
_no_remote_has_tty = None Lock.no_remote_has_tty.set()
Lock.no_remote_has_tty = None
_global_actor_in_debug = None Lock.global_actor_in_debug = None
log.debug(f"TTY lock released, remote task: {task_name}:{uid}") log.runtime(
f"TTY lock released, remote task: {task_name}:{uid}"
def handler(signum, frame, *args):
"""Specialized debugger compatible SIGINT handler.
In childred we always ignore to avoid deadlocks since cancellation
should always be managed by the parent supervising actor. The root
is always cancelled on ctrl-c.
"""
if is_root_process():
tractor.current_actor().cancel_soon()
else:
print(
"tractor ignores SIGINT while in debug mode\n"
"If you have a special need for it please open an issue.\n"
) )
@tractor.context @tractor.context
async def _hijack_stdin_for_child( async def lock_tty_for_child(
ctx: tractor.Context, ctx: tractor.Context,
subactor_uid: Tuple[str, str] subactor_uid: Tuple[str, str]
) -> str: ) -> str:
''' '''
Hijack the tty in the root process of an actor tree such that Lock the TTY in the root process of an actor tree in a new
the pdbpp debugger console can be allocated to a sub-actor for repl inter-actor-context-task such that the ``pdbpp`` debugger console
bossing. can be mutex-allocated to the calling sub-actor for REPL control
without interference by other processes / threads.
NOTE: this task must be invoked in the root process of the actor
tree. It is meant to be invoked as an rpc-task and should be
highly reliable at releasing the mutex complete!
''' '''
task_name = trio.lowlevel.current_task().name task_name = trio.lowlevel.current_task().name
@ -259,47 +276,28 @@ async def _hijack_stdin_for_child(
) )
log.debug(f"Actor {subactor_uid} is WAITING on stdin hijack lock") log.debug(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
Lock.shield_sigint()
with trio.CancelScope(shield=True): try:
with (
try: trio.CancelScope(shield=True),
lock = None ):
async with _acquire_debug_lock(subactor_uid) as lock: async with _acquire_debug_lock_from_root_task(subactor_uid):
# indicate to child that we've locked stdio # indicate to child that we've locked stdio
await ctx.started('Locked') await ctx.started('Locked')
log.debug(f"Actor {subactor_uid} acquired stdin hijack lock") log.debug(
f"Actor {subactor_uid} acquired stdin hijack lock"
)
# wait for unlock pdb by child # wait for unlock pdb by child
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
assert await stream.receive() == 'pdb_unlock' assert await stream.receive() == 'pdb_unlock'
# try: return "pdb_unlock_complete"
# assert await stream.receive() == 'pdb_unlock'
except ( finally:
# BaseException, Lock.unshield_sigint()
trio.MultiError,
trio.BrokenResourceError,
trio.Cancelled, # by local cancellation
trio.ClosedResourceError, # by self._rx_chan
) as err:
# XXX: there may be a race with the portal teardown
# with the calling actor which we can safely ignore.
# The alternative would be sending an ack message
# and allowing the client to wait for us to teardown
# first?
if lock and lock.locked():
lock.release()
if isinstance(err, trio.Cancelled):
raise
finally:
log.debug(
"TTY lock released, remote task:"
f"{task_name}:{subactor_uid}")
return "pdb_unlock_complete"
async def wait_for_parent_stdin_hijack( async def wait_for_parent_stdin_hijack(
@ -307,20 +305,21 @@ async def wait_for_parent_stdin_hijack(
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED
): ):
''' '''
Connect to the root actor via a ctx and invoke a task which locks Connect to the root actor via a ``Context`` and invoke a task which
a root-local TTY lock. locks a root-local TTY lock: ``lock_tty_for_child()``; this func
should be called in a new task from a child actor **and never the
root*.
This function is used by any sub-actor to acquire mutex access to This function is used by any sub-actor to acquire mutex access to
pdb and the root's TTY for interactive debugging (see below inside the ``pdb`` REPL and thus the root's TTY for interactive debugging
``_breakpoint()``). It can be used to ensure that an intermediate (see below inside ``_breakpoint()``). It can be used to ensure that
nursery-owning actor does not clobber its children if they are in an intermediate nursery-owning actor does not clobber its children
debug (see below inside ``maybe_wait_for_debugger()``). if they are in debug (see below inside
``maybe_wait_for_debugger()``).
''' '''
global _debugger_request_cs
with trio.CancelScope(shield=True) as cs: with trio.CancelScope(shield=True) as cs:
_debugger_request_cs = cs Lock._debugger_request_cs = cs
try: try:
async with get_root() as portal: async with get_root() as portal:
@ -328,7 +327,7 @@ async def wait_for_parent_stdin_hijack(
# this syncs to child's ``Context.started()`` call. # this syncs to child's ``Context.started()`` call.
async with portal.open_context( async with portal.open_context(
tractor._debug._hijack_stdin_for_child, tractor._debug.lock_tty_for_child,
subactor_uid=actor_uid, subactor_uid=actor_uid,
) as (ctx, val): ) as (ctx, val):
@ -338,28 +337,45 @@ async def wait_for_parent_stdin_hijack(
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
# unblock local caller # unblock local caller
task_status.started(cs)
try: try:
assert _local_pdb_complete assert Lock.local_pdb_complete
await _local_pdb_complete.wait() task_status.started(cs)
await Lock.local_pdb_complete.wait()
finally: finally:
# TODO: shielding currently can cause hangs... # TODO: shielding currently can cause hangs...
with trio.CancelScope(shield=True): # with trio.CancelScope(shield=True):
await stream.send('pdb_unlock') await stream.send('pdb_unlock')
# sync with callee termination # sync with callee termination
assert await ctx.result() == "pdb_unlock_complete" assert await ctx.result() == "pdb_unlock_complete"
log.pdb('unlocked context')
except tractor.ContextCancelled: except tractor.ContextCancelled:
log.warning('Root actor cancelled debug lock') log.warning('Root actor cancelled debug lock')
finally: finally:
log.debug(f"Exiting debugger for actor {actor_uid}") log.pdb(f"Exiting debugger for actor {actor_uid}")
global _local_task_in_debug Lock.local_task_in_debug = None
_local_task_in_debug = None log.pdb(f"Child {actor_uid} released parent stdio lock")
log.debug(f"Child {actor_uid} released parent stdio lock")
def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
pdb = MultiActorPdb()
# signal.signal = pdbpp.hideframe(signal.signal)
Lock.shield_sigint()
# XXX: These are the important flags mentioned in
# https://github.com/python-trio/trio/issues/1155
# which resolve the traceback spews to console.
pdb.allow_kbdint = True
pdb.nosigint = True
return pdb, Lock.unshield_sigint
async def _breakpoint( async def _breakpoint(
@ -370,34 +386,39 @@ async def _breakpoint(
# shield: bool = False # shield: bool = False
) -> None: ) -> None:
'''``tractor`` breakpoint entry for engaging pdb machinery '''
in the root or a subactor. Breakpoint entry for engaging debugger instance sync-interaction,
from async code, executing in actor runtime (task).
''' '''
__tracebackhide__ = True
pdb, undo_sigint = mk_mpdb()
actor = tractor.current_actor()
task_name = trio.lowlevel.current_task().name
# TODO: is it possible to debug a trio.Cancelled except block? # TODO: is it possible to debug a trio.Cancelled except block?
# right now it seems like we can kinda do with by shielding # right now it seems like we can kinda do with by shielding
# around ``tractor.breakpoint()`` but not if we move the shielded # around ``tractor.breakpoint()`` but not if we move the shielded
# scope here??? # scope here???
# with trio.CancelScope(shield=shield): # with trio.CancelScope(shield=shield):
# await trio.lowlevel.checkpoint()
actor = tractor.current_actor() if not Lock.local_pdb_complete or Lock.local_pdb_complete.is_set():
task_name = trio.lowlevel.current_task().name Lock.local_pdb_complete = trio.Event()
global _local_pdb_complete, _pdb_release_hook
global _local_task_in_debug, _global_actor_in_debug
await trio.lowlevel.checkpoint()
if not _local_pdb_complete or _local_pdb_complete.is_set():
_local_pdb_complete = trio.Event()
# TODO: need a more robust check for the "root" actor # TODO: need a more robust check for the "root" actor
if actor._parent_chan and not is_root_process(): if (
not is_root_process()
and actor._parent_chan # a connected child
):
if _local_task_in_debug: if Lock.local_task_in_debug:
if _local_task_in_debug == task_name:
# this task already has the lock and is # Recurrence entry case: this task already has the lock and
# likely recurrently entering a breakpoint # is likely recurrently entering a breakpoint
if Lock.local_task_in_debug == task_name:
# noop on recurrent entry case
return return
# if **this** actor is already in debug mode block here # if **this** actor is already in debug mode block here
@ -405,15 +426,12 @@ async def _breakpoint(
# support for recursive entries to `tractor.breakpoint()` # support for recursive entries to `tractor.breakpoint()`
log.warning(f"{actor.uid} already has a debug lock, waiting...") log.warning(f"{actor.uid} already has a debug lock, waiting...")
await _local_pdb_complete.wait() await Lock.local_pdb_complete.wait()
await trio.sleep(0.1) await trio.sleep(0.1)
# mark local actor as "in debug mode" to avoid recurrent # mark local actor as "in debug mode" to avoid recurrent
# entries/requests to the root process # entries/requests to the root process
_local_task_in_debug = task_name Lock.local_task_in_debug = task_name
# assign unlock callback for debugger teardown hooks
_pdb_release_hook = _local_pdb_complete.set
# this **must** be awaited by the caller and is done using the # this **must** be awaited by the caller and is done using the
# root nursery so that the debugger can continue to run without # root nursery so that the debugger can continue to run without
@ -423,101 +441,232 @@ async def _breakpoint(
# we have to figure out how to avoid having the service nursery # we have to figure out how to avoid having the service nursery
# cancel on this task start? I *think* this works below? # cancel on this task start? I *think* this works below?
# actor._service_n.cancel_scope.shield = shield # actor._service_n.cancel_scope.shield = shield
with trio.CancelScope(shield=True): try:
await actor._service_n.start( with trio.CancelScope(shield=True):
wait_for_parent_stdin_hijack, await actor._service_n.start(
actor.uid, wait_for_parent_stdin_hijack,
) actor.uid,
)
except RuntimeError:
Lock.release()
raise
elif is_root_process(): elif is_root_process():
# we also wait in the root-parent for any child that # we also wait in the root-parent for any child that
# may have the tty locked prior # may have the tty locked prior
global _debug_lock
# TODO: wait, what about multiple root tasks acquiring it though? # TODO: wait, what about multiple root tasks acquiring it though?
# root process (us) already has it; ignore if Lock.global_actor_in_debug == actor.uid:
if _global_actor_in_debug == actor.uid: # re-entrant root process already has it: noop.
return return
# XXX: since we need to enter pdb synchronously below, # XXX: since we need to enter pdb synchronously below,
# we have to release the lock manually from pdb completion # we have to release the lock manually from pdb completion
# callbacks. Can't think of a nicer way then this atm. # callbacks. Can't think of a nicer way then this atm.
if _debug_lock.locked(): if Lock._debug_lock.locked():
log.warning( log.warning(
'Root actor attempting to shield-acquire active tty lock' 'Root actor attempting to shield-acquire active tty lock'
f' owned by {_global_actor_in_debug}') f' owned by {Lock.global_actor_in_debug}')
# must shield here to avoid hitting a ``Cancelled`` and # must shield here to avoid hitting a ``Cancelled`` and
# a child getting stuck bc we clobbered the tty # a child getting stuck bc we clobbered the tty
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await _debug_lock.acquire() await Lock._debug_lock.acquire()
else: else:
# may be cancelled # may be cancelled
await _debug_lock.acquire() await Lock._debug_lock.acquire()
_global_actor_in_debug = actor.uid Lock.global_actor_in_debug = actor.uid
_local_task_in_debug = task_name Lock.local_task_in_debug = task_name
# the lock must be released on pdb completion try:
def teardown(): # block here one (at the appropriate frame *up*) where
global _local_pdb_complete, _debug_lock # ``breakpoint()`` was awaited and begin handling stdio.
global _global_actor_in_debug, _local_task_in_debug log.debug("Entering the synchronous world of pdb")
debug_func(actor, pdb)
_debug_lock.release() except bdb.BdbQuit:
_global_actor_in_debug = None Lock.release()
_local_task_in_debug = None raise
_local_pdb_complete.set()
_pdb_release_hook = teardown # XXX: apparently we can't do this without showing this frame
# in the backtrace on first entry to the REPL? Seems like an odd
# block here one (at the appropriate frame *up*) where # behaviour that should have been fixed by now. This is also why
# ``breakpoint()`` was awaited and begin handling stdio. # we scrapped all the @cm approaches that were tried previously.
log.debug("Entering the synchronous world of pdb") # finally:
debug_func(actor) # __tracebackhide__ = True
# # frame = sys._getframe()
# # last_f = frame.f_back
# # last_f.f_globals['__tracebackhide__'] = True
# # signal.signal = pdbpp.hideframe(signal.signal)
# signal.signal(
# signal.SIGINT,
# orig_handler
# )
def _mk_pdb() -> PdbwTeardown: def shield_sigint(
signum: int,
frame: 'frame', # type: ignore # noqa
pdb_obj: Optional[MultiActorPdb] = None,
*args,
# XXX: setting these flags on the pdb instance are absolutely ) -> None:
# critical to having ctrl-c work in the ``trio`` standard way! The '''
# stdlib's pdb supports entering the current sync frame on a SIGINT, Specialized debugger compatible SIGINT handler.
# with ``trio`` we pretty much never want this and if we did we can
# handle it in the ``tractor`` task runtime.
pdb = PdbwTeardown() In childred we always ignore to avoid deadlocks since cancellation
pdb.allow_kbdint = True should always be managed by the parent supervising actor. The root
pdb.nosigint = True is always cancelled on ctrl-c.
return pdb '''
__tracebackhide__ = True
uid_in_debug = Lock.global_actor_in_debug
def _set_trace(actor=None): actor = tractor.current_actor()
pdb = _mk_pdb()
if actor is not None: def do_cancel():
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n") # If we haven't tried to cancel the runtime then do that instead
# of raising a KBI (which may non-gracefully destroy
# a ``trio.run()``).
if not actor._cancel_called:
actor.cancel_soon()
pdb.set_trace( # If the runtime is already cancelled it likely means the user
# start 2 levels up in user code # hit ctrl-c again because teardown didn't full take place in
frame=sys._getframe().f_back.f_back, # which case we do the "hard" raising of a local KBI.
) else:
raise KeyboardInterrupt
any_connected = False
if uid_in_debug is not None:
# try to see if the supposed (sub)actor in debug still
# has an active connection to *this* actor, and if not
# it's likely they aren't using the TTY lock / debugger
# and we should propagate SIGINT normally.
chans = actor._peers.get(tuple(uid_in_debug))
if chans:
any_connected = any(chan.connected() for chan in chans)
if not any_connected:
log.warning(
'A global actor reported to be in debug '
'but no connection exists for this child:\n'
f'{uid_in_debug}\n'
'Allowing SIGINT propagation..'
)
return do_cancel()
# root actor branch that reports whether or not a child
# has locked debugger.
if (
is_root_process()
and uid_in_debug is not None
# XXX: only if there is an existing connection to the
# (sub-)actor in debug do we ignore SIGINT in this
# parent! Otherwise we may hang waiting for an actor
# which has already terminated to unlock.
and any_connected
):
name = uid_in_debug[0]
if name != 'root':
log.pdb(
f"Ignoring SIGINT while child in debug mode: `{uid_in_debug}`"
)
else:
log.pdb(
"Ignoring SIGINT while in debug mode"
)
# child actor that has locked the debugger
elif not is_root_process():
chan: Channel = actor._parent_chan
if not chan or not chan.connected():
log.warning(
'A global actor reported to be in debug '
'but no connection exists for its parent:\n'
f'{uid_in_debug}\n'
'Allowing SIGINT propagation..'
)
return do_cancel()
task = Lock.local_task_in_debug
if task:
log.pdb(
f"Ignoring SIGINT while task in debug mode: `{task}`"
)
# TODO: how to handle the case of an intermediary-child actor
# that **is not** marked in debug mode? See oustanding issue:
# https://github.com/goodboy/tractor/issues/320
# elif debug_mode():
else: else:
# we entered the global ``breakpoint()`` built-in from sync code log.pdb(
global _local_task_in_debug, _pdb_release_hook "Ignoring SIGINT since debug mode is enabled"
_local_task_in_debug = 'sync'
def nuttin():
pass
_pdb_release_hook = nuttin
pdb.set_trace(
# start 2 levels up in user code
frame=sys._getframe().f_back,
) )
# NOTE: currently (at least on ``fancycompleter`` 0.9.2)
# it lookks to be that the last command that was run (eg. ll)
# will be repeated by default.
# TODO: maybe redraw/print last REPL output to console
if (
pdb_obj
and sys.version_info <= (3, 10)
):
# TODO: make this work like sticky mode where if there is output
# detected as written to the tty we redraw this part underneath
# and erase the past draw of this same bit above?
# pdb_obj.sticky = True
# pdb_obj._print_if_sticky()
# also see these links for an approach from ``ptk``:
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
# XXX: lol, see ``pdbpp`` issue:
# https://github.com/pdbpp/pdbpp/issues/496
# TODO: pretty sure this is what we should expect to have to run
# in total but for now we're just going to wait until `pdbpp`
# figures out it's own stuff on 3.10 (and maybe we'll help).
# pdb_obj.do_longlist(None)
# XXX: we were doing this but it shouldn't be required..
print(pdb_obj.prompt, end='', flush=True)
def _set_trace(
actor: Optional[tractor._actor.Actor] = None,
pdb: Optional[MultiActorPdb] = None,
):
__tracebackhide__ = True
actor = actor or tractor.current_actor()
# start 2 levels up in user code
frame: Optional[FrameType] = sys._getframe()
if frame:
frame = frame.f_back # type: ignore
if frame and pdb and actor is not None:
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n")
# no f!#$&* idea, but when we're in async land
# we need 2x frames up?
frame = frame.f_back
else:
pdb, undo_sigint = mk_mpdb()
# we entered the global ``breakpoint()`` built-in from sync code?
Lock.local_task_in_debug = 'sync'
pdb.set_trace(frame=frame)
breakpoint = partial( breakpoint = partial(
_breakpoint, _breakpoint,
@ -525,11 +674,24 @@ breakpoint = partial(
) )
def _post_mortem(actor): def _post_mortem(
log.pdb(f"\nAttaching to pdb in crashed actor: {actor.uid}\n") actor: tractor._actor.Actor,
pdb = _mk_pdb() pdb: MultiActorPdb,
) -> None:
'''
Enter the ``pdbpp`` port mortem entrypoint using our custom
debugger instance.
'''
log.pdb(f"\nAttaching to pdb in crashed actor: {actor.uid}\n")
# TODO: you need ``pdbpp`` master (at least this commit
# https://github.com/pdbpp/pdbpp/commit/b757794857f98d53e3ebbe70879663d7d843a6c2)
# to fix this and avoid the hang it causes. See issue:
# https://github.com/pdbpp/pdbpp/issues/480
# TODO: help with a 3.10+ major release if/when it arrives.
# custom Pdb post-mortem entry
pdbpp.xpm(Pdb=lambda: pdb) pdbpp.xpm(Pdb=lambda: pdb)
@ -560,6 +722,7 @@ async def _maybe_enter_pm(err):
): ):
log.debug("Actor crashed, entering debug mode") log.debug("Actor crashed, entering debug mode")
await post_mortem() await post_mortem()
Lock.release()
return True return True
else: else:
@ -575,7 +738,7 @@ async def acquire_debug_lock(
This helper is for actor's who don't actually need This helper is for actor's who don't actually need
to acquired the debugger but want to wait until the to acquired the debugger but want to wait until the
lock is free in the tree root. lock is free in the process-tree root.
''' '''
if not debug_mode(): if not debug_mode():
@ -604,8 +767,6 @@ async def maybe_wait_for_debugger(
if ( if (
is_root_process() is_root_process()
): ):
global _no_remote_has_tty, _global_actor_in_debug, _wait_all_tasks_lock
# If we error in the root but the debugger is # If we error in the root but the debugger is
# engaged we don't want to prematurely kill (and # engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty since it # thus clobber access to) the local tty since it
@ -617,11 +778,10 @@ async def maybe_wait_for_debugger(
for _ in range(poll_steps): for _ in range(poll_steps):
if _global_actor_in_debug: if Lock.global_actor_in_debug:
sub_in_debug = tuple(_global_actor_in_debug) sub_in_debug = tuple(Lock.global_actor_in_debug)
log.debug( log.debug('Root polling for debug')
'Root polling for debug')
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await trio.sleep(poll_delay) await trio.sleep(poll_delay)
@ -632,7 +792,7 @@ async def maybe_wait_for_debugger(
# XXX: doesn't seem to work # XXX: doesn't seem to work
# await trio.testing.wait_all_tasks_blocked(cushion=0) # await trio.testing.wait_all_tasks_blocked(cushion=0)
debug_complete = _no_remote_has_tty debug_complete = Lock.no_remote_has_tty
if ( if (
(debug_complete and (debug_complete and
not debug_complete.is_set()) not debug_complete.is_set())

View File

@ -24,7 +24,8 @@ import importlib
import inspect import inspect
from typing import ( from typing import (
Any, Optional, Any, Optional,
Callable, AsyncGenerator Callable, AsyncGenerator,
Type,
) )
from functools import partial from functools import partial
from dataclasses import dataclass from dataclasses import dataclass
@ -442,6 +443,10 @@ class Portal:
_err: Optional[BaseException] = None _err: Optional[BaseException] = None
ctx._portal = self ctx._portal = self
uid = self.channel.uid
cid = ctx.cid
etype: Optional[Type[BaseException]] = None
# deliver context instance and .started() msg value in open tuple. # deliver context instance and .started() msg value in open tuple.
try: try:
async with trio.open_nursery() as scope_nursery: async with trio.open_nursery() as scope_nursery:
@ -477,13 +482,24 @@ class Portal:
# KeyboardInterrupt, # KeyboardInterrupt,
) as err: ) as err:
_err = err etype = type(err)
# the context cancels itself on any cancel # the context cancels itself on any cancel
# causing error. # causing error.
log.cancel(
f'Context to {self.channel.uid} sending cancel request..')
await ctx.cancel() if ctx.chan.connected():
log.cancel(
'Context cancelled for task, sending cancel request..\n'
f'task:{cid}\n'
f'actor:{uid}'
)
await ctx.cancel()
else:
log.warning(
'IPC connection for context is broken?\n'
f'task:{cid}\n'
f'actor:{uid}'
)
raise raise
finally: finally:
@ -492,7 +508,13 @@ class Portal:
# sure we get the error the underlying feeder mem chan. # sure we get the error the underlying feeder mem chan.
# if it's not raised here it *should* be raised from the # if it's not raised here it *should* be raised from the
# msg loop nursery right? # msg loop nursery right?
result = await ctx.result() if ctx.chan.connected():
log.info(
'Waiting on final context-task result for\n'
f'task: {cid}\n'
f'actor: {uid}'
)
result = await ctx.result()
# though it should be impossible for any tasks # though it should be impossible for any tasks
# operating *in* this scope to have survived # operating *in* this scope to have survived
@ -502,14 +524,17 @@ class Portal:
# should we encapsulate this in the context api? # should we encapsulate this in the context api?
await ctx._recv_chan.aclose() await ctx._recv_chan.aclose()
if _err: if etype:
if ctx._cancel_called: if ctx._cancel_called:
log.cancel( log.cancel(
f'Context {fn_name} cancelled by caller with\n{_err}' f'Context {fn_name} cancelled by caller with\n{etype}'
) )
elif _err is not None: elif _err is not None:
log.cancel( log.cancel(
f'Context {fn_name} cancelled by callee with\n{_err}' f'Context for task cancelled by callee with {etype}\n'
f'target: `{fn_name}`\n'
f'task:{cid}\n'
f'actor:{uid}'
) )
else: else:
log.runtime( log.runtime(
@ -517,6 +542,17 @@ class Portal:
f'value from callee `{result}`' f'value from callee `{result}`'
) )
# XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the
# context from the runtime msg loop otherwise inside
# ``Actor._push_result()`` the msg will be discarded and in
# the case where that msg is global debugger unlock (via
# a "stop" msg for a stream), this can result in a deadlock
# where the root is waiting on the lock to clear but the
# child has already cleared it and clobbered IPC.
from ._debug import maybe_wait_for_debugger
await maybe_wait_for_debugger()
# remove the context from runtime tracking # remove the context from runtime tracking
self.actor._contexts.pop((self.channel.uid, ctx.cid)) self.actor._contexts.pop((self.channel.uid, ctx.cid))

View File

@ -103,13 +103,7 @@ async def open_root_actor(
_default_arbiter_port, _default_arbiter_port,
) )
if loglevel is None: loglevel = (loglevel or log._default_loglevel).upper()
loglevel = log.get_loglevel()
else:
log._default_loglevel = loglevel
log.get_console_log(loglevel)
assert loglevel
if debug_mode and _spawn._spawn_method == 'trio': if debug_mode and _spawn._spawn_method == 'trio':
_state._runtime_vars['_debug_mode'] = True _state._runtime_vars['_debug_mode'] = True
@ -124,7 +118,7 @@ async def open_root_actor(
logging.getLevelName( logging.getLevelName(
# lul, need the upper case for the -> int map? # lul, need the upper case for the -> int map?
# sweet "dynamic function behaviour" stdlib... # sweet "dynamic function behaviour" stdlib...
loglevel.upper() loglevel,
) > logging.getLevelName('PDB') ) > logging.getLevelName('PDB')
): ):
loglevel = 'PDB' loglevel = 'PDB'
@ -134,19 +128,24 @@ async def open_root_actor(
"Debug mode is only supported for the `trio` backend!" "Debug mode is only supported for the `trio` backend!"
) )
# make a temporary connection to see if an arbiter exists log.get_console_log(loglevel)
arbiter_found = False
try: try:
# make a temporary connection to see if an arbiter exists,
# if one can't be made quickly we assume none exists.
arbiter_found = False
# TODO: this connect-and-bail forces us to have to carefully # TODO: this connect-and-bail forces us to have to carefully
# rewrap TCP 104-connection-reset errors as EOF so as to avoid # rewrap TCP 104-connection-reset errors as EOF so as to avoid
# propagating cancel-causing errors to the channel-msg loop # propagating cancel-causing errors to the channel-msg loop
# machinery. Likely it would be better to eventually have # machinery. Likely it would be better to eventually have
# a "discovery" protocol with basic handshake instead. # a "discovery" protocol with basic handshake instead.
async with _connect_chan(host, port): with trio.move_on_after(1):
arbiter_found = True async with _connect_chan(host, port):
arbiter_found = True
except OSError: except OSError:
# TODO: make this a "discovery" log level?
logger.warning(f"No actor could be found @ {host}:{port}") logger.warning(f"No actor could be found @ {host}:{port}")
# create a local actor and start up its main routine/task # create a local actor and start up its main routine/task
@ -216,7 +215,8 @@ async def open_root_actor(
finally: finally:
# NOTE: not sure if we'll ever need this but it's # NOTE: not sure if we'll ever need this but it's
# possibly better for even more determinism? # possibly better for even more determinism?
# logger.cancel(f'Waiting on {len(nurseries)} nurseries in root..') # logger.cancel(
# f'Waiting on {len(nurseries)} nurseries in root..')
# nurseries = actor._actoruid2nursery.values() # nurseries = actor._actoruid2nursery.values()
# async with trio.open_nursery() as tempn: # async with trio.open_nursery() as tempn:
# for an in nurseries: # for an in nurseries:

View File

@ -307,7 +307,8 @@ async def new_proc(
proc: Optional[trio.Process] = None proc: Optional[trio.Process] = None
try: try:
try: try:
proc = await trio.open_process(spawn_cmd) # TODO: needs ``trio_typing`` patch?
proc = await trio.lowlevel.open_process(spawn_cmd) # type: ignore
log.runtime(f"Started {proc}") log.runtime(f"Started {proc}")
@ -334,6 +335,9 @@ async def new_proc(
await proc.wait() await proc.wait()
raise raise
# a sub-proc ref **must** exist now
assert proc
portal = Portal(chan) portal = Portal(chan)
actor_nursery._children[subactor.uid] = ( actor_nursery._children[subactor.uid] = (
subactor, proc, portal) subactor, proc, portal)

View File

@ -601,10 +601,18 @@ class Context:
finally: finally:
if self._portal: if self._portal:
self._portal._streams.remove(rchan) try:
self._portal._streams.remove(stream)
except KeyError:
log.warning(
f'Stream was already destroyed?\n'
f'actor: {self.chan.uid}\n'
f'ctx id: {self.cid}'
)
async def result(self) -> Any: async def result(self) -> Any:
'''From a caller side, wait for and return the final result from '''
From a caller side, wait for and return the final result from
the callee side task. the callee side task.
''' '''

View File

@ -271,7 +271,12 @@ def _run_asyncio_task(
task.exception() task.exception()
except BaseException as terr: except BaseException as terr:
task_err = terr task_err = terr
log.exception(f'`asyncio` task: {task.get_name()} errored')
if isinstance(terr, CancelledError):
log.cancel(f'`asyncio` task cancelled: {task.get_name()}')
else:
log.exception(f'`asyncio` task: {task.get_name()} errored')
assert type(terr) is type(aio_err), 'Asyncio task error mismatch?' assert type(terr) is type(aio_err), 'Asyncio task error mismatch?'
if aio_err is not None: if aio_err is not None: