Compare commits

..

No commits in common. "6a280da59728967fb3ef7598adef24034a7b2eef" and "414c59cca6783044c91e90159c8ce3bc7eafee0f" have entirely different histories.

9 changed files with 59 additions and 159 deletions

View File

@ -120,4 +120,4 @@ jobs:
run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
- name: Run tests - name: Run tests
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs --full-trace run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs

View File

@ -1,5 +1,5 @@
""" """
That "native" debug mode better work! That native debug better work!
All these tests can be understood (somewhat) by running the equivalent All these tests can be understood (somewhat) by running the equivalent
`examples/debugging/` scripts manually. `examples/debugging/` scripts manually.
@ -13,7 +13,6 @@ TODO:
import time import time
from os import path from os import path
import platform import platform
from typing import Optional
import pytest import pytest
import pexpect import pexpect
@ -74,14 +73,6 @@ def spawn(
return _spawn return _spawn
@pytest.fixture(
params=[False, True],
ids='ctl-c={}'.format,
)
def ctlc(request) -> bool:
yield request.param
@pytest.mark.parametrize( @pytest.mark.parametrize(
'user_in_out', 'user_in_out',
[ [
@ -146,50 +137,20 @@ def test_root_actor_bp(spawn, user_in_out):
assert expect_err_str in str(child.before) assert expect_err_str in str(child.before)
def do_ctlc( def test_root_actor_bp_forever(spawn):
child,
count: int = 3,
patt: Optional[str] = None,
) -> None:
# make sure ctl-c sends don't do anything but repeat output
for _ in range(count):
child.sendcontrol('c')
child.expect(r"\(Pdb\+\+\)")
if patt:
# should see the last line on console
before = str(child.before.decode())
assert patt in before
def test_root_actor_bp_forever(
spawn,
ctlc: bool,
):
"Re-enter a breakpoint from the root actor-task." "Re-enter a breakpoint from the root actor-task."
child = spawn('root_actor_breakpoint_forever') child = spawn('root_actor_breakpoint_forever')
# do some "next" commands to demonstrate recurrent breakpoint # do some "next" commands to demonstrate recurrent breakpoint
# entries # entries
for _ in range(10): for _ in range(10):
child.sendline('next')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
if ctlc: # do one continue which should trigger a new task to lock the tty
do_ctlc(child)
child.sendline('next')
# do one continue which should trigger a
# new task to lock the tty
child.sendline('continue') child.sendline('continue')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
if ctlc:
do_ctlc(child)
# XXX: this previously caused a bug! # XXX: this previously caused a bug!
child.sendline('n') child.sendline('n')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
@ -197,15 +158,8 @@ def test_root_actor_bp_forever(
child.sendline('n') child.sendline('n')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
# quit out of the loop
child.sendline('q')
child.expect(pexpect.EOF)
def test_subactor_error(spawn):
def test_subactor_error(
spawn,
ctlc: bool,
):
"Single subactor raising an error" "Single subactor raising an error"
child = spawn('subactor_error') child = spawn('subactor_error')
@ -216,29 +170,23 @@ def test_subactor_error(
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error'" in before assert "Attaching to pdb in crashed actor: ('name_error'" in before
# make sure ctl-c sends don't do anything but repeat output # send user command
if ctlc: # (in this case it's the same for 'continue' vs. 'quit')
do_ctlc(
child,
patt='(doggypants)',
)
# send user command and (in this case it's the same for 'continue'
# vs. 'quit') the debugger should enter a second time in the nursery
# creating actor
child.sendline('continue') child.sendline('continue')
# the debugger should enter a second time in the nursery
# creating actor
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
# root actor gets debugger engaged # root actor gets debugger engaged
assert "Attaching to pdb in crashed actor: ('root'" in before assert "Attaching to pdb in crashed actor: ('root'" in before
# error is a remote error propagated from the subactor # error is a remote error propagated from the subactor
assert "RemoteActorError: ('name_error'" in before assert "RemoteActorError: ('name_error'" in before
# another round
if ctlc:
do_ctlc(child)
child.sendline('c') child.sendline('c')
child.expect('\r\n') child.expect('\r\n')

View File

@ -81,14 +81,11 @@ def run_example_in_subproc(loglevel, testdir, arb_addr):
'example_script', 'example_script',
# walk yields: (dirpath, dirnames, filenames) # walk yields: (dirpath, dirnames, filenames)
[ [(p[0], f) for p in os.walk(examples_dir()) for f in p[2]
(p[0], f) for p in os.walk(examples_dir()) for f in p[2]
if '__' not in f if '__' not in f
and f[0] != '_' and f[0] != '_'
and 'debugging' not in p[0] and 'debugging' not in p[0]],
and 'integration' not in p[0]
],
ids=lambda t: t[1], ids=lambda t: t[1],
) )

View File

@ -611,8 +611,7 @@ class Actor:
entry = local_nursery._children.get(uid) entry = local_nursery._children.get(uid)
if entry: if entry:
_, proc, _ = entry _, proc, _ = entry
log.warning( log.warning(f'Actor {uid}@{proc} IPC connection broke!?')
f'Actor {uid}@{proc} IPC connection broke!?')
# if proc.poll() is not None: # if proc.poll() is not None:
# log.error('Actor {uid} proc died and IPC broke?') # log.error('Actor {uid} proc died and IPC broke?')
@ -631,11 +630,6 @@ class Actor:
# Attempt to wait for the far end to close the channel # Attempt to wait for the far end to close the channel
# and bail after timeout (2-generals on closure). # and bail after timeout (2-generals on closure).
assert chan.msgstream assert chan.msgstream
log.runtime(
f'Draining lingering msgs from stream {chan.msgstream}'
)
async for msg in chan.msgstream.drain(): async for msg in chan.msgstream.drain():
# try to deliver any lingering msgs # try to deliver any lingering msgs
# before we destroy the channel. # before we destroy the channel.

View File

@ -41,7 +41,6 @@ from .log import get_logger
from ._discovery import get_root from ._discovery import get_root
from ._state import is_root_process, debug_mode from ._state import is_root_process, debug_mode
from ._exceptions import is_multi_cancelled from ._exceptions import is_multi_cancelled
from ._ipc import Channel
try: try:
@ -397,10 +396,6 @@ def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
signal.SIGINT, signal.SIGINT,
partial(shield_sigint, pdb_obj=pdb), partial(shield_sigint, pdb_obj=pdb),
) )
# XXX: These are the important flags mentioned in
# https://github.com/python-trio/trio/issues/1155
# which resolve the traceback spews to console.
pdb.allow_kbdint = True pdb.allow_kbdint = True
pdb.nosigint = True pdb.nosigint = True
@ -469,15 +464,11 @@ async def _breakpoint(
_local_task_in_debug = task_name _local_task_in_debug = task_name
def child_release_hook(): def child_release_hook():
try: # _local_task_in_debug = None
# sometimes the ``trio`` might already be termianated in _local_pdb_complete.set()
# which case this call will raise.
_local_pdb_complete.set() # restore original sigint handler
finally: undo_sigint()
# restore original sigint handler
undo_sigint()
# should always be cleared in the hijack hook aboved right?
# _local_task_in_debug = None
# assign unlock callback for debugger teardown hooks # assign unlock callback for debugger teardown hooks
# _pdb_release_hook = _local_pdb_complete.set # _pdb_release_hook = _local_pdb_complete.set
@ -548,14 +539,10 @@ async def _breakpoint(
_global_actor_in_debug = None _global_actor_in_debug = None
_local_task_in_debug = None _local_task_in_debug = None
_local_pdb_complete.set()
try: # restore original sigint handler
# sometimes the ``trio`` might already be termianated in undo_sigint()
# which case this call will raise.
_local_pdb_complete.set()
finally:
# restore original sigint handler
undo_sigint()
_pdb_release_hook = teardown _pdb_release_hook = teardown
@ -613,21 +600,7 @@ def shield_sigint(
actor = tractor.current_actor() actor = tractor.current_actor()
def do_cancel():
# If we haven't tried to cancel the runtime then do that instead
# of raising a KBI (which may non-gracefully destroy
# a ``trio.run()``).
if not actor._cancel_called:
actor.cancel_soon()
# If the runtime is already cancelled it likely means the user
# hit ctrl-c again because teardown didn't full take place in
# which case we do the "hard" raising of a local KBI.
else:
raise KeyboardInterrupt
any_connected = False any_connected = False
if uid_in_debug is not None: if uid_in_debug is not None:
# try to see if the supposed (sub)actor in debug still # try to see if the supposed (sub)actor in debug still
# has an active connection to *this* actor, and if not # has an active connection to *this* actor, and if not
@ -643,7 +616,6 @@ def shield_sigint(
f'{uid_in_debug}\n' f'{uid_in_debug}\n'
'Allowing SIGINT propagation..' 'Allowing SIGINT propagation..'
) )
return do_cancel()
# root actor branch that reports whether or not a child # root actor branch that reports whether or not a child
# has locked debugger. # has locked debugger.
@ -672,16 +644,6 @@ def shield_sigint(
elif ( elif (
not is_root_process() not is_root_process()
): ):
chan: Channel = actor._parent_chan
if not chan or not chan.connected():
log.warning(
'A global actor reported to be in debug '
'but no connection exists for its parent:\n'
f'{uid_in_debug}\n'
'Allowing SIGINT propagation..'
)
return do_cancel()
task = _local_task_in_debug task = _local_task_in_debug
if task: if task:
log.pdb( log.pdb(
@ -697,6 +659,20 @@ def shield_sigint(
"Ignoring SIGINT since debug mode is enabled" "Ignoring SIGINT since debug mode is enabled"
) )
# noone has the debugger so raise KBI
else:
# If we haven't tried to cancel the runtime then do that instead
# of raising a KBI (which may non-gracefully destroy
# a ``trio.run()``).
if not actor._cancel_called:
actor.cancel_soon()
# If the runtime is already cancelled it likely means the user
# hit ctrl-c again because teardown didn't full take place in
# which case we do the "hard" raising of a local KBI.
else:
raise KeyboardInterrupt
# maybe redraw/print last REPL output to console # maybe redraw/print last REPL output to console
if pdb_obj: if pdb_obj:
@ -711,13 +687,8 @@ def shield_sigint(
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py # https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
try: try:
# XXX: lol, see ``pdbpp`` issue: pdb_obj.do_longlist(None)
# https://github.com/pdbpp/pdbpp/issues/496 print(pdb_obj.prompt, end='', flush=True)
# pdb_obj.do_longlist(None)
# pdb_obj.lastcmd = 'longlist'
pdb_obj._printlonglist(max_lines=False)
# print(pdb_obj.prompt, end='', flush=True)
except AttributeError: except AttributeError:
log.exception('pdbpp longlist failed...') log.exception('pdbpp longlist failed...')
raise KeyboardInterrupt raise KeyboardInterrupt
@ -738,7 +709,7 @@ def _set_trace(
# start 2 levels up in user code # start 2 levels up in user code
frame: FrameType = sys._getframe() frame: FrameType = sys._getframe()
if frame: if frame:
frame = frame.f_back # type: ignore frame = frame.f_back.f_back # type: ignore
if pdb and actor is not None: if pdb and actor is not None:
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n") log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n")

View File

@ -231,7 +231,7 @@ class MsgspecTCPStream(MsgpackTCPStream):
''' '''
import msgspec # noqa import msgspec # noqa
decodes_failed: int = 0 last_decode_failed: bool = False
while True: while True:
try: try:
@ -268,17 +268,12 @@ class MsgspecTCPStream(MsgpackTCPStream):
msgspec.DecodeError, msgspec.DecodeError,
UnicodeDecodeError, UnicodeDecodeError,
): ):
if decodes_failed < 4: if not last_decode_failed:
# ignore decoding errors for now and assume they have to # ignore decoding errors for now and assume they have to
# do with a channel drop - hope that receiving from the # do with a channel drop - hope that receiving from the
# channel will raise an expected error and bubble up. # channel will raise an expected error and bubble up.
decoded_bytes = msg_bytes.decode() log.error('`msgspec` failed to decode!?')
log.error( last_decode_failed = True
'`msgspec` failed to decode!?\n'
'dumping bytes:\n'
f'{decoded_bytes}'
)
decodes_failed += 1
else: else:
raise raise

View File

@ -511,8 +511,8 @@ class Portal:
if ctx.chan.connected(): if ctx.chan.connected():
log.info( log.info(
'Waiting on final context-task result for\n' 'Waiting on final context-task result for\n'
f'task: {cid}\n' f'task:{cid}\n'
f'actor: {uid}' f'actor:{uid}'
) )
result = await ctx.result() result = await ctx.result()

View File

@ -259,7 +259,6 @@ def _run_asyncio_task(
nonlocal chan nonlocal chan
aio_err = chan._aio_err aio_err = chan._aio_err
task_err: Optional[BaseException] = None task_err: Optional[BaseException] = None
tname = task.get_name()
# only to avoid ``asyncio`` complaining about uncaptured # only to avoid ``asyncio`` complaining about uncaptured
# task exceptions # task exceptions
@ -269,10 +268,9 @@ def _run_asyncio_task(
task_err = terr task_err = terr
if isinstance(terr, CancelledError): if isinstance(terr, CancelledError):
log.cancel( log.cancel(f'`asyncio` task cancelled: {task.get_name()}')
f'infected `asyncio` task cancelled: {tname}')
else: else:
log.exception(f'`asyncio` task: {tname} errored') log.exception(f'`asyncio` task: {task.get_name()} errored')
assert type(terr) is type(aio_err), 'Asyncio task error mismatch?' assert type(terr) is type(aio_err), 'Asyncio task error mismatch?'
@ -287,24 +285,21 @@ def _run_asyncio_task(
# We might want to change this in the future though. # We might want to change this in the future though.
from_aio.close() from_aio.close()
# if type(aio_err) is CancelledError: if type(aio_err) is CancelledError:
# if not task_err: log.cancel("infected task was cancelled")
# log.cancel(
# f"infected task {tname} cancelled itself, was not ``trio``"
# )
# TODO: show that the cancellation originated # TODO: show that the cancellation originated
# from the ``trio`` side? right? # from the ``trio`` side? right?
# if cancel_scope.cancelled: # if cancel_scope.cancelled:
# raise aio_err from err # raise aio_err from err
if task_err is None: elif task_err is None:
assert aio_err assert aio_err
aio_err.with_traceback(aio_err.__traceback__) aio_err.with_traceback(aio_err.__traceback__)
# msg = ''.join(traceback.format_exception(type(aio_err))) msg = ''.join(traceback.format_exception(type(aio_err)))
# log.error( log.error(
# f'infected task errorred:\n{msg}' f'infected task errorred:\n{msg}'
# ) )
# raise any ``asyncio`` side error. # raise any ``asyncio`` side error.
raise aio_err raise aio_err
@ -397,8 +392,8 @@ async def run_task(
) -> Any: ) -> Any:
''' '''
Run an ``asyncio`` async function or generator in a new task, block Run an ``asyncio`` async function or generator in a task, return
and return the result back to ``trio``. or stream the result back to ``trio``.
''' '''
# simple async func # simple async func