Compare commits

..

22 Commits

Author SHA1 Message Date
goodboy e5ee2e3de8
Merge pull request #358 from goodboy/switch_to_pdbp
Switch to `pdbp` 🏄🏼
2023-05-15 09:58:58 -04:00
Tyler Goodlet 41aa91c8eb Add news file 2023-05-15 09:35:59 -04:00
Tyler Goodlet 6758e4487c Drop lingering `pdbpp` comment-refs in tests 2023-05-15 09:14:42 -04:00
Tyler Goodlet 1c3893a383 Drop commented `pdbpp` import logic 2023-05-15 09:01:55 -04:00
Tyler Goodlet 73befac9bc Switch to `pdbp` in test reqs 2023-05-15 09:01:27 -04:00
Tyler Goodlet 79622bbeea Restore `breakpoint()` hook after runtime exits
Previously we were leaking our (pdb++) override into the Python runtime
which would always result in a runtime error whenever `breakpoint()` is
called outside our runtime; after exit of the root actor . This
explicitly restores any previous hook override (detected during startup)
or deletes the hook and restores the environment if none existed prior.

Also adds a new WIP debugging example script to ensure breakpointing
works as normal after runtime close; this will be added to the test
suite.
2023-05-15 00:47:29 -04:00
Tyler Goodlet 95535b2226 Some more 3.10+ optional type sigs 2023-05-15 00:47:29 -04:00
Tyler Goodlet 87c6e09d6b Switch readme links to point @ `pdbp` B) 2023-05-14 22:52:24 -04:00
Tyler Goodlet 9ccd3a74b6 More detailed preface description 2023-05-14 22:38:47 -04:00
Tyler Goodlet ae4ff5dc8d pdbp: adding typing to config settings vars 2023-05-14 22:38:46 -04:00
Tyler Goodlet 705538398f `pdbp`: turn off line truncating by default, fixes terminal resizing stuff 2023-05-14 22:38:16 -04:00
Tyler Goodlet 86aef5238d Hide actor nursery exit frame 2023-05-14 21:24:26 -04:00
Tyler Goodlet cc82447db6 First try: switch debug machinery over to `pdbp` B) 2023-05-14 21:24:26 -04:00
Tyler Goodlet 23cffbd940 Use multiline import for debug mod 2023-05-14 21:24:26 -04:00
Tyler Goodlet 3d202272c4 Change over debugger tests to use `PROMPT` var.. 2023-05-14 21:24:26 -04:00
Tyler Goodlet 63cdb0891f Switch to `pdbp` since noone is maintaining `pdbpp` 2023-05-14 21:24:26 -04:00
goodboy 0f7db27b68
Merge pull request #356 from goodboy/drop_proc_actxmngr
`trio.Process.aclose()`?
2023-05-14 20:59:53 -04:00
Tyler Goodlet c53d62d2f7 Add news file 2023-05-14 20:31:26 -04:00
Tyler Goodlet f667d16d66 Copy the now deprecated `trio.Process.aclose()`
Move it into our `_spawn.do_hard_kill()` since we do indeed rely on
the particular process killing sequence on "soft kill" failure cases.
2023-05-14 19:31:50 -04:00
Tyler Goodlet 24a062341e Just call `trio.Process.aclose()` directly for now? 2023-04-02 14:34:41 -04:00
goodboy e714bec8db
Merge pull request #355 from kehrazy/patch-1
fixed the `Zombie` example having wrong indentation
2023-04-01 12:11:47 -04:00
Igor 009cd6552e
fixed the `Zombie` example having wrong indentation 2023-03-31 17:50:46 +03:00
14 changed files with 222 additions and 188 deletions

View File

@ -6,8 +6,14 @@
``tractor`` is a `structured concurrent`_, multi-processing_ runtime
built on trio_.
Fundamentally ``tractor`` gives you parallelism via ``trio``-"*actors*":
our nurseries_ let you spawn new Python processes which each run a ``trio``
Fundamentally, ``tractor`` gives you parallelism via
``trio``-"*actors*": independent Python processes (aka
non-shared-memory threads) which maintain structured
concurrency (SC) *end-to-end* inside a *supervision tree*.
Cross-process (and thus cross-host) SC is accomplished through the
combined use of our "actor nurseries_" and an "SC-transitive IPC
protocol" constructed on top of multiple Pythons each running a ``trio``
scheduled runtime - a call to ``trio.run()``.
We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
@ -23,7 +29,8 @@ Features
- **It's just** a ``trio`` API
- *Infinitely nesteable* process trees
- Builtin IPC streaming APIs with task fan-out broadcasting
- A (first ever?) "native" multi-core debugger UX for Python using `pdb++`_
- A "native" multi-core debugger REPL using `pdbp`_ (a fork & fix of
`pdb++`_ thanks to @mdmintz!)
- Support for a swappable, OS specific, process spawning layer
- A modular transport stack, allowing for custom serialization (eg. with
`msgspec`_), communications protocols, and environment specific IPC
@ -118,7 +125,7 @@ Zombie safe: self-destruct a process tree
f"running in pid {os.getpid()}"
)
await trio.sleep_forever()
await trio.sleep_forever()
async def main():
@ -149,7 +156,7 @@ it **is a bug**.
"Native" multi-process debugging
--------------------------------
Using the magic of `pdb++`_ and our internal IPC, we've
Using the magic of `pdbp`_ and our internal IPC, we've
been able to create a native feeling debugging experience for
any (sub-)process in your ``tractor`` tree.
@ -597,6 +604,7 @@ channel`_!
.. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s
.. _trio gitter channel: https://gitter.im/python-trio/general
.. _matrix channel: https://matrix.to/#/!tractor:matrix.org
.. _pdbp: https://github.com/mdmintz/pdbp
.. _pdb++: https://github.com/pdbpp/pdbpp
.. _guest mode: https://trio.readthedocs.io/en/stable/reference-lowlevel.html?highlight=guest%20mode#using-guest-mode-to-run-trio-on-top-of-other-event-loops
.. _messages: https://en.wikipedia.org/wiki/Message_passing

View File

@ -0,0 +1,24 @@
import os
import sys
import trio
import tractor
async def main() -> None:
async with tractor.open_nursery(debug_mode=True) as an:
assert os.environ['PYTHONBREAKPOINT'] == 'tractor._debug._set_trace'
# TODO: an assert that verifies the hook has indeed been, hooked
# XD
assert sys.breakpointhook is not tractor._debug._set_trace
breakpoint()
# TODO: an assert that verifies the hook is unhooked..
assert sys.breakpointhook
breakpoint()
if __name__ == '__main__':
trio.run(main)

View File

@ -0,0 +1,7 @@
Drop `trio.Process.aclose()` usage, copy into our spawning code.
The details are laid out in https://github.com/goodboy/tractor/issues/330.
`trio` changed is process running quite some time ago, this just copies
out the small bit we needed (from the old `.aclose()`) for hard kills
where a soft runtime cancel request fails and our "zombie killer"
implementation kicks in.

View File

@ -0,0 +1,15 @@
Switch to using the fork & fix of `pdb++`, `pdbp`:
https://github.com/mdmintz/pdbp
Allows us to sidestep a variety of issues that aren't being maintained
in the upstream project thanks to the hard work of @mdmintz!
We also include some default settings adjustments as per recent
development on the fork:
- sticky mode is still turned on by default but now activates when
a using the `ll` repl command.
- turn off line truncation by default to avoid inter-line gaps when
resizing the terimnal during use.
- when using the backtrace cmd either by `w` or `bt`, the config
automatically switches to non-sticky mode.

View File

@ -1,7 +1,7 @@
pytest
pytest-trio
pytest-timeout
pdbpp
pdbp
mypy
trio_typing
pexpect

View File

@ -26,12 +26,12 @@ with open('docs/README.rst', encoding='utf-8') as f:
setup(
name="tractor",
version='0.1.0a6dev0', # alpha zone
description='structured concurrrent "actors"',
description='structured concurrrent `trio`-"actors"',
long_description=readme,
license='AGPLv3',
author='Tyler Goodlet',
maintainer='Tyler Goodlet',
maintainer_email='jgbt@protonmail.com',
maintainer_email='goodboy_foss@protonmail.com',
url='https://github.com/goodboy/tractor',
platforms=['linux', 'windows'],
packages=[
@ -52,16 +52,14 @@ setup(
# tooling
'tricycle',
'trio_typing',
# tooling
'colorlog',
'wrapt',
# serialization
# IPC serialization
'msgspec',
# debug mode REPL
'pdbpp',
'pdbp',
# pip ref docs on these specs:
# https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples
@ -73,10 +71,9 @@ setup(
# https://github.com/pdbpp/fancycompleter/issues/37
'pyreadline3 ; platform_system == "Windows"',
],
tests_require=['pytest'],
python_requires=">=3.9",
python_requires=">=3.10",
keywords=[
'trio',
'async',

View File

@ -95,7 +95,7 @@ def spawn(
return _spawn
PROMPT = r"\(Pdb\+\+\)"
PROMPT = r"\(Pdb\+\)"
def expect(
@ -151,18 +151,6 @@ def ctlc(
use_ctlc = request.param
if (
sys.version_info <= (3, 10)
and use_ctlc
):
# on 3.9 it seems the REPL UX
# is highly unreliable and frankly annoying
# to test for. It does work from manual testing
# but i just don't think it's wroth it to try
# and get this working especially since we want to
# be 3.10+ mega-asap.
pytest.skip('Py3.9 and `pdbpp` son no bueno..')
node = request.node
markers = node.own_markers
for mark in markers:
@ -193,13 +181,15 @@ def ctlc(
ids=lambda item: f'{item[0]} -> {item[1]}',
)
def test_root_actor_error(spawn, user_in_out):
"""Demonstrate crash handler entering pdbpp from basic error in root actor.
"""
'''
Demonstrate crash handler entering pdb from basic error in root actor.
'''
user_input, expect_err_str = user_in_out
child = spawn('root_actor_error')
# scan for the pdbpp prompt
# scan for the prompt
expect(child, PROMPT)
before = str(child.before.decode())
@ -230,8 +220,8 @@ def test_root_actor_bp(spawn, user_in_out):
user_input, expect_err_str = user_in_out
child = spawn('root_actor_breakpoint')
# scan for the pdbpp prompt
child.expect(r"\(Pdb\+\+\)")
# scan for the prompt
child.expect(PROMPT)
assert 'Error' not in str(child.before)
@ -272,7 +262,7 @@ def do_ctlc(
if expect_prompt:
before = str(child.before.decode())
time.sleep(delay)
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
time.sleep(delay)
if patt:
@ -291,7 +281,7 @@ def test_root_actor_bp_forever(
# entries
for _ in range(10):
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
if ctlc:
do_ctlc(child)
@ -301,7 +291,7 @@ def test_root_actor_bp_forever(
# do one continue which should trigger a
# new task to lock the tty
child.sendline('continue')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
# seems that if we hit ctrl-c too fast the
# sigint guard machinery might not kick in..
@ -312,10 +302,10 @@ def test_root_actor_bp_forever(
# XXX: this previously caused a bug!
child.sendline('n')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
child.sendline('n')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
# quit out of the loop
child.sendline('q')
@ -338,8 +328,8 @@ def test_subactor_error(
'''
child = spawn('subactor_error')
# scan for the pdbpp prompt
child.expect(r"\(Pdb\+\+\)")
# scan for the prompt
child.expect(PROMPT)
before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error'" in before
@ -359,7 +349,7 @@ def test_subactor_error(
# creating actor
child.sendline('continue')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
before = str(child.before.decode())
# root actor gets debugger engaged
@ -386,8 +376,8 @@ def test_subactor_breakpoint(
child = spawn('subactor_breakpoint')
# scan for the pdbpp prompt
child.expect(r"\(Pdb\+\+\)")
# scan for the prompt
child.expect(PROMPT)
before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -396,7 +386,7 @@ def test_subactor_breakpoint(
# entries
for _ in range(10):
child.sendline('next')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
if ctlc:
do_ctlc(child)
@ -404,7 +394,7 @@ def test_subactor_breakpoint(
# now run some "continues" to show re-entries
for _ in range(5):
child.sendline('continue')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -415,7 +405,7 @@ def test_subactor_breakpoint(
child.sendline('q')
# child process should exit but parent will capture pdb.BdbQuit
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
before = str(child.before.decode())
assert "RemoteActorError: ('breakpoint_forever'" in before
@ -447,8 +437,8 @@ def test_multi_subactors(
'''
child = spawn(r'multi_subactors')
# scan for the pdbpp prompt
child.expect(r"\(Pdb\+\+\)")
# scan for the prompt
child.expect(PROMPT)
before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -460,7 +450,7 @@ def test_multi_subactors(
# entries
for _ in range(10):
child.sendline('next')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
if ctlc:
do_ctlc(child)
@ -469,7 +459,7 @@ def test_multi_subactors(
child.sendline('c')
# first name_error failure
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error'" in before
assert "NameError" in before
@ -481,7 +471,7 @@ def test_multi_subactors(
child.sendline('c')
# 2nd name_error failure
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
# TODO: will we ever get the race where this crash will show up?
# blocklist strat now prevents this crash
@ -495,7 +485,7 @@ def test_multi_subactors(
# breakpoint loop should re-engage
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -511,7 +501,7 @@ def test_multi_subactors(
):
child.sendline('c')
time.sleep(0.1)
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
before = str(child.before.decode())
if ctlc:
@ -530,11 +520,11 @@ def test_multi_subactors(
# now run some "continues" to show re-entries
for _ in range(5):
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
# quit the loop and expect parent to attach
child.sendline('q')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
before = str(child.before.decode())
assert_before(child, [
@ -578,7 +568,7 @@ def test_multi_daemon_subactors(
'''
child = spawn('multi_daemon_subactors')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
# there can be a race for which subactor will acquire
# the root's tty lock first so anticipate either crash
@ -608,7 +598,7 @@ def test_multi_daemon_subactors(
# second entry by `bp_forever`.
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
assert_before(child, [next_msg])
# XXX: hooray the root clobbering the child here was fixed!
@ -630,7 +620,7 @@ def test_multi_daemon_subactors(
# expect another breakpoint actor entry
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
try:
assert_before(child, [bp_forever_msg])
@ -646,7 +636,7 @@ def test_multi_daemon_subactors(
# after 1 or more further bp actor entries.
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
assert_before(child, [name_error_msg])
# wait for final error in root
@ -654,7 +644,7 @@ def test_multi_daemon_subactors(
while True:
try:
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
assert_before(
child,
[bp_forever_msg]
@ -687,8 +677,8 @@ def test_multi_subactors_root_errors(
'''
child = spawn('multi_subactor_root_errors')
# scan for the pdbpp prompt
child.expect(r"\(Pdb\+\+\)")
# scan for the prompt
child.expect(PROMPT)
# at most one subactor should attach before the root is cancelled
before = str(child.before.decode())
@ -703,7 +693,7 @@ def test_multi_subactors_root_errors(
# due to block list strat from #337, this will no longer
# propagate before the root errors and cancels the spawner sub-tree.
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
# only if the blocking condition doesn't kick in fast enough
before = str(child.before.decode())
@ -718,7 +708,7 @@ def test_multi_subactors_root_errors(
do_ctlc(child)
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
# check if the spawner crashed or was blocked from debug
# and if this intermediary attached check the boxed error
@ -735,7 +725,7 @@ def test_multi_subactors_root_errors(
do_ctlc(child)
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
# expect a root actor crash
assert_before(child, [
@ -784,7 +774,7 @@ def test_multi_nested_subactors_error_through_nurseries(
for send_char in itertools.cycle(['c', 'q']):
try:
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
child.sendline(send_char)
time.sleep(0.01)
@ -826,7 +816,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
child = spawn('root_cancelled_but_child_is_in_tty_lock')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
before = str(child.before.decode())
assert "NameError: name 'doggypants' is not defined" in before
@ -841,7 +831,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
for i in range(4):
time.sleep(0.5)
try:
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
except (
EOF,
@ -898,7 +888,7 @@ def test_root_cancels_child_context_during_startup(
'''
child = spawn('fast_error_in_root_after_spawn')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
before = str(child.before.decode())
assert "AssertionError" in before
@ -915,7 +905,7 @@ def test_different_debug_mode_per_actor(
ctlc: bool,
):
child = spawn('per_actor_debug')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
# only one actor should enter the debugger
before = str(child.before.decode())

View File

@ -44,7 +44,10 @@ from ._exceptions import (
ModuleNotExposed,
ContextCancelled,
)
from ._debug import breakpoint, post_mortem
from ._debug import (
breakpoint,
post_mortem,
)
from . import msg
from ._root import (
run_daemon,

View File

@ -37,6 +37,7 @@ from typing import (
)
from types import FrameType
import pdbp
import tractor
import trio
from trio_typing import TaskStatus
@ -53,17 +54,6 @@ from ._exceptions import (
)
from ._ipc import Channel
try:
# wtf: only exported when installed in dev mode?
import pdbpp
except ImportError:
# pdbpp is installed in regular mode...it monkey patches stuff
import pdb
xpm = getattr(pdb, 'xpm', None)
assert xpm, "pdbpp is not installed?" # type: ignore
pdbpp = pdb
log = get_logger(__name__)
@ -154,22 +144,26 @@ class Lock:
cls.repl = None
class TractorConfig(pdbpp.DefaultConfig):
class TractorConfig(pdbp.DefaultConfig):
'''
Custom ``pdbpp`` goodness.
Custom ``pdbp`` goodness :surfer:
'''
# use_pygments = True
# sticky_by_default = True
enable_hidden_frames = False
use_pygments: bool = True
sticky_by_default: bool = False
enable_hidden_frames: bool = False
# much thanks @mdmintz for the hot tip!
# fixes line spacing issue when resizing terminal B)
truncate_long_lines: bool = False
class MultiActorPdb(pdbpp.Pdb):
class MultiActorPdb(pdbp.Pdb):
'''
Add teardown hooks to the regular ``pdbpp.Pdb``.
Add teardown hooks to the regular ``pdbp.Pdb``.
'''
# override the pdbpp config with our coolio one
# override the pdbp config with our coolio one
DefaultConfig = TractorConfig
# def preloop(self):
@ -313,7 +307,7 @@ async def lock_tty_for_child(
) -> str:
'''
Lock the TTY in the root process of an actor tree in a new
inter-actor-context-task such that the ``pdbpp`` debugger console
inter-actor-context-task such that the ``pdbp`` debugger console
can be mutex-allocated to the calling sub-actor for REPL control
without interference by other processes / threads.
@ -433,7 +427,7 @@ async def wait_for_parent_stdin_hijack(
def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
pdb = MultiActorPdb()
# signal.signal = pdbpp.hideframe(signal.signal)
# signal.signal = pdbp.hideframe(signal.signal)
Lock.shield_sigint()
@ -583,7 +577,7 @@ async def _breakpoint(
# # frame = sys._getframe()
# # last_f = frame.f_back
# # last_f.f_globals['__tracebackhide__'] = True
# # signal.signal = pdbpp.hideframe(signal.signal)
# # signal.signal = pdbp.hideframe(signal.signal)
def shield_sigint_handler(
@ -743,13 +737,13 @@ def shield_sigint_handler(
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
# XXX: lol, see ``pdbpp`` issue:
# XXX LEGACY: lol, see ``pdbpp`` issue:
# https://github.com/pdbpp/pdbpp/issues/496
def _set_trace(
actor: Optional[tractor.Actor] = None,
pdb: Optional[MultiActorPdb] = None,
actor: tractor.Actor | None = None,
pdb: MultiActorPdb | None = None,
):
__tracebackhide__ = True
actor = actor or tractor.current_actor()
@ -759,7 +753,11 @@ def _set_trace(
if frame:
frame = frame.f_back # type: ignore
if frame and pdb and actor is not None:
if (
frame
and pdb
and actor is not None
):
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n")
# no f!#$&* idea, but when we're in async land
# we need 2x frames up?
@ -768,7 +766,8 @@ def _set_trace(
else:
pdb, undo_sigint = mk_mpdb()
# we entered the global ``breakpoint()`` built-in from sync code?
# we entered the global ``breakpoint()`` built-in from sync
# code?
Lock.local_task_in_debug = 'sync'
pdb.set_trace(frame=frame)
@ -798,7 +797,7 @@ def _post_mortem(
# https://github.com/pdbpp/pdbpp/issues/480
# TODO: help with a 3.10+ major release if/when it arrives.
pdbpp.xpm(Pdb=lambda: pdb)
pdbp.xpm(Pdb=lambda: pdb)
post_mortem = partial(

View File

@ -22,8 +22,9 @@ from contextlib import asynccontextmanager
from functools import partial
import importlib
import logging
import os
import signal
import sys
import os
import typing
import warnings
@ -84,8 +85,10 @@ async def open_root_actor(
'''
# Override the global debugger hook to make it play nice with
# ``trio``, see:
# ``trio``, see much discussion in:
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
builtin_bp_handler = sys.breakpointhook
orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None)
os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace'
# attempt to retreive ``trio``'s sigint handler and stash it
@ -253,6 +256,16 @@ async def open_root_actor(
logger.cancel("Shutting down root actor")
await actor.cancel()
finally:
_state._current_actor = None
# restore breakpoint hook state
sys.breakpointhook = builtin_bp_handler
if orig_bp_path is not None:
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
else:
# clear env back to having no entry
os.environ.pop('PYTHONBREAKPOINT')
logger.runtime("Root actor terminated")
@ -288,7 +301,7 @@ def run_daemon(
async def _main():
async with open_root_actor(
arbiter_addr=registry_addr,
registry_addr=registry_addr,
name=name,
start_method=start_method,
debug_mode=debug_mode,

View File

@ -199,8 +199,8 @@ async def _invoke(
except BaseExceptionGroup:
# if a context error was set then likely
# thei multierror was raised due to that
if ctx._remote_ctx_error is not None:
raise ctx._remote_ctx_error from None
if ctx._error is not None:
raise ctx._error from None
raise

View File

@ -23,13 +23,12 @@ import sys
import platform
from typing import (
Any,
Awaitable,
Literal,
Optional,
Callable,
TypeVar,
TYPE_CHECKING,
)
from collections.abc import Awaitable
from exceptiongroup import BaseExceptionGroup
import trio
@ -60,7 +59,7 @@ if TYPE_CHECKING:
log = get_logger('tractor')
# placeholder for an mp start context if so using that backend
_ctx: Optional[mp.context.BaseContext] = None
_ctx: mp.context.BaseContext | None = None
SpawnMethodKey = Literal[
'trio', # supported on all platforms
'mp_spawn',
@ -86,7 +85,7 @@ else:
def try_set_start_method(
key: SpawnMethodKey
) -> Optional[mp.context.BaseContext]:
) -> mp.context.BaseContext | None:
'''
Attempt to set the method for process starting, aka the "actor
spawning backend".
@ -200,16 +199,37 @@ async def cancel_on_completion(
async def do_hard_kill(
proc: trio.Process,
terminate_after: int = 3,
) -> None:
# NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as
# a hard-kill time ultimatum.
log.debug(f"Terminating {proc}")
with trio.move_on_after(terminate_after) as cs:
# NOTE: This ``__aexit__()`` shields internally.
async with proc: # calls ``trio.Process.aclose()``
log.debug(f"Terminating {proc}")
# NOTE: code below was copied verbatim from the now deprecated
# (in 0.20.0) ``trio._subrocess.Process.aclose()``, orig doc
# string:
#
# Close any pipes we have to the process (both input and output)
# and wait for it to exit. If cancelled, kills the process and
# waits for it to finish exiting before propagating the
# cancellation.
with trio.CancelScope(shield=True):
if proc.stdin is not None:
await proc.stdin.aclose()
if proc.stdout is not None:
await proc.stdout.aclose()
if proc.stderr is not None:
await proc.stderr.aclose()
try:
await proc.wait()
finally:
if proc.returncode is None:
proc.kill()
with trio.CancelScope(shield=True):
await proc.wait()
if cs.cancelled_caught:
# XXX: should pretty much never get here unless we have
@ -355,12 +375,11 @@ async def trio_proc(
spawn_cmd.append("--asyncio")
cancelled_during_spawn: bool = False
proc: Optional[trio.Process] = None
proc: trio.Process | None = None
try:
try:
# TODO: needs ``trio_typing`` patch?
proc = await trio.lowlevel.open_process( # type: ignore
spawn_cmd)
proc = await trio.lowlevel.open_process(spawn_cmd)
log.runtime(f"Started {proc}")
@ -444,8 +463,8 @@ async def trio_proc(
nursery.cancel_scope.cancel()
finally:
# The "hard" reap since no actor zombies are allowed!
# XXX: do this **after** cancellation/tearfown to avoid
# XXX NOTE XXX: The "hard" reap since no actor zombies are
# allowed! Do this **after** cancellation/teardown to avoid
# killing the process too early.
if proc:
log.cancel(f'Hard reap sequence starting for {subactor.uid}')

View File

@ -27,8 +27,7 @@ from typing import (
Optional,
Callable,
AsyncGenerator,
AsyncIterator,
TYPE_CHECKING,
AsyncIterator
)
import warnings
@ -42,10 +41,6 @@ from .log import get_logger
from .trionics import broadcast_receiver, BroadcastReceiver
if TYPE_CHECKING:
from ._portal import Portal
log = get_logger(__name__)
@ -75,7 +70,7 @@ class MsgStream(trio.abc.Channel):
'''
def __init__(
self,
ctx: Context, # typing: ignore # noqa
ctx: 'Context', # typing: ignore # noqa
rx_chan: trio.MemoryReceiveChannel,
_broadcaster: Optional[BroadcastReceiver] = None,
@ -88,9 +83,6 @@ class MsgStream(trio.abc.Channel):
self._eoc: bool = False
self._closed: bool = False
def ctx(self) -> Context:
return self._ctx
# delegate directly to underlying mem channel
def receive_nowait(self):
msg = self._rx_chan.receive_nowait()
@ -286,6 +278,7 @@ class MsgStream(trio.abc.Channel):
@asynccontextmanager
async def subscribe(
self,
) -> AsyncIterator[BroadcastReceiver]:
'''
Allocate and return a ``BroadcastReceiver`` which delegates
@ -342,8 +335,8 @@ class MsgStream(trio.abc.Channel):
Send a message over this stream to the far end.
'''
if self._ctx._remote_ctx_error:
raise self._ctx._remote_ctx_error # from None
if self._ctx._error:
raise self._ctx._error # from None
if self._closed:
raise trio.ClosedResourceError('This stream was already closed')
@ -382,10 +375,9 @@ class Context:
_remote_func_type: Optional[str] = None
# only set on the caller side
_portal: Optional[Portal] = None # type: ignore # noqa
_stream: Optional[MsgStream] = None
_portal: Optional['Portal'] = None # type: ignore # noqa
_result: Optional[Any] = False
_remote_ctx_error: Optional[BaseException] = None
_error: Optional[BaseException] = None
# status flags
_cancel_called: bool = False
@ -398,7 +390,7 @@ class Context:
# only set on the callee side
_scope_nursery: Optional[trio.Nursery] = None
_backpressure: bool = True
_backpressure: bool = False
async def send_yield(self, data: Any) -> None:
@ -443,26 +435,21 @@ class Context:
# (currently) that other portal APIs (``Portal.run()``,
# ``.run_in_actor()``) do their own error checking at the point
# of the call and result processing.
log.error(
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{msg["error"]["tb_str"]}'
)
error = unpack_error(msg, self.chan)
if (
isinstance(error, ContextCancelled)
isinstance(error, ContextCancelled) and
self._cancel_called
):
log.cancel(
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{msg["error"]["tb_str"]}'
)
if self._cancel_called:
# this is an expected cancel request response message
# and we don't need to raise it in scope since it will
# potentially override a real error
return
else:
log.error(
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{msg["error"]["tb_str"]}'
)
# this is an expected cancel request response message
# and we don't need to raise it in scope since it will
# potentially override a real error
return
self._remote_ctx_error = error
self._error = error
# TODO: tempted to **not** do this by-reraising in a
# nursery and instead cancel a surrounding scope, detect
@ -470,7 +457,7 @@ class Context:
if self._scope_nursery:
async def raiser():
raise self._remote_ctx_error from None
raise self._error from None
# from trio.testing import wait_all_tasks_blocked
# await wait_all_tasks_blocked()
@ -496,7 +483,6 @@ class Context:
log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')
self._cancel_called = True
ipc_broken: bool = False
if side == 'caller':
if not self._portal:
@ -514,14 +500,7 @@ class Context:
# NOTE: we're telling the far end actor to cancel a task
# corresponding to *this actor*. The far end local channel
# instance is passed to `Actor._cancel_task()` implicitly.
try:
await self._portal.run_from_ns(
'self',
'_cancel_task',
cid=cid,
)
except trio.BrokenResourceError:
ipc_broken = True
await self._portal.run_from_ns('self', '_cancel_task', cid=cid)
if cs.cancelled_caught:
# XXX: there's no way to know if the remote task was indeed
@ -537,10 +516,7 @@ class Context:
"Timed out on cancelling remote task "
f"{cid} for {self._portal.channel.uid}")
elif ipc_broken:
log.cancel(
"Transport layer was broken before cancel request "
f"{cid} for {self._portal.channel.uid}")
# callee side remote task
else:
self._cancel_msg = msg
@ -628,7 +604,6 @@ class Context:
ctx=self,
rx_chan=ctx._recv_chan,
) as stream:
self._stream = stream
if self._portal:
self._portal._streams.add(stream)
@ -670,22 +645,25 @@ class Context:
if not self._recv_chan._closed: # type: ignore
def consume(
msg: dict,
# wait for a final context result consuming
# and discarding any bi dir stream msgs still
# in transit from the far end.
while True:
) -> Optional[dict]:
msg = await self._recv_chan.receive()
try:
return msg['return']
self._result = msg['return']
break
except KeyError as msgerr:
if 'yield' in msg:
# far end task is still streaming to us so discard
log.warning(f'Discarding stream delivered {msg}')
return
continue
elif 'stop' in msg:
log.debug('Remote stream terminated')
return
continue
# internal error should never get here
assert msg.get('cid'), (
@ -695,25 +673,6 @@ class Context:
msg, self._portal.channel
) from msgerr
# wait for a final context result consuming
# and discarding any bi dir stream msgs still
# in transit from the far end.
if self._stream:
async with self._stream.subscribe() as bstream:
async for msg in bstream:
result = consume(msg)
if result:
self._result = result
break
if not self._result:
while True:
msg = await self._recv_chan.receive()
result = consume(msg)
if result:
self._result = result
break
return self._result
async def started(

View File

@ -302,7 +302,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
) -> typing.AsyncGenerator[ActorNursery, None]:
# TODO: yay or nay?
# __tracebackhide__ = True
__tracebackhide__ = True
# the collection of errors retreived from spawned sub-actors
errors: dict[tuple[str, str], BaseException] = {}