Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet 6a280da597 Add basic ctl-c testing cases to suite 2022-06-27 16:22:45 -04:00
Tyler Goodlet f984fa8daa Hack around double long list print issue..
See https://github.com/pdbpp/pdbpp/issues/496
2022-06-27 16:21:38 -04:00
Tyler Goodlet cc18c84389 Decode bytes prior to log msg 2022-06-26 16:23:38 -04:00
Tyler Goodlet af205c08f2 Show full KBI trace for help with CI hangs 2022-06-26 16:00:14 -04:00
Tyler Goodlet ab557fae21 Move pydantic-click hang example to new dir, skip in test suite 2022-06-26 15:34:33 -04:00
Tyler Goodlet 0b1c1ac568 Drop asyncio-cancelled-itself msg for now, report task names 2022-06-26 15:06:00 -04:00
Tyler Goodlet 9e37bb22e1 Add spaces before values in log msg 2022-06-26 15:06:00 -04:00
Tyler Goodlet 01dea6fe32 Add runtime level msg around channel draining 2022-06-26 15:06:00 -04:00
Tyler Goodlet 79faffd577 Always undo SIGINT overrides , cancel detached children
Ensure that even when `pdb` resumption methods are called during a crash
where `trio`'s runtime has already terminated (eg. `Event.set()` will
raise) we always revert our sigint handler to the original. Further
inside the handler if we hit a case where a child is in debug and
(thinks it) has the global pdb lock, if it has no IPC connection to
a parent, simply presume tty sync-coordination is now lost and cancel
the child immediately.
2022-06-26 15:06:00 -04:00
Tyler Goodlet b3fd5da1be Allow up to 4 `msgpsec` decode failures 2022-06-26 15:06:00 -04:00
9 changed files with 158 additions and 58 deletions

View File

@ -120,4 +120,4 @@ jobs:
run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
- name: Run tests - name: Run tests
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs --full-trace

View File

@ -1,5 +1,5 @@
""" """
That native debug better work! That "native" debug mode better work!
All these tests can be understood (somewhat) by running the equivalent All these tests can be understood (somewhat) by running the equivalent
`examples/debugging/` scripts manually. `examples/debugging/` scripts manually.
@ -13,6 +13,7 @@ TODO:
import time import time
from os import path from os import path
import platform import platform
from typing import Optional
import pytest import pytest
import pexpect import pexpect
@ -73,6 +74,14 @@ def spawn(
return _spawn return _spawn
@pytest.fixture(
params=[False, True],
ids='ctl-c={}'.format,
)
def ctlc(request) -> bool:
yield request.param
@pytest.mark.parametrize( @pytest.mark.parametrize(
'user_in_out', 'user_in_out',
[ [
@ -137,20 +146,50 @@ def test_root_actor_bp(spawn, user_in_out):
assert expect_err_str in str(child.before) assert expect_err_str in str(child.before)
def test_root_actor_bp_forever(spawn): def do_ctlc(
child,
count: int = 3,
patt: Optional[str] = None,
) -> None:
# make sure ctl-c sends don't do anything but repeat output
for _ in range(count):
child.sendcontrol('c')
child.expect(r"\(Pdb\+\+\)")
if patt:
# should see the last line on console
before = str(child.before.decode())
assert patt in before
def test_root_actor_bp_forever(
spawn,
ctlc: bool,
):
"Re-enter a breakpoint from the root actor-task." "Re-enter a breakpoint from the root actor-task."
child = spawn('root_actor_breakpoint_forever') child = spawn('root_actor_breakpoint_forever')
# do some "next" commands to demonstrate recurrent breakpoint # do some "next" commands to demonstrate recurrent breakpoint
# entries # entries
for _ in range(10): for _ in range(10):
child.sendline('next')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
# do one continue which should trigger a new task to lock the tty if ctlc:
do_ctlc(child)
child.sendline('next')
# do one continue which should trigger a
# new task to lock the tty
child.sendline('continue') child.sendline('continue')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
if ctlc:
do_ctlc(child)
# XXX: this previously caused a bug! # XXX: this previously caused a bug!
child.sendline('n') child.sendline('n')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
@ -158,8 +197,15 @@ def test_root_actor_bp_forever(spawn):
child.sendline('n') child.sendline('n')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
# quit out of the loop
child.sendline('q')
child.expect(pexpect.EOF)
def test_subactor_error(spawn):
def test_subactor_error(
spawn,
ctlc: bool,
):
"Single subactor raising an error" "Single subactor raising an error"
child = spawn('subactor_error') child = spawn('subactor_error')
@ -170,23 +216,29 @@ def test_subactor_error(spawn):
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error'" in before assert "Attaching to pdb in crashed actor: ('name_error'" in before
# send user command # make sure ctl-c sends don't do anything but repeat output
# (in this case it's the same for 'continue' vs. 'quit') if ctlc:
child.sendline('continue') do_ctlc(
child,
patt='(doggypants)',
)
# the debugger should enter a second time in the nursery # send user command and (in this case it's the same for 'continue'
# vs. 'quit') the debugger should enter a second time in the nursery
# creating actor # creating actor
child.sendline('continue')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
# root actor gets debugger engaged # root actor gets debugger engaged
assert "Attaching to pdb in crashed actor: ('root'" in before assert "Attaching to pdb in crashed actor: ('root'" in before
# error is a remote error propagated from the subactor # error is a remote error propagated from the subactor
assert "RemoteActorError: ('name_error'" in before assert "RemoteActorError: ('name_error'" in before
# another round
if ctlc:
do_ctlc(child)
child.sendline('c') child.sendline('c')
child.expect('\r\n') child.expect('\r\n')

View File

@ -81,11 +81,14 @@ def run_example_in_subproc(loglevel, testdir, arb_addr):
'example_script', 'example_script',
# walk yields: (dirpath, dirnames, filenames) # walk yields: (dirpath, dirnames, filenames)
[(p[0], f) for p in os.walk(examples_dir()) for f in p[2] [
(p[0], f) for p in os.walk(examples_dir()) for f in p[2]
if '__' not in f if '__' not in f
and f[0] != '_' and f[0] != '_'
and 'debugging' not in p[0]], and 'debugging' not in p[0]
and 'integration' not in p[0]
],
ids=lambda t: t[1], ids=lambda t: t[1],
) )

View File

@ -611,7 +611,8 @@ class Actor:
entry = local_nursery._children.get(uid) entry = local_nursery._children.get(uid)
if entry: if entry:
_, proc, _ = entry _, proc, _ = entry
log.warning(f'Actor {uid}@{proc} IPC connection broke!?') log.warning(
f'Actor {uid}@{proc} IPC connection broke!?')
# if proc.poll() is not None: # if proc.poll() is not None:
# log.error('Actor {uid} proc died and IPC broke?') # log.error('Actor {uid} proc died and IPC broke?')
@ -630,6 +631,11 @@ class Actor:
# Attempt to wait for the far end to close the channel # Attempt to wait for the far end to close the channel
# and bail after timeout (2-generals on closure). # and bail after timeout (2-generals on closure).
assert chan.msgstream assert chan.msgstream
log.runtime(
f'Draining lingering msgs from stream {chan.msgstream}'
)
async for msg in chan.msgstream.drain(): async for msg in chan.msgstream.drain():
# try to deliver any lingering msgs # try to deliver any lingering msgs
# before we destroy the channel. # before we destroy the channel.

View File

@ -41,6 +41,7 @@ from .log import get_logger
from ._discovery import get_root from ._discovery import get_root
from ._state import is_root_process, debug_mode from ._state import is_root_process, debug_mode
from ._exceptions import is_multi_cancelled from ._exceptions import is_multi_cancelled
from ._ipc import Channel
try: try:
@ -396,6 +397,10 @@ def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
signal.SIGINT, signal.SIGINT,
partial(shield_sigint, pdb_obj=pdb), partial(shield_sigint, pdb_obj=pdb),
) )
# XXX: These are the important flags mentioned in
# https://github.com/python-trio/trio/issues/1155
# which resolve the traceback spews to console.
pdb.allow_kbdint = True pdb.allow_kbdint = True
pdb.nosigint = True pdb.nosigint = True
@ -464,11 +469,15 @@ async def _breakpoint(
_local_task_in_debug = task_name _local_task_in_debug = task_name
def child_release_hook(): def child_release_hook():
# _local_task_in_debug = None try:
# sometimes the ``trio`` might already be termianated in
# which case this call will raise.
_local_pdb_complete.set() _local_pdb_complete.set()
finally:
# restore original sigint handler # restore original sigint handler
undo_sigint() undo_sigint()
# should always be cleared in the hijack hook aboved right?
# _local_task_in_debug = None
# assign unlock callback for debugger teardown hooks # assign unlock callback for debugger teardown hooks
# _pdb_release_hook = _local_pdb_complete.set # _pdb_release_hook = _local_pdb_complete.set
@ -539,8 +548,12 @@ async def _breakpoint(
_global_actor_in_debug = None _global_actor_in_debug = None
_local_task_in_debug = None _local_task_in_debug = None
_local_pdb_complete.set()
try:
# sometimes the ``trio`` might already be termianated in
# which case this call will raise.
_local_pdb_complete.set()
finally:
# restore original sigint handler # restore original sigint handler
undo_sigint() undo_sigint()
@ -600,7 +613,21 @@ def shield_sigint(
actor = tractor.current_actor() actor = tractor.current_actor()
def do_cancel():
# If we haven't tried to cancel the runtime then do that instead
# of raising a KBI (which may non-gracefully destroy
# a ``trio.run()``).
if not actor._cancel_called:
actor.cancel_soon()
# If the runtime is already cancelled it likely means the user
# hit ctrl-c again because teardown didn't full take place in
# which case we do the "hard" raising of a local KBI.
else:
raise KeyboardInterrupt
any_connected = False any_connected = False
if uid_in_debug is not None: if uid_in_debug is not None:
# try to see if the supposed (sub)actor in debug still # try to see if the supposed (sub)actor in debug still
# has an active connection to *this* actor, and if not # has an active connection to *this* actor, and if not
@ -616,6 +643,7 @@ def shield_sigint(
f'{uid_in_debug}\n' f'{uid_in_debug}\n'
'Allowing SIGINT propagation..' 'Allowing SIGINT propagation..'
) )
return do_cancel()
# root actor branch that reports whether or not a child # root actor branch that reports whether or not a child
# has locked debugger. # has locked debugger.
@ -644,6 +672,16 @@ def shield_sigint(
elif ( elif (
not is_root_process() not is_root_process()
): ):
chan: Channel = actor._parent_chan
if not chan or not chan.connected():
log.warning(
'A global actor reported to be in debug '
'but no connection exists for its parent:\n'
f'{uid_in_debug}\n'
'Allowing SIGINT propagation..'
)
return do_cancel()
task = _local_task_in_debug task = _local_task_in_debug
if task: if task:
log.pdb( log.pdb(
@ -659,20 +697,6 @@ def shield_sigint(
"Ignoring SIGINT since debug mode is enabled" "Ignoring SIGINT since debug mode is enabled"
) )
# noone has the debugger so raise KBI
else:
# If we haven't tried to cancel the runtime then do that instead
# of raising a KBI (which may non-gracefully destroy
# a ``trio.run()``).
if not actor._cancel_called:
actor.cancel_soon()
# If the runtime is already cancelled it likely means the user
# hit ctrl-c again because teardown didn't full take place in
# which case we do the "hard" raising of a local KBI.
else:
raise KeyboardInterrupt
# maybe redraw/print last REPL output to console # maybe redraw/print last REPL output to console
if pdb_obj: if pdb_obj:
@ -687,8 +711,13 @@ def shield_sigint(
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py # https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
try: try:
pdb_obj.do_longlist(None) # XXX: lol, see ``pdbpp`` issue:
print(pdb_obj.prompt, end='', flush=True) # https://github.com/pdbpp/pdbpp/issues/496
# pdb_obj.do_longlist(None)
# pdb_obj.lastcmd = 'longlist'
pdb_obj._printlonglist(max_lines=False)
# print(pdb_obj.prompt, end='', flush=True)
except AttributeError: except AttributeError:
log.exception('pdbpp longlist failed...') log.exception('pdbpp longlist failed...')
raise KeyboardInterrupt raise KeyboardInterrupt
@ -709,7 +738,7 @@ def _set_trace(
# start 2 levels up in user code # start 2 levels up in user code
frame: FrameType = sys._getframe() frame: FrameType = sys._getframe()
if frame: if frame:
frame = frame.f_back.f_back # type: ignore frame = frame.f_back # type: ignore
if pdb and actor is not None: if pdb and actor is not None:
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n") log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n")

View File

@ -231,7 +231,7 @@ class MsgspecTCPStream(MsgpackTCPStream):
''' '''
import msgspec # noqa import msgspec # noqa
last_decode_failed: bool = False decodes_failed: int = 0
while True: while True:
try: try:
@ -268,12 +268,17 @@ class MsgspecTCPStream(MsgpackTCPStream):
msgspec.DecodeError, msgspec.DecodeError,
UnicodeDecodeError, UnicodeDecodeError,
): ):
if not last_decode_failed: if decodes_failed < 4:
# ignore decoding errors for now and assume they have to # ignore decoding errors for now and assume they have to
# do with a channel drop - hope that receiving from the # do with a channel drop - hope that receiving from the
# channel will raise an expected error and bubble up. # channel will raise an expected error and bubble up.
log.error('`msgspec` failed to decode!?') decoded_bytes = msg_bytes.decode()
last_decode_failed = True log.error(
'`msgspec` failed to decode!?\n'
'dumping bytes:\n'
f'{decoded_bytes}'
)
decodes_failed += 1
else: else:
raise raise

View File

@ -511,8 +511,8 @@ class Portal:
if ctx.chan.connected(): if ctx.chan.connected():
log.info( log.info(
'Waiting on final context-task result for\n' 'Waiting on final context-task result for\n'
f'task:{cid}\n' f'task: {cid}\n'
f'actor:{uid}' f'actor: {uid}'
) )
result = await ctx.result() result = await ctx.result()

View File

@ -259,6 +259,7 @@ def _run_asyncio_task(
nonlocal chan nonlocal chan
aio_err = chan._aio_err aio_err = chan._aio_err
task_err: Optional[BaseException] = None task_err: Optional[BaseException] = None
tname = task.get_name()
# only to avoid ``asyncio`` complaining about uncaptured # only to avoid ``asyncio`` complaining about uncaptured
# task exceptions # task exceptions
@ -268,9 +269,10 @@ def _run_asyncio_task(
task_err = terr task_err = terr
if isinstance(terr, CancelledError): if isinstance(terr, CancelledError):
log.cancel(f'`asyncio` task cancelled: {task.get_name()}') log.cancel(
f'infected `asyncio` task cancelled: {tname}')
else: else:
log.exception(f'`asyncio` task: {task.get_name()} errored') log.exception(f'`asyncio` task: {tname} errored')
assert type(terr) is type(aio_err), 'Asyncio task error mismatch?' assert type(terr) is type(aio_err), 'Asyncio task error mismatch?'
@ -285,21 +287,24 @@ def _run_asyncio_task(
# We might want to change this in the future though. # We might want to change this in the future though.
from_aio.close() from_aio.close()
if type(aio_err) is CancelledError: # if type(aio_err) is CancelledError:
log.cancel("infected task was cancelled") # if not task_err:
# log.cancel(
# f"infected task {tname} cancelled itself, was not ``trio``"
# )
# TODO: show that the cancellation originated # TODO: show that the cancellation originated
# from the ``trio`` side? right? # from the ``trio`` side? right?
# if cancel_scope.cancelled: # if cancel_scope.cancelled:
# raise aio_err from err # raise aio_err from err
elif task_err is None: if task_err is None:
assert aio_err assert aio_err
aio_err.with_traceback(aio_err.__traceback__) aio_err.with_traceback(aio_err.__traceback__)
msg = ''.join(traceback.format_exception(type(aio_err))) # msg = ''.join(traceback.format_exception(type(aio_err)))
log.error( # log.error(
f'infected task errorred:\n{msg}' # f'infected task errorred:\n{msg}'
) # )
# raise any ``asyncio`` side error. # raise any ``asyncio`` side error.
raise aio_err raise aio_err
@ -392,8 +397,8 @@ async def run_task(
) -> Any: ) -> Any:
''' '''
Run an ``asyncio`` async function or generator in a task, return Run an ``asyncio`` async function or generator in a new task, block
or stream the result back to ``trio``. and return the result back to ``trio``.
''' '''
# simple async func # simple async func