Compare commits

...

42 Commits

Author SHA1 Message Date
Tyler Goodlet 6a280da597 Add basic ctl-c testing cases to suite 2022-06-27 16:22:45 -04:00
Tyler Goodlet f984fa8daa Hack around double long list print issue..
See https://github.com/pdbpp/pdbpp/issues/496
2022-06-27 16:21:38 -04:00
Tyler Goodlet cc18c84389 Decode bytes prior to log msg 2022-06-26 16:23:38 -04:00
Tyler Goodlet af205c08f2 Show full KBI trace for help with CI hangs 2022-06-26 16:00:14 -04:00
Tyler Goodlet ab557fae21 Move pydantic-click hang example to new dir, skip in test suite 2022-06-26 15:34:33 -04:00
Tyler Goodlet 0b1c1ac568 Drop asyncio-cancelled-itself msg for now, report task names 2022-06-26 15:06:00 -04:00
Tyler Goodlet 9e37bb22e1 Add spaces before values in log msg 2022-06-26 15:06:00 -04:00
Tyler Goodlet 01dea6fe32 Add runtime level msg around channel draining 2022-06-26 15:06:00 -04:00
Tyler Goodlet 79faffd577 Always undo SIGINT overrides , cancel detached children
Ensure that even when `pdb` resumption methods are called during a crash
where `trio`'s runtime has already terminated (eg. `Event.set()` will
raise) we always revert our sigint handler to the original. Further
inside the handler if we hit a case where a child is in debug and
(thinks it) has the global pdb lock, if it has no IPC connection to
a parent, simply presume tty sync-coordination is now lost and cancel
the child immediately.
2022-06-26 15:06:00 -04:00
Tyler Goodlet b3fd5da1be Allow up to 4 `msgpsec` decode failures 2022-06-26 15:06:00 -04:00
Tyler Goodlet 414c59cca6 Readme formatting tweaks 2022-06-26 15:06:00 -04:00
Tyler Goodlet 7710213604 Pin to `pdbpp` upstream master, 3.10 problem?
See issues:
- https://github.com/pdbpp/pdbpp/issues/480
- https://github.com/pdbpp/pdbpp/pull/482
2022-06-26 15:06:00 -04:00
Tyler Goodlet 71e779dca3 Tolerate double `.remove()`s of stream on portal teardowns 2022-06-26 15:05:59 -04:00
Tyler Goodlet 5457aa566c Always propagate SIGINT when no locking peer found
A hopefully significant fix here is to always avoid suppressing a SIGINT
when the root actor can not detect an active IPC connections (via
a connected channel) to the supposed debug lock holding actor. In that
case it is most likely that the actor has either terminated or has lost
its connection for debugger control and there is no way the root can
verify the lock is in use; thus we choose to allow KBI cancellation.

Drop the (by comment) `try`-`finally` block in
`_hijoack_stdin_for_child()` around the `_acquire_debug_lock()` call
since all that logic should now be handled internal to that locking
manager. Try to catch a weird error around the `.do_longlist()` method
call that seems to sometimes break on py3.10 and latest `pdbpp`.
2022-06-26 15:05:16 -04:00
Tyler Goodlet 20b902b300 Always call pdb hook even if tty locking fails 2022-04-17 13:15:33 -04:00
Tyler Goodlet 3955906654 Handle a connection reset on `msgspec` transport 2022-04-17 13:15:33 -04:00
Tyler Goodlet af1ee3f0a6 Log cancels with appropriate level 2022-04-17 13:15:33 -04:00
Tyler Goodlet f9f4bcf27c Just warn on IPC breaks 2022-04-17 13:15:33 -04:00
Tyler Goodlet 07ac6eb5d0 Only warn on `trio.BrokenResourceError`s from `_invoke()` 2022-04-17 13:15:33 -04:00
Tyler Goodlet 83ed2f6286 Make example a subpkg for `python -m <mod>` testing 2022-04-17 13:15:33 -04:00
Tyler Goodlet a40b168dfb Add example that triggers bug #302 2022-04-17 13:15:33 -04:00
Tyler Goodlet 1822d0b48b Add back in async gen loop 2022-04-17 13:15:33 -04:00
Tyler Goodlet dbc689d55a Pre-declare disconnected flag 2022-04-17 13:15:33 -04:00
Tyler Goodlet e49cccf666 Avoid attr error XD 2022-04-17 13:15:33 -04:00
Tyler Goodlet 5892d64d6e Type annot updates 2022-04-17 13:15:33 -04:00
Tyler Goodlet e83d158bfb Drop uneeded backframe traceback hide annotation 2022-04-17 13:15:33 -04:00
Tyler Goodlet e107257ac0 Make `Actor._process_messages()` report disconnects
The method now returns a `bool` which flags whether the transport died
to the caller and allows for reporting a disconnect in the
channel-transport handler task. This is something a user will normally
want to know about on the caller side especially after seeing
a traceback from the peer (if in tree) on console.
2022-04-17 13:15:33 -04:00
Tyler Goodlet 83367caf42 Only cancel/get-result from a ctx if transport is up
There's no point in sending a cancel message to the remote linked task
and especially no reason to block waiting on a result from that task if
the transport layer is detected to be disconnected. We expect that the
transport shouldn't go down at the layer of the message loop
(reconnection logic should be handled in the transport layer itself) so
if we detect the channel is not connected we don't bother requesting
cancels nor waiting on a final result message.

Why?

- if the connection goes down in error the caller side won't have a way
  to know "how long" it should block to wait for a cancel ack or result
  and causes a potential hang that may require an additional ctrl-c from
  the user especially if using the debugger or if the traceback is not
  seen on console.
- obviously there's no point in waiting for messages when there's no
  transport to deliver them XD

Further, add some more detailed cancel logging detailing the task and
actor ids.
2022-04-17 13:15:33 -04:00
Tyler Goodlet e6a520c944 Drop high log level in ctx example 2022-04-17 13:15:33 -04:00
Tyler Goodlet 5a83f373ef Typing fixes, simplify `_set_trace()` 2022-04-17 13:15:33 -04:00
Tyler Goodlet 228dfff91c Add notes around py3.10 stdlib bug from `pdb++`
There's a bug that's triggered in the stdlib without latest `pdb++`
installed; add a note for that.

Further inside `wait_for_parent_stdin_hijack()` don't `.started()` until
the interactor stream has been opened to avoid races when debugging this
`._debug.py` module (at the least) since we usually don't want the
spawning (parent) task to resume until we know for sure the tty lock has
been acquired. Also, drop the random checkpoint we had inside
`_breakpoint()`, not sure it was actually adding anything useful since
we're (mostly) carefully shielded throughout this func.
2022-04-17 13:15:33 -04:00
Tyler Goodlet 866f6f9d40 Add and use a pdb instance factory 2022-04-17 13:15:33 -04:00
Tyler Goodlet f9a8543811 A `.open_context()` example that causes a hang!
Finally! I think this may be the root issue we've been seeing in
production in a client project.

No idea yet why this is happening but the fault-causing sequence seems
to be:
- `.open_context()` in a child actor
- enter the debugger via `tractor.breakpoint()`
- continue from that entry via `c` command in REPL
- raise an error just after inside the context task's body

Looking at logging it appears as though the child thinks it has the tty
but no input is accepted on the REPL and a further `ctrl-c` results in
some teardown but also a further hang where both parent and child become
unresponsive..
2022-04-17 13:15:33 -04:00
Tyler Goodlet aa09a31d25 Drop all the `@cm.__exit__()` override attempts..
None of it worked (you still will see `.__exit__()` frames on debugger
entry - you'd think this would have been solved by now but, shrug) so
instead wrap the debugger entry-point in a `try:` and put the SIGINT
handler restoration inside `MultiActorPdb` teardown hooks.

This seems to restore the UX as it was prior but with also giving the
desired SIGINT override handler behaviour.
2022-04-17 13:15:33 -04:00
Tyler Goodlet 9a1dadecff Try overriding `_GeneratorContextManager.__exit__()`; didn't work..
Using either of `@pdb.hideframe` or `__tracebackhide__` on stdlib
methods doesn't seem to work either.. This all seems to have something
to do with async generator usage I think ?
2022-04-17 13:15:33 -04:00
Tyler Goodlet 861884e075 Fix example name typo 2022-04-17 13:15:33 -04:00
Tyler Goodlet e204f858ac Handle a context cancel? Might be a noop 2022-04-17 13:15:33 -04:00
Tyler Goodlet 36e92b9faf Add a pre-started breakpoint example 2022-04-17 13:15:33 -04:00
Tyler Goodlet 742e004810 Make `mypy` happy 2022-04-17 13:15:33 -04:00
Tyler Goodlet ef7921ce11 Refine the handler for child vs. root cases
This gets very close to avoiding any possible hangs to do with tty
locking and SIGINT handling minus a special case that will be detailed
below.

Summary of implementation changes:

- convert `_mk_pdb()` -> `with _open_pdb() as pdb:` which implicitly
  handles the `bdb.BdbQuit` case such that debugger teardown hooks are
  always called.
- rename the handler to `shield_sigint()` and handle a variety of new
  cases:
  * the root is in debug but hasn't been cancelled -> call
    `Actor.cancel_soon()`
  * the root is in debug but *has* been called (`Actor.cancel_soon()`
    already called) -> raise KBI
  * a child is in debug *and* has a task locking the debugger -> ignore
    SIGINT in child *and* the root actor.
- if the debugger instance is provided to the handler at acquire time,
  on SIGINT handling completion re-print the last pdb++ REPL output so
  that the user realizes they are still actively in debug.
- ignore the unlock case where a race condition of "no task" holding the
  lock causes the `RuntimeError` normally associated with the "wrong
  task" doing so (not sure if this is a `trio` bug?).
- change debug logs to runtime level.

Unhandled case(s):

- a child is maybe in debug mode but does not itself have any task using
  the debugger.
    * ToDo: we need a way to decide what to do with
      "intermediate" child actors who themselves either are not in
      `debug_mode=True` but have children who *are* such that a SIGINT
      won't cause cancellation of that child-as-parent-of-another-child
      **iff** any of their children are in in debug mode.
2022-04-17 13:15:33 -04:00
Tyler Goodlet 542fe0372b (facepalm) Reraise `BdbQuit` and discard ownerless lock releases 2022-04-17 13:15:33 -04:00
Tyler Goodlet 3e9998ea83 Add WIP while-debugger-active SIGINT ignore handler 2022-04-17 13:15:33 -04:00
14 changed files with 717 additions and 194 deletions

View File

@ -120,4 +120,4 @@ jobs:
run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
- name: Run tests - name: Run tests
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs --full-trace

View File

@ -3,13 +3,14 @@
|gh_actions| |gh_actions|
|docs| |docs|
``tractor`` is a `structured concurrent`_, multi-processing_ runtime built on trio_. ``tractor`` is a `structured concurrent`_, multi-processing_ runtime
built on trio_.
Fundamentally ``tractor`` gives you parallelism via ``trio``-"*actors*": Fundamentally ``tractor`` gives you parallelism via ``trio``-"*actors*":
our nurseries_ let you spawn new Python processes which each run a ``trio`` our nurseries_ let you spawn new Python processes which each run a ``trio``
scheduled runtime - a call to ``trio.run()``. scheduled runtime - a call to ``trio.run()``.
We believe the system adhere's to the `3 axioms`_ of an "`actor model`_" We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
but likely *does not* look like what *you* probably think an "actor but likely *does not* look like what *you* probably think an "actor
model" looks like, and that's *intentional*. model" looks like, and that's *intentional*.
@ -579,13 +580,13 @@ say hi, please feel free to reach us in our `matrix channel`_. If
matrix seems too hip, we're also mostly all in the the `trio gitter matrix seems too hip, we're also mostly all in the the `trio gitter
channel`_! channel`_!
.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
.. _multi-processing: https://en.wikipedia.org/wiki/Multiprocessing
.. _trio: https://github.com/python-trio/trio
.. _nurseries: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/#nurseries-a-structured-replacement-for-go-statements .. _nurseries: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/#nurseries-a-structured-replacement-for-go-statements
.. _actor model: https://en.wikipedia.org/wiki/Actor_model .. _actor model: https://en.wikipedia.org/wiki/Actor_model
.. _trio: https://github.com/python-trio/trio
.. _multi-processing: https://en.wikipedia.org/wiki/Multiprocessing
.. _trionic: https://trio.readthedocs.io/en/latest/design.html#high-level-design-principles .. _trionic: https://trio.readthedocs.io/en/latest/design.html#high-level-design-principles
.. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich .. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich
.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
.. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s .. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s
.. .. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts .. .. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts
.. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s .. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s

View File

View File

@ -0,0 +1,50 @@
import tractor
import trio
async def gen():
yield 'yo'
await tractor.breakpoint()
yield 'yo'
await tractor.breakpoint()
@tractor.context
async def just_bp(
ctx: tractor.Context,
) -> None:
await ctx.started()
await tractor.breakpoint()
# TODO: bps and errors in this call..
async for val in gen():
print(val)
# await trio.sleep(0.5)
# prematurely destroy the connection
await ctx.chan.aclose()
# THIS CAUSES AN UNRECOVERABLE HANG
# without latest ``pdbpp``:
assert 0
async def main():
async with tractor.open_nursery(
debug_mode=True,
) as n:
p = await n.start_actor(
'bp_boi',
enable_modules=[__name__],
)
async with p.open_context(
just_bp,
) as (ctx, first):
await trio.sleep_forever()
if __name__ == '__main__':
trio.run(main)

View File

@ -0,0 +1,49 @@
import trio
import click
import tractor
import pydantic
# from multiprocessing import shared_memory
@tractor.context
async def just_sleep(
ctx: tractor.Context,
**kwargs,
) -> None:
'''
Test a small ping-pong 2-way streaming server.
'''
await ctx.started()
await trio.sleep_forever()
async def main() -> None:
proc = await trio.open_process( (
'python',
'-c',
'import trio; trio.run(trio.sleep_forever)',
))
await proc.wait()
# await trio.sleep_forever()
# async with tractor.open_nursery() as n:
# portal = await n.start_actor(
# 'rpc_server',
# enable_modules=[__name__],
# )
# async with portal.open_context(
# just_sleep, # taken from pytest parameterization
# ) as (ctx, sent):
# await trio.sleep_forever()
if __name__ == '__main__':
import time
# time.sleep(999)
trio.run(main)

View File

@ -54,7 +54,10 @@ setup(
# tooling # tooling
'colorlog', 'colorlog',
'wrapt', 'wrapt',
'pdbpp',
# 3.10 has an outstanding unreleased issue and `pdbpp` itself
# pins to patched forks of its own dependencies as well.
"pdbpp @ git+https://github.com/pdbpp/pdbpp@master#egg=pdbpp", # noqa: E501
# serialization # serialization
'msgpack>=1.0.3', 'msgpack>=1.0.3',

View File

@ -1,5 +1,5 @@
""" """
That native debug better work! That "native" debug mode better work!
All these tests can be understood (somewhat) by running the equivalent All these tests can be understood (somewhat) by running the equivalent
`examples/debugging/` scripts manually. `examples/debugging/` scripts manually.
@ -13,6 +13,7 @@ TODO:
import time import time
from os import path from os import path
import platform import platform
from typing import Optional
import pytest import pytest
import pexpect import pexpect
@ -73,6 +74,14 @@ def spawn(
return _spawn return _spawn
@pytest.fixture(
params=[False, True],
ids='ctl-c={}'.format,
)
def ctlc(request) -> bool:
yield request.param
@pytest.mark.parametrize( @pytest.mark.parametrize(
'user_in_out', 'user_in_out',
[ [
@ -137,20 +146,50 @@ def test_root_actor_bp(spawn, user_in_out):
assert expect_err_str in str(child.before) assert expect_err_str in str(child.before)
def test_root_actor_bp_forever(spawn): def do_ctlc(
child,
count: int = 3,
patt: Optional[str] = None,
) -> None:
# make sure ctl-c sends don't do anything but repeat output
for _ in range(count):
child.sendcontrol('c')
child.expect(r"\(Pdb\+\+\)")
if patt:
# should see the last line on console
before = str(child.before.decode())
assert patt in before
def test_root_actor_bp_forever(
spawn,
ctlc: bool,
):
"Re-enter a breakpoint from the root actor-task." "Re-enter a breakpoint from the root actor-task."
child = spawn('root_actor_breakpoint_forever') child = spawn('root_actor_breakpoint_forever')
# do some "next" commands to demonstrate recurrent breakpoint # do some "next" commands to demonstrate recurrent breakpoint
# entries # entries
for _ in range(10): for _ in range(10):
child.sendline('next')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
# do one continue which should trigger a new task to lock the tty if ctlc:
do_ctlc(child)
child.sendline('next')
# do one continue which should trigger a
# new task to lock the tty
child.sendline('continue') child.sendline('continue')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
if ctlc:
do_ctlc(child)
# XXX: this previously caused a bug! # XXX: this previously caused a bug!
child.sendline('n') child.sendline('n')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
@ -158,8 +197,15 @@ def test_root_actor_bp_forever(spawn):
child.sendline('n') child.sendline('n')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
# quit out of the loop
child.sendline('q')
child.expect(pexpect.EOF)
def test_subactor_error(spawn):
def test_subactor_error(
spawn,
ctlc: bool,
):
"Single subactor raising an error" "Single subactor raising an error"
child = spawn('subactor_error') child = spawn('subactor_error')
@ -170,23 +216,29 @@ def test_subactor_error(spawn):
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error'" in before assert "Attaching to pdb in crashed actor: ('name_error'" in before
# send user command # make sure ctl-c sends don't do anything but repeat output
# (in this case it's the same for 'continue' vs. 'quit') if ctlc:
child.sendline('continue') do_ctlc(
child,
patt='(doggypants)',
)
# the debugger should enter a second time in the nursery # send user command and (in this case it's the same for 'continue'
# vs. 'quit') the debugger should enter a second time in the nursery
# creating actor # creating actor
child.sendline('continue')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
# root actor gets debugger engaged # root actor gets debugger engaged
assert "Attaching to pdb in crashed actor: ('root'" in before assert "Attaching to pdb in crashed actor: ('root'" in before
# error is a remote error propagated from the subactor # error is a remote error propagated from the subactor
assert "RemoteActorError: ('name_error'" in before assert "RemoteActorError: ('name_error'" in before
# another round
if ctlc:
do_ctlc(child)
child.sendline('c') child.sendline('c')
child.expect('\r\n') child.expect('\r\n')

View File

@ -81,11 +81,14 @@ def run_example_in_subproc(loglevel, testdir, arb_addr):
'example_script', 'example_script',
# walk yields: (dirpath, dirnames, filenames) # walk yields: (dirpath, dirnames, filenames)
[(p[0], f) for p in os.walk(examples_dir()) for f in p[2] [
(p[0], f) for p in os.walk(examples_dir()) for f in p[2]
if '__' not in f if '__' not in f
and f[0] != '_' and f[0] != '_'
and 'debugging' not in p[0]], and 'debugging' not in p[0]
and 'integration' not in p[0]
],
ids=lambda t: t[1], ids=lambda t: t[1],
) )

View File

@ -26,8 +26,11 @@ import importlib
import importlib.util import importlib.util
import inspect import inspect
import uuid import uuid
import typing from typing import (
from typing import Any, Optional, Union Any, Optional,
Union, TYPE_CHECKING,
Callable,
)
from types import ModuleType from types import ModuleType
import sys import sys
import os import os
@ -57,6 +60,10 @@ from . import _state
from . import _mp_fixup_main from . import _mp_fixup_main
if TYPE_CHECKING:
from ._supervise import ActorNursery
log = get_logger('tractor') log = get_logger('tractor')
@ -65,7 +72,7 @@ async def _invoke(
actor: 'Actor', actor: 'Actor',
cid: str, cid: str,
chan: Channel, chan: Channel,
func: typing.Callable, func: Callable,
kwargs: dict[str, Any], kwargs: dict[str, Any],
is_rpc: bool = True, is_rpc: bool = True,
@ -200,7 +207,7 @@ async def _invoke(
ctx = actor._contexts.pop((chan.uid, cid)) ctx = actor._contexts.pop((chan.uid, cid))
if ctx: if ctx:
log.runtime( log.runtime(
f'Context entrypoint for {func} was terminated:\n{ctx}' f'Context entrypoint {func} was terminated:\n{ctx}'
) )
assert cs assert cs
@ -233,7 +240,10 @@ async def _invoke(
task_status.started(cs) task_status.started(cs)
await chan.send({'return': await coro, 'cid': cid}) await chan.send({'return': await coro, 'cid': cid})
except (Exception, trio.MultiError) as err: except (
Exception,
trio.MultiError
) as err:
if not is_multi_cancelled(err): if not is_multi_cancelled(err):
@ -267,7 +277,13 @@ async def _invoke(
try: try:
await chan.send(err_msg) await chan.send(err_msg)
except trio.ClosedResourceError: # TODO: tests for this scenario:
# - RPC caller closes connection before getting a response
# should **not** crash this actor..
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
):
# if we can't propagate the error that's a big boo boo # if we can't propagate the error that's a big boo boo
log.error( log.error(
f"Failed to ship error to caller @ {chan.uid} !?" f"Failed to ship error to caller @ {chan.uid} !?"
@ -316,7 +332,9 @@ async def try_ship_error_to_parent(
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
): ):
log.error( # in SC terms this is one of the worst things that can
# happen and creates the 2-general's dilemma.
log.critical(
f"Failed to ship error to parent " f"Failed to ship error to parent "
f"{channel.uid}, channel was closed" f"{channel.uid}, channel was closed"
) )
@ -424,7 +442,7 @@ class Actor:
# (chan, cid) -> (cancel_scope, func) # (chan, cid) -> (cancel_scope, func)
self._rpc_tasks: dict[ self._rpc_tasks: dict[
tuple[Channel, str], tuple[Channel, str],
tuple[trio.CancelScope, typing.Callable, trio.Event] tuple[trio.CancelScope, Callable, trio.Event]
] = {} ] = {}
# map {actor uids -> Context} # map {actor uids -> Context}
@ -513,6 +531,7 @@ class Actor:
self._no_more_peers = trio.Event() # unset self._no_more_peers = trio.Event() # unset
chan = Channel.from_stream(stream) chan = Channel.from_stream(stream)
uid: Optional[tuple[str, str]] = chan.uid
log.runtime(f"New connection to us {chan}") log.runtime(f"New connection to us {chan}")
# send/receive initial handshake response # send/receive initial handshake response
@ -560,39 +579,63 @@ class Actor:
# append new channel # append new channel
self._peers[uid].append(chan) self._peers[uid].append(chan)
local_nursery: Optional[ActorNursery] = None # noqa
disconnected: bool = False
# Begin channel management - respond to remote requests and # Begin channel management - respond to remote requests and
# process received reponses. # process received reponses.
try: try:
await self._process_messages(chan) disconnected = await self._process_messages(chan)
except trio.Cancelled: except (
trio.Cancelled,
):
log.cancel(f"Msg loop was cancelled for {chan}") log.cancel(f"Msg loop was cancelled for {chan}")
raise raise
finally: finally:
local_nursery = self._actoruid2nursery.get(uid, local_nursery)
# This is set in ``Portal.cancel_actor()``. So if # This is set in ``Portal.cancel_actor()``. So if
# the peer was cancelled we try to wait for them # the peer was cancelled we try to wait for them
# to tear down their side of the connection before # to tear down their side of the connection before
# moving on with closing our own side. # moving on with closing our own side.
local_nursery = self._actoruid2nursery.get(chan.uid)
if ( if (
local_nursery local_nursery
): ):
if disconnected:
# if the transport died and this actor is still
# registered within a local nursery, we report that the
# IPC layer may have failed unexpectedly since it may be
# the cause of other downstream errors.
entry = local_nursery._children.get(uid)
if entry:
_, proc, _ = entry
log.warning(
f'Actor {uid}@{proc} IPC connection broke!?')
# if proc.poll() is not None:
# log.error('Actor {uid} proc died and IPC broke?')
log.cancel(f"Waiting on cancel request to peer {chan.uid}") log.cancel(f"Waiting on cancel request to peer {chan.uid}")
# XXX: this is a soft wait on the channel (and its # XXX: this is a soft wait on the channel (and its
# underlying transport protocol) to close from the remote # underlying transport protocol) to close from the
# peer side since we presume that any channel which # remote peer side since we presume that any channel
# is mapped to a sub-actor (i.e. it's managed by # which is mapped to a sub-actor (i.e. it's managed by
# one of our local nurseries) # one of our local nurseries) has a message is sent to
# message is sent to the peer likely by this actor which is # the peer likely by this actor (which is now in
# now in a cancelled condition) when the local runtime here # a cancelled condition) when the local runtime here is
# is now cancelled while (presumably) in the middle of msg # now cancelled while (presumably) in the middle of msg
# loop processing. # loop processing.
with trio.move_on_after(0.5) as cs: with trio.move_on_after(0.5) as cs:
cs.shield = True cs.shield = True
# Attempt to wait for the far end to close the channel # Attempt to wait for the far end to close the channel
# and bail after timeout (2-generals on closure). # and bail after timeout (2-generals on closure).
assert chan.msgstream assert chan.msgstream
log.runtime(
f'Draining lingering msgs from stream {chan.msgstream}'
)
async for msg in chan.msgstream.drain(): async for msg in chan.msgstream.drain():
# try to deliver any lingering msgs # try to deliver any lingering msgs
# before we destroy the channel. # before we destroy the channel.
@ -609,6 +652,8 @@ class Actor:
await local_nursery.exited.wait() await local_nursery.exited.wait()
# if local_nursery._children
# ``Channel`` teardown and closure sequence # ``Channel`` teardown and closure sequence
# Drop ref to channel so it can be gc-ed and disconnected # Drop ref to channel so it can be gc-ed and disconnected
@ -618,7 +663,7 @@ class Actor:
if not chans: if not chans:
log.runtime(f"No more channels for {chan.uid}") log.runtime(f"No more channels for {chan.uid}")
self._peers.pop(chan.uid, None) self._peers.pop(uid, None)
# for (uid, cid) in self._contexts.copy(): # for (uid, cid) in self._contexts.copy():
# if chan.uid == uid: # if chan.uid == uid:
@ -626,11 +671,13 @@ class Actor:
log.runtime(f"Peers is {self._peers}") log.runtime(f"Peers is {self._peers}")
if not self._peers: # no more channels connected # No more channels to other actors (at all) registered
# as connected.
if not self._peers:
log.runtime("Signalling no more peer channels") log.runtime("Signalling no more peer channels")
self._no_more_peers.set() self._no_more_peers.set()
# # XXX: is this necessary (GC should do it?) # XXX: is this necessary (GC should do it)?
if chan.connected(): if chan.connected():
# if the channel is still connected it may mean the far # if the channel is still connected it may mean the far
# end has not closed and we may have gotten here due to # end has not closed and we may have gotten here due to
@ -665,7 +712,7 @@ class Actor:
ctx = self._contexts[(uid, cid)] ctx = self._contexts[(uid, cid)]
except KeyError: except KeyError:
log.warning( log.warning(
f'Ignoring msg from [no-longer/un]known context with {uid}:' f'Ignoring msg from [no-longer/un]known context {uid}:'
f'\n{msg}') f'\n{msg}')
return return
@ -813,7 +860,7 @@ class Actor:
shield: bool = False, shield: bool = False,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> bool:
''' '''
Process messages for the channel async-RPC style. Process messages for the channel async-RPC style.
@ -839,7 +886,7 @@ class Actor:
if msg is None: # loop terminate sentinel if msg is None: # loop terminate sentinel
log.cancel( log.cancel(
f"Channerl to {chan.uid} terminated?\n" f"Channel to {chan.uid} terminated?\n"
"Cancelling all associated tasks..") "Cancelling all associated tasks..")
for (channel, cid) in self._rpc_tasks.copy(): for (channel, cid) in self._rpc_tasks.copy():
@ -986,6 +1033,9 @@ class Actor:
# up. # up.
log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}') log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}')
# transport **was** disconnected
return True
except (Exception, trio.MultiError) as err: except (Exception, trio.MultiError) as err:
if nursery_cancelled_before_task: if nursery_cancelled_before_task:
sn = self._service_n sn = self._service_n
@ -1010,6 +1060,9 @@ class Actor:
f"Exiting msg loop for {chan} from {chan.uid} " f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}") f"with last msg:\n{msg}")
# transport **was not** disconnected
return False
async def _from_parent( async def _from_parent(
self, self,
parent_addr: Optional[tuple[str, int]], parent_addr: Optional[tuple[str, int]],

View File

@ -18,8 +18,10 @@
Multi-core debugging for da peeps! Multi-core debugging for da peeps!
""" """
from __future__ import annotations
import bdb import bdb
import sys import sys
import signal
from functools import partial from functools import partial
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from typing import ( from typing import (
@ -29,16 +31,18 @@ from typing import (
AsyncIterator, AsyncIterator,
AsyncGenerator, AsyncGenerator,
) )
from types import FrameType
import tractor import tractor
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from .log import get_logger from .log import get_logger
from . import _state
from ._discovery import get_root from ._discovery import get_root
from ._state import is_root_process, debug_mode from ._state import is_root_process, debug_mode
from ._exceptions import is_multi_cancelled from ._exceptions import is_multi_cancelled
from ._ipc import Channel
try: try:
# wtf: only exported when installed in dev mode? # wtf: only exported when installed in dev mode?
@ -46,7 +50,8 @@ try:
except ImportError: except ImportError:
# pdbpp is installed in regular mode...it monkey patches stuff # pdbpp is installed in regular mode...it monkey patches stuff
import pdb import pdb
assert pdb.xpm, "pdbpp is not installed?" # type: ignore xpm = getattr(pdb, 'xpm', None)
assert xpm, "pdbpp is not installed?" # type: ignore
pdbpp = pdb pdbpp = pdb
log = get_logger(__name__) log = get_logger(__name__)
@ -81,11 +86,14 @@ class TractorConfig(pdbpp.DefaultConfig):
"""Custom ``pdbpp`` goodness. """Custom ``pdbpp`` goodness.
""" """
# sticky_by_default = True # sticky_by_default = True
enable_hidden_frames = False
class PdbwTeardown(pdbpp.Pdb): class MultiActorPdb(pdbpp.Pdb):
"""Add teardown hooks to the regular ``pdbpp.Pdb``. '''
""" Add teardown hooks to the regular ``pdbpp.Pdb``.
'''
# override the pdbpp config with our coolio one # override the pdbpp config with our coolio one
DefaultConfig = TractorConfig DefaultConfig = TractorConfig
@ -95,16 +103,18 @@ class PdbwTeardown(pdbpp.Pdb):
try: try:
super().set_continue() super().set_continue()
finally: finally:
global _local_task_in_debug global _local_task_in_debug, _pdb_release_hook
_local_task_in_debug = None _local_task_in_debug = None
if _pdb_release_hook:
_pdb_release_hook() _pdb_release_hook()
def set_quit(self): def set_quit(self):
try: try:
super().set_quit() super().set_quit()
finally: finally:
global _local_task_in_debug global _local_task_in_debug, _pdb_release_hook
_local_task_in_debug = None _local_task_in_debug = None
if _pdb_release_hook:
_pdb_release_hook() _pdb_release_hook()
@ -150,7 +160,8 @@ async def _acquire_debug_lock(
uid: Tuple[str, str] uid: Tuple[str, str]
) -> AsyncIterator[trio.StrictFIFOLock]: ) -> AsyncIterator[trio.StrictFIFOLock]:
'''Acquire a root-actor local FIFO lock which tracks mutex access of '''
Acquire a root-actor local FIFO lock which tracks mutex access of
the process tree's global debugger breakpoint. the process tree's global debugger breakpoint.
This lock avoids tty clobbering (by preventing multiple processes This lock avoids tty clobbering (by preventing multiple processes
@ -162,27 +173,27 @@ async def _acquire_debug_lock(
task_name = trio.lowlevel.current_task().name task_name = trio.lowlevel.current_task().name
log.debug( log.runtime(
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}" f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}"
) )
we_acquired = False we_acquired = False
try:
log.runtime(
f"entering lock checkpoint, remote task: {task_name}:{uid}"
)
we_acquired = True
await _debug_lock.acquire()
if _no_remote_has_tty is None: if _no_remote_has_tty is None:
# mark the tty lock as being in use so that the runtime # mark the tty lock as being in use so that the runtime
# can try to avoid clobbering any connection from a child # can try to avoid clobbering any connection from a child
# that's currently relying on it. # that's currently relying on it.
_no_remote_has_tty = trio.Event() _no_remote_has_tty = trio.Event()
try:
log.debug(
f"entering lock checkpoint, remote task: {task_name}:{uid}"
)
we_acquired = True
await _debug_lock.acquire()
_global_actor_in_debug = uid _global_actor_in_debug = uid
log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}") log.runtime(f"TTY lock acquired, remote task: {task_name}:{uid}")
# NOTE: critical section: this yield is unshielded! # NOTE: critical section: this yield is unshielded!
@ -198,7 +209,11 @@ async def _acquire_debug_lock(
finally: finally:
# if _global_actor_in_debug == uid: # if _global_actor_in_debug == uid:
if we_acquired and _debug_lock.locked():
if (
we_acquired
and _debug_lock.locked()
):
_debug_lock.release() _debug_lock.release()
# IFF there are no more requesting tasks queued up fire, the # IFF there are no more requesting tasks queued up fire, the
@ -210,28 +225,15 @@ async def _acquire_debug_lock(
if ( if (
not stats.owner not stats.owner
): ):
log.debug(f"No more tasks waiting on tty lock! says {uid}") log.runtime(f"No more tasks waiting on tty lock! says {uid}")
if _no_remote_has_tty is not None:
_no_remote_has_tty.set() _no_remote_has_tty.set()
_no_remote_has_tty = None _no_remote_has_tty = None
_global_actor_in_debug = None _global_actor_in_debug = None
log.debug(f"TTY lock released, remote task: {task_name}:{uid}") log.runtime(
f"TTY lock released, remote task: {task_name}:{uid}"
def handler(signum, frame, *args):
"""Specialized debugger compatible SIGINT handler.
In childred we always ignore to avoid deadlocks since cancellation
should always be managed by the parent supervising actor. The root
is always cancelled on ctrl-c.
"""
if is_root_process():
tractor.current_actor().cancel_soon()
else:
print(
"tractor ignores SIGINT while in debug mode\n"
"If you have a special need for it please open an issue.\n"
) )
@ -247,6 +249,10 @@ async def _hijack_stdin_for_child(
the pdbpp debugger console can be allocated to a sub-actor for repl the pdbpp debugger console can be allocated to a sub-actor for repl
bossing. bossing.
NOTE: this task is invoked in the root actor-process of the actor
tree. It is meant to be invoked as an rpc-task which should be
highly reliable at cleaning out the tty-lock state when complete!
''' '''
task_name = trio.lowlevel.current_task().name task_name = trio.lowlevel.current_task().name
@ -260,47 +266,66 @@ async def _hijack_stdin_for_child(
log.debug(f"Actor {subactor_uid} is WAITING on stdin hijack lock") log.debug(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
with trio.CancelScope(shield=True): orig_handler = signal.signal(
signal.SIGINT,
shield_sigint,
)
try: try:
lock = None with (
async with _acquire_debug_lock(subactor_uid) as lock: trio.CancelScope(shield=True),
):
# try:
# lock = None
async with _acquire_debug_lock(subactor_uid): # as lock:
# indicate to child that we've locked stdio # indicate to child that we've locked stdio
await ctx.started('Locked') await ctx.started('Locked')
log.debug(f"Actor {subactor_uid} acquired stdin hijack lock") log.debug(
f"Actor {subactor_uid} acquired stdin hijack lock"
)
# wait for unlock pdb by child # wait for unlock pdb by child
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
assert await stream.receive() == 'pdb_unlock' assert await stream.receive() == 'pdb_unlock'
# try: # except (
# assert await stream.receive() == 'pdb_unlock'
except (
# BaseException, # BaseException,
trio.MultiError, # # trio.MultiError,
trio.BrokenResourceError, # # Exception,
trio.Cancelled, # by local cancellation # # trio.BrokenResourceError,
trio.ClosedResourceError, # by self._rx_chan # # trio.Cancelled, # by local cancellation
) as err: # # trio.ClosedResourceError, # by self._rx_chan
# XXX: there may be a race with the portal teardown # # ContextCancelled,
# with the calling actor which we can safely ignore. # # ConnectionResetError,
# The alternative would be sending an ack message # ):
# and allowing the client to wait for us to teardown # # XXX: there may be a race with the portal teardown
# first? # # with the calling actor which we can safely ignore.
if lock and lock.locked(): # # The alternative would be sending an ack message
lock.release() # # and allowing the client to wait for us to teardown
# # first?
# if lock and lock.locked():
# try:
# lock.release()
# except RuntimeError:
# log.exception(f"we don't own the tty lock?")
if isinstance(err, trio.Cancelled): # # if isinstance(err, trio.Cancelled):
raise # raise
finally:
log.debug( # finally:
"TTY lock released, remote task:" # log.runtime(
f"{task_name}:{subactor_uid}") # "TTY lock released, remote task:"
# f"{task_name}:{subactor_uid}"
# )
return "pdb_unlock_complete" return "pdb_unlock_complete"
finally:
signal.signal(
signal.SIGINT,
orig_handler
)
async def wait_for_parent_stdin_hijack( async def wait_for_parent_stdin_hijack(
actor_uid: Tuple[str, str], actor_uid: Tuple[str, str],
@ -338,20 +363,22 @@ async def wait_for_parent_stdin_hijack(
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
# unblock local caller # unblock local caller
task_status.started(cs)
try: try:
assert _local_pdb_complete assert _local_pdb_complete
task_status.started(cs)
await _local_pdb_complete.wait() await _local_pdb_complete.wait()
finally: finally:
# TODO: shielding currently can cause hangs... # TODO: shielding currently can cause hangs...
with trio.CancelScope(shield=True): # with trio.CancelScope(shield=True):
await stream.send('pdb_unlock') await stream.send('pdb_unlock')
# sync with callee termination # sync with callee termination
assert await ctx.result() == "pdb_unlock_complete" assert await ctx.result() == "pdb_unlock_complete"
log.pdb('unlocked context')
except tractor.ContextCancelled: except tractor.ContextCancelled:
log.warning('Root actor cancelled debug lock') log.warning('Root actor cancelled debug lock')
@ -362,6 +389,32 @@ async def wait_for_parent_stdin_hijack(
log.debug(f"Child {actor_uid} released parent stdio lock") log.debug(f"Child {actor_uid} released parent stdio lock")
def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
pdb = MultiActorPdb()
signal.signal = pdbpp.hideframe(signal.signal)
orig_handler = signal.signal(
signal.SIGINT,
partial(shield_sigint, pdb_obj=pdb),
)
# XXX: These are the important flags mentioned in
# https://github.com/python-trio/trio/issues/1155
# which resolve the traceback spews to console.
pdb.allow_kbdint = True
pdb.nosigint = True
# TODO: add this as method on our pdb obj?
def undo_sigint():
# restore original sigint handler
signal.signal(
signal.SIGINT,
orig_handler
)
return pdb, undo_sigint
async def _breakpoint( async def _breakpoint(
debug_func, debug_func,
@ -370,23 +423,26 @@ async def _breakpoint(
# shield: bool = False # shield: bool = False
) -> None: ) -> None:
'''``tractor`` breakpoint entry for engaging pdb machinery '''
in the root or a subactor. breakpoint entry for engaging pdb machinery in the root or
a subactor.
''' '''
# TODO: is it possible to debug a trio.Cancelled except block? __tracebackhide__ = True
# right now it seems like we can kinda do with by shielding
# around ``tractor.breakpoint()`` but not if we move the shielded
# scope here???
# with trio.CancelScope(shield=shield):
pdb, undo_sigint = mk_mpdb()
actor = tractor.current_actor() actor = tractor.current_actor()
task_name = trio.lowlevel.current_task().name task_name = trio.lowlevel.current_task().name
global _local_pdb_complete, _pdb_release_hook global _local_pdb_complete, _pdb_release_hook
global _local_task_in_debug, _global_actor_in_debug global _local_task_in_debug, _global_actor_in_debug
await trio.lowlevel.checkpoint() # TODO: is it possible to debug a trio.Cancelled except block?
# right now it seems like we can kinda do with by shielding
# around ``tractor.breakpoint()`` but not if we move the shielded
# scope here???
# with trio.CancelScope(shield=shield):
# await trio.lowlevel.checkpoint()
if not _local_pdb_complete or _local_pdb_complete.is_set(): if not _local_pdb_complete or _local_pdb_complete.is_set():
_local_pdb_complete = trio.Event() _local_pdb_complete = trio.Event()
@ -412,8 +468,20 @@ async def _breakpoint(
# entries/requests to the root process # entries/requests to the root process
_local_task_in_debug = task_name _local_task_in_debug = task_name
def child_release_hook():
try:
# sometimes the ``trio`` might already be termianated in
# which case this call will raise.
_local_pdb_complete.set()
finally:
# restore original sigint handler
undo_sigint()
# should always be cleared in the hijack hook aboved right?
# _local_task_in_debug = None
# assign unlock callback for debugger teardown hooks # assign unlock callback for debugger teardown hooks
_pdb_release_hook = _local_pdb_complete.set # _pdb_release_hook = _local_pdb_complete.set
_pdb_release_hook = child_release_hook
# this **must** be awaited by the caller and is done using the # this **must** be awaited by the caller and is done using the
# root nursery so that the debugger can continue to run without # root nursery so that the debugger can continue to run without
@ -423,11 +491,15 @@ async def _breakpoint(
# we have to figure out how to avoid having the service nursery # we have to figure out how to avoid having the service nursery
# cancel on this task start? I *think* this works below? # cancel on this task start? I *think* this works below?
# actor._service_n.cancel_scope.shield = shield # actor._service_n.cancel_scope.shield = shield
try:
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await actor._service_n.start( await actor._service_n.start(
wait_for_parent_stdin_hijack, wait_for_parent_stdin_hijack,
actor.uid, actor.uid,
) )
except RuntimeError:
child_release_hook()
raise
elif is_root_process(): elif is_root_process():
@ -464,59 +536,221 @@ async def _breakpoint(
global _local_pdb_complete, _debug_lock global _local_pdb_complete, _debug_lock
global _global_actor_in_debug, _local_task_in_debug global _global_actor_in_debug, _local_task_in_debug
try:
_debug_lock.release() _debug_lock.release()
except RuntimeError:
# uhhh makes no sense but been seeing the non-owner
# release error even though this is definitely the task
# that locked?
owner = _debug_lock.statistics().owner
if owner:
raise
_global_actor_in_debug = None _global_actor_in_debug = None
_local_task_in_debug = None _local_task_in_debug = None
try:
# sometimes the ``trio`` might already be termianated in
# which case this call will raise.
_local_pdb_complete.set() _local_pdb_complete.set()
finally:
# restore original sigint handler
undo_sigint()
_pdb_release_hook = teardown _pdb_release_hook = teardown
# frame = sys._getframe()
# last_f = frame.f_back
# last_f.f_globals['__tracebackhide__'] = True
try:
# block here one (at the appropriate frame *up*) where # block here one (at the appropriate frame *up*) where
# ``breakpoint()`` was awaited and begin handling stdio. # ``breakpoint()`` was awaited and begin handling stdio.
log.debug("Entering the synchronous world of pdb") log.debug("Entering the synchronous world of pdb")
debug_func(actor) debug_func(actor, pdb)
except bdb.BdbQuit:
if _pdb_release_hook:
_pdb_release_hook()
raise
# XXX: apparently we can't do this without showing this frame
# in the backtrace on first entry to the REPL? Seems like an odd
# behaviour that should have been fixed by now. This is also why
# we scrapped all the @cm approaches that were tried previously.
# finally:
# __tracebackhide__ = True
# # frame = sys._getframe()
# # last_f = frame.f_back
# # last_f.f_globals['__tracebackhide__'] = True
# # signal.signal = pdbpp.hideframe(signal.signal)
# signal.signal(
# signal.SIGINT,
# orig_handler
# )
def _mk_pdb() -> PdbwTeardown: def shield_sigint(
signum: int,
frame: 'frame', # type: ignore # noqa
pdb_obj: Optional[MultiActorPdb] = None,
*args,
# XXX: setting these flags on the pdb instance are absolutely ) -> None:
# critical to having ctrl-c work in the ``trio`` standard way! The '''
# stdlib's pdb supports entering the current sync frame on a SIGINT, Specialized debugger compatible SIGINT handler.
# with ``trio`` we pretty much never want this and if we did we can
# handle it in the ``tractor`` task runtime.
pdb = PdbwTeardown() In childred we always ignore to avoid deadlocks since cancellation
pdb.allow_kbdint = True should always be managed by the parent supervising actor. The root
pdb.nosigint = True is always cancelled on ctrl-c.
return pdb '''
__tracebackhide__ = True
global _local_task_in_debug, _global_actor_in_debug
uid_in_debug = _global_actor_in_debug
def _set_trace(actor=None): actor = tractor.current_actor()
pdb = _mk_pdb()
if actor is not None: def do_cancel():
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n") # If we haven't tried to cancel the runtime then do that instead
# of raising a KBI (which may non-gracefully destroy
# a ``trio.run()``).
if not actor._cancel_called:
actor.cancel_soon()
pdb.set_trace( # If the runtime is already cancelled it likely means the user
# start 2 levels up in user code # hit ctrl-c again because teardown didn't full take place in
frame=sys._getframe().f_back.f_back, # which case we do the "hard" raising of a local KBI.
else:
raise KeyboardInterrupt
any_connected = False
if uid_in_debug is not None:
# try to see if the supposed (sub)actor in debug still
# has an active connection to *this* actor, and if not
# it's likely they aren't using the TTY lock / debugger
# and we should propagate SIGINT normally.
chans = actor._peers.get(tuple(uid_in_debug))
if chans:
any_connected = any(chan.connected() for chan in chans)
if not any_connected:
log.warning(
'A global actor reported to be in debug '
'but no connection exists for this child:\n'
f'{uid_in_debug}\n'
'Allowing SIGINT propagation..'
)
return do_cancel()
# root actor branch that reports whether or not a child
# has locked debugger.
if (
is_root_process()
and uid_in_debug is not None
# XXX: only if there is an existing connection to the
# (sub-)actor in debug do we ignore SIGINT in this
# parent! Otherwise we may hang waiting for an actor
# which has already terminated to unlock.
and any_connected
):
name = uid_in_debug[0]
if name != 'root':
log.pdb(
f"Ignoring SIGINT while child in debug mode: `{uid_in_debug}`"
) )
else: else:
# we entered the global ``breakpoint()`` built-in from sync code log.pdb(
"Ignoring SIGINT while in debug mode"
)
# child actor that has locked the debugger
elif (
not is_root_process()
):
chan: Channel = actor._parent_chan
if not chan or not chan.connected():
log.warning(
'A global actor reported to be in debug '
'but no connection exists for its parent:\n'
f'{uid_in_debug}\n'
'Allowing SIGINT propagation..'
)
return do_cancel()
task = _local_task_in_debug
if task:
log.pdb(
f"Ignoring SIGINT while task in debug mode: `{task}`"
)
# TODO: how to handle the case of an intermediary-child actor
# that **is not** marked in debug mode?
# elif debug_mode():
else:
log.pdb(
"Ignoring SIGINT since debug mode is enabled"
)
# maybe redraw/print last REPL output to console
if pdb_obj:
# TODO: make this work like sticky mode where if there is output
# detected as written to the tty we redraw this part underneath
# and erase the past draw of this same bit above?
# pdb_obj.sticky = True
# pdb_obj._print_if_sticky()
# also see these links for an approach from ``ptk``:
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
try:
# XXX: lol, see ``pdbpp`` issue:
# https://github.com/pdbpp/pdbpp/issues/496
# pdb_obj.do_longlist(None)
# pdb_obj.lastcmd = 'longlist'
pdb_obj._printlonglist(max_lines=False)
# print(pdb_obj.prompt, end='', flush=True)
except AttributeError:
log.exception('pdbpp longlist failed...')
raise KeyboardInterrupt
def _set_trace(
actor: Optional[tractor._actor.Actor] = None,
pdb: Optional[MultiActorPdb] = None,
):
__tracebackhide__ = True
actor = actor or tractor.current_actor()
# XXX: on latest ``pdbpp`` i guess we don't need this?
# frame = sys._getframe()
# last_f = frame.f_back
# last_f.f_globals['__tracebackhide__'] = True
# start 2 levels up in user code
frame: FrameType = sys._getframe()
if frame:
frame = frame.f_back # type: ignore
if pdb and actor is not None:
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n")
else:
pdb, undo_sigint = mk_mpdb()
# we entered the global ``breakpoint()`` built-in from sync code?
global _local_task_in_debug, _pdb_release_hook global _local_task_in_debug, _pdb_release_hook
_local_task_in_debug = 'sync' _local_task_in_debug = 'sync'
def nuttin(): pdb.set_trace(frame=frame)
pass
_pdb_release_hook = nuttin
pdb.set_trace(
# start 2 levels up in user code
frame=sys._getframe().f_back,
)
breakpoint = partial( breakpoint = partial(
@ -525,11 +759,40 @@ breakpoint = partial(
) )
def _post_mortem(actor): def _post_mortem(
log.pdb(f"\nAttaching to pdb in crashed actor: {actor.uid}\n") actor: tractor._actor.Actor,
pdb = _mk_pdb() pdb: MultiActorPdb,
) -> None:
'''
Enter the ``pdbpp`` port mortem entrypoint using our custom
debugger instance.
'''
log.pdb(f"\nAttaching to pdb in crashed actor: {actor.uid}\n")
# XXX: on py3.10 if you don't have latest ``pdbpp`` installed.
# The exception looks something like:
# Traceback (most recent call last):
# File ".../tractor/_debug.py", line 729, in _post_mortem
# for _ in range(100):
# File "../site-packages/pdb.py", line 1227, in xpm
# post_mortem(info[2], Pdb)
# File "../site-packages/pdb.py", line 1175, in post_mortem
# p.interaction(None, t)
# File "../site-packages/pdb.py", line 216, in interaction
# ret = self.setup(frame, traceback)
# File "../site-packages/pdb.py", line 259, in setup
# ret = super(Pdb, self).setup(frame, tb)
# File "/usr/lib/python3.10/pdb.py", line 217, in setup
# self.curframe = self.stack[self.curindex][0]
# IndexError: list index out of range
# NOTE: you need ``pdbpp`` master (at least this commit
# https://github.com/pdbpp/pdbpp/commit/b757794857f98d53e3ebbe70879663d7d843a6c2)
# to fix this and avoid the hang it causes XD.
# see also: https://github.com/pdbpp/pdbpp/issues/480
# custom Pdb post-mortem entry
pdbpp.xpm(Pdb=lambda: pdb) pdbpp.xpm(Pdb=lambda: pdb)

View File

@ -231,7 +231,7 @@ class MsgspecTCPStream(MsgpackTCPStream):
''' '''
import msgspec # noqa import msgspec # noqa
last_decode_failed: bool = False decodes_failed: int = 0
while True: while True:
try: try:
@ -239,6 +239,7 @@ class MsgspecTCPStream(MsgpackTCPStream):
except ( except (
ValueError, ValueError,
ConnectionResetError,
# not sure entirely why we need this but without it we # not sure entirely why we need this but without it we
# seem to be getting racy failures here on # seem to be getting racy failures here on
@ -267,12 +268,17 @@ class MsgspecTCPStream(MsgpackTCPStream):
msgspec.DecodeError, msgspec.DecodeError,
UnicodeDecodeError, UnicodeDecodeError,
): ):
if not last_decode_failed: if decodes_failed < 4:
# ignore decoding errors for now and assume they have to # ignore decoding errors for now and assume they have to
# do with a channel drop - hope that receiving from the # do with a channel drop - hope that receiving from the
# channel will raise an expected error and bubble up. # channel will raise an expected error and bubble up.
log.error('`msgspec` failed to decode!?') decoded_bytes = msg_bytes.decode()
last_decode_failed = True log.error(
'`msgspec` failed to decode!?\n'
'dumping bytes:\n'
f'{decoded_bytes}'
)
decodes_failed += 1
else: else:
raise raise

View File

@ -24,7 +24,8 @@ import importlib
import inspect import inspect
from typing import ( from typing import (
Any, Optional, Any, Optional,
Callable, AsyncGenerator Callable, AsyncGenerator,
Type,
) )
from functools import partial from functools import partial
from dataclasses import dataclass from dataclasses import dataclass
@ -442,6 +443,10 @@ class Portal:
_err: Optional[BaseException] = None _err: Optional[BaseException] = None
ctx._portal = self ctx._portal = self
uid = self.channel.uid
cid = ctx.cid
etype: Optional[Type[BaseException]] = None
# deliver context instance and .started() msg value in open tuple. # deliver context instance and .started() msg value in open tuple.
try: try:
async with trio.open_nursery() as scope_nursery: async with trio.open_nursery() as scope_nursery:
@ -477,13 +482,24 @@ class Portal:
# KeyboardInterrupt, # KeyboardInterrupt,
) as err: ) as err:
_err = err etype = type(err)
# the context cancels itself on any cancel # the context cancels itself on any cancel
# causing error. # causing error.
log.cancel(
f'Context to {self.channel.uid} sending cancel request..')
if ctx.chan.connected():
log.cancel(
'Context cancelled for task, sending cancel request..\n'
f'task:{cid}\n'
f'actor:{uid}'
)
await ctx.cancel() await ctx.cancel()
else:
log.warning(
'IPC connection for context is broken?\n'
f'task:{cid}\n'
f'actor:{uid}'
)
raise raise
finally: finally:
@ -492,6 +508,12 @@ class Portal:
# sure we get the error the underlying feeder mem chan. # sure we get the error the underlying feeder mem chan.
# if it's not raised here it *should* be raised from the # if it's not raised here it *should* be raised from the
# msg loop nursery right? # msg loop nursery right?
if ctx.chan.connected():
log.info(
'Waiting on final context-task result for\n'
f'task: {cid}\n'
f'actor: {uid}'
)
result = await ctx.result() result = await ctx.result()
# though it should be impossible for any tasks # though it should be impossible for any tasks
@ -502,14 +524,17 @@ class Portal:
# should we encapsulate this in the context api? # should we encapsulate this in the context api?
await ctx._recv_chan.aclose() await ctx._recv_chan.aclose()
if _err: if etype:
if ctx._cancel_called: if ctx._cancel_called:
log.cancel( log.cancel(
f'Context {fn_name} cancelled by caller with\n{_err}' f'Context {fn_name} cancelled by caller with\n{etype}'
) )
elif _err is not None: elif _err is not None:
log.cancel( log.cancel(
f'Context {fn_name} cancelled by callee with\n{_err}' f'Context for task cancelled by callee with {etype}\n'
f'target: `{fn_name}`\n'
f'task:{cid}\n'
f'actor:{uid}'
) )
else: else:
log.runtime( log.runtime(

View File

@ -601,10 +601,18 @@ class Context:
finally: finally:
if self._portal: if self._portal:
self._portal._streams.remove(rchan) try:
self._portal._streams.remove(stream)
except KeyError:
log.warning(
f'Stream was already destroyed?\n'
f'actor: {self.chan.uid}\n'
f'ctx id: {self.cid}'
)
async def result(self) -> Any: async def result(self) -> Any:
'''From a caller side, wait for and return the final result from '''
From a caller side, wait for and return the final result from
the callee side task. the callee side task.
''' '''

View File

@ -259,6 +259,7 @@ def _run_asyncio_task(
nonlocal chan nonlocal chan
aio_err = chan._aio_err aio_err = chan._aio_err
task_err: Optional[BaseException] = None task_err: Optional[BaseException] = None
tname = task.get_name()
# only to avoid ``asyncio`` complaining about uncaptured # only to avoid ``asyncio`` complaining about uncaptured
# task exceptions # task exceptions
@ -266,7 +267,13 @@ def _run_asyncio_task(
task.exception() task.exception()
except BaseException as terr: except BaseException as terr:
task_err = terr task_err = terr
log.exception(f'`asyncio` task: {task.get_name()} errored')
if isinstance(terr, CancelledError):
log.cancel(
f'infected `asyncio` task cancelled: {tname}')
else:
log.exception(f'`asyncio` task: {tname} errored')
assert type(terr) is type(aio_err), 'Asyncio task error mismatch?' assert type(terr) is type(aio_err), 'Asyncio task error mismatch?'
if aio_err is not None: if aio_err is not None:
@ -280,21 +287,24 @@ def _run_asyncio_task(
# We might want to change this in the future though. # We might want to change this in the future though.
from_aio.close() from_aio.close()
if type(aio_err) is CancelledError: # if type(aio_err) is CancelledError:
log.cancel("infected task was cancelled") # if not task_err:
# log.cancel(
# f"infected task {tname} cancelled itself, was not ``trio``"
# )
# TODO: show that the cancellation originated # TODO: show that the cancellation originated
# from the ``trio`` side? right? # from the ``trio`` side? right?
# if cancel_scope.cancelled: # if cancel_scope.cancelled:
# raise aio_err from err # raise aio_err from err
elif task_err is None: if task_err is None:
assert aio_err assert aio_err
aio_err.with_traceback(aio_err.__traceback__) aio_err.with_traceback(aio_err.__traceback__)
msg = ''.join(traceback.format_exception(type(aio_err))) # msg = ''.join(traceback.format_exception(type(aio_err)))
log.error( # log.error(
f'infected task errorred:\n{msg}' # f'infected task errorred:\n{msg}'
) # )
# raise any ``asyncio`` side error. # raise any ``asyncio`` side error.
raise aio_err raise aio_err
@ -387,8 +397,8 @@ async def run_task(
) -> Any: ) -> Any:
''' '''
Run an ``asyncio`` async function or generator in a task, return Run an ``asyncio`` async function or generator in a new task, block
or stream the result back to ``trio``. and return the result back to ``trio``.
''' '''
# simple async func # simple async func