forked from goodboy/tractor
1
0
Fork 0

Compare commits

..

6 Commits

Author SHA1 Message Date
Tyler Goodlet 6120e99d7e Rename `._error` -> `._remote_ctx_error` 2023-01-30 14:13:43 -05:00
Tyler Goodlet 33b0e36ad6 Break loop after result retreival 2023-01-30 14:13:43 -05:00
Tyler Goodlet a71a958f54 Log context cancellation using `.cancel()` loglevel 2023-01-30 14:13:43 -05:00
Tyler Goodlet c9eb466d76 Use `MsgStream.subscribe()` in `Context.result()`
The case exists where there is multiple tasks consuming from an open
2-way stream created via `Context.open_stream()` where a sibling task is
pulling from the stream while some other task also calls `.result()`.
Previously the `.result()` call would consume (drain) stream messages
directly from the underlying mem chan which would mean any sibling task
would not receive those same messages. Instead, make `.result()` check
if a stream is open and instead consume (and discard) stream msgs using
a `BroadcastReceiver` (via `MsgStream.subscribe()`) such that all
interested tasks get copies of the same packets.
2023-01-30 14:13:42 -05:00
Tyler Goodlet f7a1f3832f Enable stream backpressure by default, add `MsgStream.ctx: Context` 2023-01-30 14:09:35 -05:00
Tyler Goodlet 3f2e33a120 Don't unset actor global on root teardown 2023-01-30 14:09:35 -05:00
14 changed files with 188 additions and 222 deletions

View File

@ -6,14 +6,8 @@
``tractor`` is a `structured concurrent`_, multi-processing_ runtime ``tractor`` is a `structured concurrent`_, multi-processing_ runtime
built on trio_. built on trio_.
Fundamentally, ``tractor`` gives you parallelism via Fundamentally ``tractor`` gives you parallelism via ``trio``-"*actors*":
``trio``-"*actors*": independent Python processes (aka our nurseries_ let you spawn new Python processes which each run a ``trio``
non-shared-memory threads) which maintain structured
concurrency (SC) *end-to-end* inside a *supervision tree*.
Cross-process (and thus cross-host) SC is accomplished through the
combined use of our "actor nurseries_" and an "SC-transitive IPC
protocol" constructed on top of multiple Pythons each running a ``trio``
scheduled runtime - a call to ``trio.run()``. scheduled runtime - a call to ``trio.run()``.
We believe the system adheres to the `3 axioms`_ of an "`actor model`_" We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
@ -29,8 +23,7 @@ Features
- **It's just** a ``trio`` API - **It's just** a ``trio`` API
- *Infinitely nesteable* process trees - *Infinitely nesteable* process trees
- Builtin IPC streaming APIs with task fan-out broadcasting - Builtin IPC streaming APIs with task fan-out broadcasting
- A "native" multi-core debugger REPL using `pdbp`_ (a fork & fix of - A (first ever?) "native" multi-core debugger UX for Python using `pdb++`_
`pdb++`_ thanks to @mdmintz!)
- Support for a swappable, OS specific, process spawning layer - Support for a swappable, OS specific, process spawning layer
- A modular transport stack, allowing for custom serialization (eg. with - A modular transport stack, allowing for custom serialization (eg. with
`msgspec`_), communications protocols, and environment specific IPC `msgspec`_), communications protocols, and environment specific IPC
@ -125,7 +118,7 @@ Zombie safe: self-destruct a process tree
f"running in pid {os.getpid()}" f"running in pid {os.getpid()}"
) )
await trio.sleep_forever() await trio.sleep_forever()
async def main(): async def main():
@ -156,7 +149,7 @@ it **is a bug**.
"Native" multi-process debugging "Native" multi-process debugging
-------------------------------- --------------------------------
Using the magic of `pdbp`_ and our internal IPC, we've Using the magic of `pdb++`_ and our internal IPC, we've
been able to create a native feeling debugging experience for been able to create a native feeling debugging experience for
any (sub-)process in your ``tractor`` tree. any (sub-)process in your ``tractor`` tree.
@ -604,7 +597,6 @@ channel`_!
.. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s .. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s
.. _trio gitter channel: https://gitter.im/python-trio/general .. _trio gitter channel: https://gitter.im/python-trio/general
.. _matrix channel: https://matrix.to/#/!tractor:matrix.org .. _matrix channel: https://matrix.to/#/!tractor:matrix.org
.. _pdbp: https://github.com/mdmintz/pdbp
.. _pdb++: https://github.com/pdbpp/pdbpp .. _pdb++: https://github.com/pdbpp/pdbpp
.. _guest mode: https://trio.readthedocs.io/en/stable/reference-lowlevel.html?highlight=guest%20mode#using-guest-mode-to-run-trio-on-top-of-other-event-loops .. _guest mode: https://trio.readthedocs.io/en/stable/reference-lowlevel.html?highlight=guest%20mode#using-guest-mode-to-run-trio-on-top-of-other-event-loops
.. _messages: https://en.wikipedia.org/wiki/Message_passing .. _messages: https://en.wikipedia.org/wiki/Message_passing

View File

@ -1,24 +0,0 @@
import os
import sys
import trio
import tractor
async def main() -> None:
async with tractor.open_nursery(debug_mode=True) as an:
assert os.environ['PYTHONBREAKPOINT'] == 'tractor._debug._set_trace'
# TODO: an assert that verifies the hook has indeed been, hooked
# XD
assert sys.breakpointhook is not tractor._debug._set_trace
breakpoint()
# TODO: an assert that verifies the hook is unhooked..
assert sys.breakpointhook
breakpoint()
if __name__ == '__main__':
trio.run(main)

View File

@ -1,7 +0,0 @@
Drop `trio.Process.aclose()` usage, copy into our spawning code.
The details are laid out in https://github.com/goodboy/tractor/issues/330.
`trio` changed is process running quite some time ago, this just copies
out the small bit we needed (from the old `.aclose()`) for hard kills
where a soft runtime cancel request fails and our "zombie killer"
implementation kicks in.

View File

@ -1,15 +0,0 @@
Switch to using the fork & fix of `pdb++`, `pdbp`:
https://github.com/mdmintz/pdbp
Allows us to sidestep a variety of issues that aren't being maintained
in the upstream project thanks to the hard work of @mdmintz!
We also include some default settings adjustments as per recent
development on the fork:
- sticky mode is still turned on by default but now activates when
a using the `ll` repl command.
- turn off line truncation by default to avoid inter-line gaps when
resizing the terimnal during use.
- when using the backtrace cmd either by `w` or `bt`, the config
automatically switches to non-sticky mode.

View File

@ -1,7 +1,7 @@
pytest pytest
pytest-trio pytest-trio
pytest-timeout pytest-timeout
pdbp pdbpp
mypy mypy
trio_typing trio_typing
pexpect pexpect

View File

@ -26,12 +26,12 @@ with open('docs/README.rst', encoding='utf-8') as f:
setup( setup(
name="tractor", name="tractor",
version='0.1.0a6dev0', # alpha zone version='0.1.0a6dev0', # alpha zone
description='structured concurrrent `trio`-"actors"', description='structured concurrrent "actors"',
long_description=readme, long_description=readme,
license='AGPLv3', license='AGPLv3',
author='Tyler Goodlet', author='Tyler Goodlet',
maintainer='Tyler Goodlet', maintainer='Tyler Goodlet',
maintainer_email='goodboy_foss@protonmail.com', maintainer_email='jgbt@protonmail.com',
url='https://github.com/goodboy/tractor', url='https://github.com/goodboy/tractor',
platforms=['linux', 'windows'], platforms=['linux', 'windows'],
packages=[ packages=[
@ -52,14 +52,16 @@ setup(
# tooling # tooling
'tricycle', 'tricycle',
'trio_typing', 'trio_typing',
# tooling
'colorlog', 'colorlog',
'wrapt', 'wrapt',
# IPC serialization # serialization
'msgspec', 'msgspec',
# debug mode REPL # debug mode REPL
'pdbp', 'pdbpp',
# pip ref docs on these specs: # pip ref docs on these specs:
# https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples # https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples
@ -71,9 +73,10 @@ setup(
# https://github.com/pdbpp/fancycompleter/issues/37 # https://github.com/pdbpp/fancycompleter/issues/37
'pyreadline3 ; platform_system == "Windows"', 'pyreadline3 ; platform_system == "Windows"',
], ],
tests_require=['pytest'], tests_require=['pytest'],
python_requires=">=3.10", python_requires=">=3.9",
keywords=[ keywords=[
'trio', 'trio',
'async', 'async',

View File

@ -95,7 +95,7 @@ def spawn(
return _spawn return _spawn
PROMPT = r"\(Pdb\+\)" PROMPT = r"\(Pdb\+\+\)"
def expect( def expect(
@ -151,6 +151,18 @@ def ctlc(
use_ctlc = request.param use_ctlc = request.param
if (
sys.version_info <= (3, 10)
and use_ctlc
):
# on 3.9 it seems the REPL UX
# is highly unreliable and frankly annoying
# to test for. It does work from manual testing
# but i just don't think it's wroth it to try
# and get this working especially since we want to
# be 3.10+ mega-asap.
pytest.skip('Py3.9 and `pdbpp` son no bueno..')
node = request.node node = request.node
markers = node.own_markers markers = node.own_markers
for mark in markers: for mark in markers:
@ -181,15 +193,13 @@ def ctlc(
ids=lambda item: f'{item[0]} -> {item[1]}', ids=lambda item: f'{item[0]} -> {item[1]}',
) )
def test_root_actor_error(spawn, user_in_out): def test_root_actor_error(spawn, user_in_out):
''' """Demonstrate crash handler entering pdbpp from basic error in root actor.
Demonstrate crash handler entering pdb from basic error in root actor. """
'''
user_input, expect_err_str = user_in_out user_input, expect_err_str = user_in_out
child = spawn('root_actor_error') child = spawn('root_actor_error')
# scan for the prompt # scan for the pdbpp prompt
expect(child, PROMPT) expect(child, PROMPT)
before = str(child.before.decode()) before = str(child.before.decode())
@ -220,8 +230,8 @@ def test_root_actor_bp(spawn, user_in_out):
user_input, expect_err_str = user_in_out user_input, expect_err_str = user_in_out
child = spawn('root_actor_breakpoint') child = spawn('root_actor_breakpoint')
# scan for the prompt # scan for the pdbpp prompt
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
assert 'Error' not in str(child.before) assert 'Error' not in str(child.before)
@ -262,7 +272,7 @@ def do_ctlc(
if expect_prompt: if expect_prompt:
before = str(child.before.decode()) before = str(child.before.decode())
time.sleep(delay) time.sleep(delay)
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
time.sleep(delay) time.sleep(delay)
if patt: if patt:
@ -281,7 +291,7 @@ def test_root_actor_bp_forever(
# entries # entries
for _ in range(10): for _ in range(10):
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
if ctlc: if ctlc:
do_ctlc(child) do_ctlc(child)
@ -291,7 +301,7 @@ def test_root_actor_bp_forever(
# do one continue which should trigger a # do one continue which should trigger a
# new task to lock the tty # new task to lock the tty
child.sendline('continue') child.sendline('continue')
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
# seems that if we hit ctrl-c too fast the # seems that if we hit ctrl-c too fast the
# sigint guard machinery might not kick in.. # sigint guard machinery might not kick in..
@ -302,10 +312,10 @@ def test_root_actor_bp_forever(
# XXX: this previously caused a bug! # XXX: this previously caused a bug!
child.sendline('n') child.sendline('n')
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
child.sendline('n') child.sendline('n')
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
# quit out of the loop # quit out of the loop
child.sendline('q') child.sendline('q')
@ -328,8 +338,8 @@ def test_subactor_error(
''' '''
child = spawn('subactor_error') child = spawn('subactor_error')
# scan for the prompt # scan for the pdbpp prompt
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error'" in before assert "Attaching to pdb in crashed actor: ('name_error'" in before
@ -349,7 +359,7 @@ def test_subactor_error(
# creating actor # creating actor
child.sendline('continue') child.sendline('continue')
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
# root actor gets debugger engaged # root actor gets debugger engaged
@ -376,8 +386,8 @@ def test_subactor_breakpoint(
child = spawn('subactor_breakpoint') child = spawn('subactor_breakpoint')
# scan for the prompt # scan for the pdbpp prompt
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -386,7 +396,7 @@ def test_subactor_breakpoint(
# entries # entries
for _ in range(10): for _ in range(10):
child.sendline('next') child.sendline('next')
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
if ctlc: if ctlc:
do_ctlc(child) do_ctlc(child)
@ -394,7 +404,7 @@ def test_subactor_breakpoint(
# now run some "continues" to show re-entries # now run some "continues" to show re-entries
for _ in range(5): for _ in range(5):
child.sendline('continue') child.sendline('continue')
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -405,7 +415,7 @@ def test_subactor_breakpoint(
child.sendline('q') child.sendline('q')
# child process should exit but parent will capture pdb.BdbQuit # child process should exit but parent will capture pdb.BdbQuit
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
assert "RemoteActorError: ('breakpoint_forever'" in before assert "RemoteActorError: ('breakpoint_forever'" in before
@ -437,8 +447,8 @@ def test_multi_subactors(
''' '''
child = spawn(r'multi_subactors') child = spawn(r'multi_subactors')
# scan for the prompt # scan for the pdbpp prompt
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -450,7 +460,7 @@ def test_multi_subactors(
# entries # entries
for _ in range(10): for _ in range(10):
child.sendline('next') child.sendline('next')
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
if ctlc: if ctlc:
do_ctlc(child) do_ctlc(child)
@ -459,7 +469,7 @@ def test_multi_subactors(
child.sendline('c') child.sendline('c')
# first name_error failure # first name_error failure
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error'" in before assert "Attaching to pdb in crashed actor: ('name_error'" in before
assert "NameError" in before assert "NameError" in before
@ -471,7 +481,7 @@ def test_multi_subactors(
child.sendline('c') child.sendline('c')
# 2nd name_error failure # 2nd name_error failure
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
# TODO: will we ever get the race where this crash will show up? # TODO: will we ever get the race where this crash will show up?
# blocklist strat now prevents this crash # blocklist strat now prevents this crash
@ -485,7 +495,7 @@ def test_multi_subactors(
# breakpoint loop should re-engage # breakpoint loop should re-engage
child.sendline('c') child.sendline('c')
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -501,7 +511,7 @@ def test_multi_subactors(
): ):
child.sendline('c') child.sendline('c')
time.sleep(0.1) time.sleep(0.1)
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
if ctlc: if ctlc:
@ -520,11 +530,11 @@ def test_multi_subactors(
# now run some "continues" to show re-entries # now run some "continues" to show re-entries
for _ in range(5): for _ in range(5):
child.sendline('c') child.sendline('c')
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
# quit the loop and expect parent to attach # quit the loop and expect parent to attach
child.sendline('q') child.sendline('q')
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
assert_before(child, [ assert_before(child, [
@ -568,7 +578,7 @@ def test_multi_daemon_subactors(
''' '''
child = spawn('multi_daemon_subactors') child = spawn('multi_daemon_subactors')
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
# there can be a race for which subactor will acquire # there can be a race for which subactor will acquire
# the root's tty lock first so anticipate either crash # the root's tty lock first so anticipate either crash
@ -598,7 +608,7 @@ def test_multi_daemon_subactors(
# second entry by `bp_forever`. # second entry by `bp_forever`.
child.sendline('c') child.sendline('c')
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
assert_before(child, [next_msg]) assert_before(child, [next_msg])
# XXX: hooray the root clobbering the child here was fixed! # XXX: hooray the root clobbering the child here was fixed!
@ -620,7 +630,7 @@ def test_multi_daemon_subactors(
# expect another breakpoint actor entry # expect another breakpoint actor entry
child.sendline('c') child.sendline('c')
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
try: try:
assert_before(child, [bp_forever_msg]) assert_before(child, [bp_forever_msg])
@ -636,7 +646,7 @@ def test_multi_daemon_subactors(
# after 1 or more further bp actor entries. # after 1 or more further bp actor entries.
child.sendline('c') child.sendline('c')
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
assert_before(child, [name_error_msg]) assert_before(child, [name_error_msg])
# wait for final error in root # wait for final error in root
@ -644,7 +654,7 @@ def test_multi_daemon_subactors(
while True: while True:
try: try:
child.sendline('c') child.sendline('c')
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
assert_before( assert_before(
child, child,
[bp_forever_msg] [bp_forever_msg]
@ -677,8 +687,8 @@ def test_multi_subactors_root_errors(
''' '''
child = spawn('multi_subactor_root_errors') child = spawn('multi_subactor_root_errors')
# scan for the prompt # scan for the pdbpp prompt
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
# at most one subactor should attach before the root is cancelled # at most one subactor should attach before the root is cancelled
before = str(child.before.decode()) before = str(child.before.decode())
@ -693,7 +703,7 @@ def test_multi_subactors_root_errors(
# due to block list strat from #337, this will no longer # due to block list strat from #337, this will no longer
# propagate before the root errors and cancels the spawner sub-tree. # propagate before the root errors and cancels the spawner sub-tree.
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
# only if the blocking condition doesn't kick in fast enough # only if the blocking condition doesn't kick in fast enough
before = str(child.before.decode()) before = str(child.before.decode())
@ -708,7 +718,7 @@ def test_multi_subactors_root_errors(
do_ctlc(child) do_ctlc(child)
child.sendline('c') child.sendline('c')
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
# check if the spawner crashed or was blocked from debug # check if the spawner crashed or was blocked from debug
# and if this intermediary attached check the boxed error # and if this intermediary attached check the boxed error
@ -725,7 +735,7 @@ def test_multi_subactors_root_errors(
do_ctlc(child) do_ctlc(child)
child.sendline('c') child.sendline('c')
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
# expect a root actor crash # expect a root actor crash
assert_before(child, [ assert_before(child, [
@ -774,7 +784,7 @@ def test_multi_nested_subactors_error_through_nurseries(
for send_char in itertools.cycle(['c', 'q']): for send_char in itertools.cycle(['c', 'q']):
try: try:
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
child.sendline(send_char) child.sendline(send_char)
time.sleep(0.01) time.sleep(0.01)
@ -816,7 +826,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
child = spawn('root_cancelled_but_child_is_in_tty_lock') child = spawn('root_cancelled_but_child_is_in_tty_lock')
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
assert "NameError: name 'doggypants' is not defined" in before assert "NameError: name 'doggypants' is not defined" in before
@ -831,7 +841,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
for i in range(4): for i in range(4):
time.sleep(0.5) time.sleep(0.5)
try: try:
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
except ( except (
EOF, EOF,
@ -888,7 +898,7 @@ def test_root_cancels_child_context_during_startup(
''' '''
child = spawn('fast_error_in_root_after_spawn') child = spawn('fast_error_in_root_after_spawn')
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode()) before = str(child.before.decode())
assert "AssertionError" in before assert "AssertionError" in before
@ -905,7 +915,7 @@ def test_different_debug_mode_per_actor(
ctlc: bool, ctlc: bool,
): ):
child = spawn('per_actor_debug') child = spawn('per_actor_debug')
child.expect(PROMPT) child.expect(r"\(Pdb\+\+\)")
# only one actor should enter the debugger # only one actor should enter the debugger
before = str(child.before.decode()) before = str(child.before.decode())

View File

@ -44,10 +44,7 @@ from ._exceptions import (
ModuleNotExposed, ModuleNotExposed,
ContextCancelled, ContextCancelled,
) )
from ._debug import ( from ._debug import breakpoint, post_mortem
breakpoint,
post_mortem,
)
from . import msg from . import msg
from ._root import ( from ._root import (
run_daemon, run_daemon,

View File

@ -37,7 +37,6 @@ from typing import (
) )
from types import FrameType from types import FrameType
import pdbp
import tractor import tractor
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
@ -54,6 +53,17 @@ from ._exceptions import (
) )
from ._ipc import Channel from ._ipc import Channel
try:
# wtf: only exported when installed in dev mode?
import pdbpp
except ImportError:
# pdbpp is installed in regular mode...it monkey patches stuff
import pdb
xpm = getattr(pdb, 'xpm', None)
assert xpm, "pdbpp is not installed?" # type: ignore
pdbpp = pdb
log = get_logger(__name__) log = get_logger(__name__)
@ -144,26 +154,22 @@ class Lock:
cls.repl = None cls.repl = None
class TractorConfig(pdbp.DefaultConfig): class TractorConfig(pdbpp.DefaultConfig):
''' '''
Custom ``pdbp`` goodness :surfer: Custom ``pdbpp`` goodness.
''' '''
use_pygments: bool = True # use_pygments = True
sticky_by_default: bool = False # sticky_by_default = True
enable_hidden_frames: bool = False enable_hidden_frames = False
# much thanks @mdmintz for the hot tip!
# fixes line spacing issue when resizing terminal B)
truncate_long_lines: bool = False
class MultiActorPdb(pdbp.Pdb): class MultiActorPdb(pdbpp.Pdb):
''' '''
Add teardown hooks to the regular ``pdbp.Pdb``. Add teardown hooks to the regular ``pdbpp.Pdb``.
''' '''
# override the pdbp config with our coolio one # override the pdbpp config with our coolio one
DefaultConfig = TractorConfig DefaultConfig = TractorConfig
# def preloop(self): # def preloop(self):
@ -307,7 +313,7 @@ async def lock_tty_for_child(
) -> str: ) -> str:
''' '''
Lock the TTY in the root process of an actor tree in a new Lock the TTY in the root process of an actor tree in a new
inter-actor-context-task such that the ``pdbp`` debugger console inter-actor-context-task such that the ``pdbpp`` debugger console
can be mutex-allocated to the calling sub-actor for REPL control can be mutex-allocated to the calling sub-actor for REPL control
without interference by other processes / threads. without interference by other processes / threads.
@ -427,7 +433,7 @@ async def wait_for_parent_stdin_hijack(
def mk_mpdb() -> tuple[MultiActorPdb, Callable]: def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
pdb = MultiActorPdb() pdb = MultiActorPdb()
# signal.signal = pdbp.hideframe(signal.signal) # signal.signal = pdbpp.hideframe(signal.signal)
Lock.shield_sigint() Lock.shield_sigint()
@ -577,7 +583,7 @@ async def _breakpoint(
# # frame = sys._getframe() # # frame = sys._getframe()
# # last_f = frame.f_back # # last_f = frame.f_back
# # last_f.f_globals['__tracebackhide__'] = True # # last_f.f_globals['__tracebackhide__'] = True
# # signal.signal = pdbp.hideframe(signal.signal) # # signal.signal = pdbpp.hideframe(signal.signal)
def shield_sigint_handler( def shield_sigint_handler(
@ -737,13 +743,13 @@ def shield_sigint_handler(
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040 # https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py # https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
# XXX LEGACY: lol, see ``pdbpp`` issue: # XXX: lol, see ``pdbpp`` issue:
# https://github.com/pdbpp/pdbpp/issues/496 # https://github.com/pdbpp/pdbpp/issues/496
def _set_trace( def _set_trace(
actor: tractor.Actor | None = None, actor: Optional[tractor.Actor] = None,
pdb: MultiActorPdb | None = None, pdb: Optional[MultiActorPdb] = None,
): ):
__tracebackhide__ = True __tracebackhide__ = True
actor = actor or tractor.current_actor() actor = actor or tractor.current_actor()
@ -753,11 +759,7 @@ def _set_trace(
if frame: if frame:
frame = frame.f_back # type: ignore frame = frame.f_back # type: ignore
if ( if frame and pdb and actor is not None:
frame
and pdb
and actor is not None
):
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n") log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n")
# no f!#$&* idea, but when we're in async land # no f!#$&* idea, but when we're in async land
# we need 2x frames up? # we need 2x frames up?
@ -766,8 +768,7 @@ def _set_trace(
else: else:
pdb, undo_sigint = mk_mpdb() pdb, undo_sigint = mk_mpdb()
# we entered the global ``breakpoint()`` built-in from sync # we entered the global ``breakpoint()`` built-in from sync code?
# code?
Lock.local_task_in_debug = 'sync' Lock.local_task_in_debug = 'sync'
pdb.set_trace(frame=frame) pdb.set_trace(frame=frame)
@ -797,7 +798,7 @@ def _post_mortem(
# https://github.com/pdbpp/pdbpp/issues/480 # https://github.com/pdbpp/pdbpp/issues/480
# TODO: help with a 3.10+ major release if/when it arrives. # TODO: help with a 3.10+ major release if/when it arrives.
pdbp.xpm(Pdb=lambda: pdb) pdbpp.xpm(Pdb=lambda: pdb)
post_mortem = partial( post_mortem = partial(

View File

@ -22,9 +22,8 @@ from contextlib import asynccontextmanager
from functools import partial from functools import partial
import importlib import importlib
import logging import logging
import signal
import sys
import os import os
import signal
import typing import typing
import warnings import warnings
@ -85,10 +84,8 @@ async def open_root_actor(
''' '''
# Override the global debugger hook to make it play nice with # Override the global debugger hook to make it play nice with
# ``trio``, see much discussion in: # ``trio``, see:
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018 # https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
builtin_bp_handler = sys.breakpointhook
orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None)
os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace' os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace'
# attempt to retreive ``trio``'s sigint handler and stash it # attempt to retreive ``trio``'s sigint handler and stash it
@ -256,16 +253,6 @@ async def open_root_actor(
logger.cancel("Shutting down root actor") logger.cancel("Shutting down root actor")
await actor.cancel() await actor.cancel()
finally: finally:
_state._current_actor = None
# restore breakpoint hook state
sys.breakpointhook = builtin_bp_handler
if orig_bp_path is not None:
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
else:
# clear env back to having no entry
os.environ.pop('PYTHONBREAKPOINT')
logger.runtime("Root actor terminated") logger.runtime("Root actor terminated")
@ -301,7 +288,7 @@ def run_daemon(
async def _main(): async def _main():
async with open_root_actor( async with open_root_actor(
registry_addr=registry_addr, arbiter_addr=registry_addr,
name=name, name=name,
start_method=start_method, start_method=start_method,
debug_mode=debug_mode, debug_mode=debug_mode,

View File

@ -199,8 +199,8 @@ async def _invoke(
except BaseExceptionGroup: except BaseExceptionGroup:
# if a context error was set then likely # if a context error was set then likely
# thei multierror was raised due to that # thei multierror was raised due to that
if ctx._error is not None: if ctx._remote_ctx_error is not None:
raise ctx._error from None raise ctx._remote_ctx_error from None
raise raise

View File

@ -23,12 +23,13 @@ import sys
import platform import platform
from typing import ( from typing import (
Any, Any,
Awaitable,
Literal, Literal,
Optional,
Callable, Callable,
TypeVar, TypeVar,
TYPE_CHECKING, TYPE_CHECKING,
) )
from collections.abc import Awaitable
from exceptiongroup import BaseExceptionGroup from exceptiongroup import BaseExceptionGroup
import trio import trio
@ -59,7 +60,7 @@ if TYPE_CHECKING:
log = get_logger('tractor') log = get_logger('tractor')
# placeholder for an mp start context if so using that backend # placeholder for an mp start context if so using that backend
_ctx: mp.context.BaseContext | None = None _ctx: Optional[mp.context.BaseContext] = None
SpawnMethodKey = Literal[ SpawnMethodKey = Literal[
'trio', # supported on all platforms 'trio', # supported on all platforms
'mp_spawn', 'mp_spawn',
@ -85,7 +86,7 @@ else:
def try_set_start_method( def try_set_start_method(
key: SpawnMethodKey key: SpawnMethodKey
) -> mp.context.BaseContext | None: ) -> Optional[mp.context.BaseContext]:
''' '''
Attempt to set the method for process starting, aka the "actor Attempt to set the method for process starting, aka the "actor
spawning backend". spawning backend".
@ -199,37 +200,16 @@ async def cancel_on_completion(
async def do_hard_kill( async def do_hard_kill(
proc: trio.Process, proc: trio.Process,
terminate_after: int = 3, terminate_after: int = 3,
) -> None: ) -> None:
# NOTE: this timeout used to do nothing since we were shielding # NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much # the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as # never release until the process exits, now it acts as
# a hard-kill time ultimatum. # a hard-kill time ultimatum.
log.debug(f"Terminating {proc}")
with trio.move_on_after(terminate_after) as cs: with trio.move_on_after(terminate_after) as cs:
# NOTE: code below was copied verbatim from the now deprecated # NOTE: This ``__aexit__()`` shields internally.
# (in 0.20.0) ``trio._subrocess.Process.aclose()``, orig doc async with proc: # calls ``trio.Process.aclose()``
# string: log.debug(f"Terminating {proc}")
#
# Close any pipes we have to the process (both input and output)
# and wait for it to exit. If cancelled, kills the process and
# waits for it to finish exiting before propagating the
# cancellation.
with trio.CancelScope(shield=True):
if proc.stdin is not None:
await proc.stdin.aclose()
if proc.stdout is not None:
await proc.stdout.aclose()
if proc.stderr is not None:
await proc.stderr.aclose()
try:
await proc.wait()
finally:
if proc.returncode is None:
proc.kill()
with trio.CancelScope(shield=True):
await proc.wait()
if cs.cancelled_caught: if cs.cancelled_caught:
# XXX: should pretty much never get here unless we have # XXX: should pretty much never get here unless we have
@ -375,11 +355,12 @@ async def trio_proc(
spawn_cmd.append("--asyncio") spawn_cmd.append("--asyncio")
cancelled_during_spawn: bool = False cancelled_during_spawn: bool = False
proc: trio.Process | None = None proc: Optional[trio.Process] = None
try: try:
try: try:
# TODO: needs ``trio_typing`` patch? # TODO: needs ``trio_typing`` patch?
proc = await trio.lowlevel.open_process(spawn_cmd) proc = await trio.lowlevel.open_process( # type: ignore
spawn_cmd)
log.runtime(f"Started {proc}") log.runtime(f"Started {proc}")
@ -463,8 +444,8 @@ async def trio_proc(
nursery.cancel_scope.cancel() nursery.cancel_scope.cancel()
finally: finally:
# XXX NOTE XXX: The "hard" reap since no actor zombies are # The "hard" reap since no actor zombies are allowed!
# allowed! Do this **after** cancellation/teardown to avoid # XXX: do this **after** cancellation/tearfown to avoid
# killing the process too early. # killing the process too early.
if proc: if proc:
log.cancel(f'Hard reap sequence starting for {subactor.uid}') log.cancel(f'Hard reap sequence starting for {subactor.uid}')

View File

@ -27,7 +27,8 @@ from typing import (
Optional, Optional,
Callable, Callable,
AsyncGenerator, AsyncGenerator,
AsyncIterator AsyncIterator,
TYPE_CHECKING,
) )
import warnings import warnings
@ -41,6 +42,10 @@ from .log import get_logger
from .trionics import broadcast_receiver, BroadcastReceiver from .trionics import broadcast_receiver, BroadcastReceiver
if TYPE_CHECKING:
from ._portal import Portal
log = get_logger(__name__) log = get_logger(__name__)
@ -70,7 +75,7 @@ class MsgStream(trio.abc.Channel):
''' '''
def __init__( def __init__(
self, self,
ctx: 'Context', # typing: ignore # noqa ctx: Context, # typing: ignore # noqa
rx_chan: trio.MemoryReceiveChannel, rx_chan: trio.MemoryReceiveChannel,
_broadcaster: Optional[BroadcastReceiver] = None, _broadcaster: Optional[BroadcastReceiver] = None,
@ -83,6 +88,9 @@ class MsgStream(trio.abc.Channel):
self._eoc: bool = False self._eoc: bool = False
self._closed: bool = False self._closed: bool = False
def ctx(self) -> Context:
return self._ctx
# delegate directly to underlying mem channel # delegate directly to underlying mem channel
def receive_nowait(self): def receive_nowait(self):
msg = self._rx_chan.receive_nowait() msg = self._rx_chan.receive_nowait()
@ -278,7 +286,6 @@ class MsgStream(trio.abc.Channel):
@asynccontextmanager @asynccontextmanager
async def subscribe( async def subscribe(
self, self,
) -> AsyncIterator[BroadcastReceiver]: ) -> AsyncIterator[BroadcastReceiver]:
''' '''
Allocate and return a ``BroadcastReceiver`` which delegates Allocate and return a ``BroadcastReceiver`` which delegates
@ -335,8 +342,8 @@ class MsgStream(trio.abc.Channel):
Send a message over this stream to the far end. Send a message over this stream to the far end.
''' '''
if self._ctx._error: if self._ctx._remote_ctx_error:
raise self._ctx._error # from None raise self._ctx._remote_ctx_error # from None
if self._closed: if self._closed:
raise trio.ClosedResourceError('This stream was already closed') raise trio.ClosedResourceError('This stream was already closed')
@ -375,9 +382,10 @@ class Context:
_remote_func_type: Optional[str] = None _remote_func_type: Optional[str] = None
# only set on the caller side # only set on the caller side
_portal: Optional['Portal'] = None # type: ignore # noqa _portal: Optional[Portal] = None # type: ignore # noqa
_stream: Optional[MsgStream] = None
_result: Optional[Any] = False _result: Optional[Any] = False
_error: Optional[BaseException] = None _remote_ctx_error: Optional[BaseException] = None
# status flags # status flags
_cancel_called: bool = False _cancel_called: bool = False
@ -390,7 +398,7 @@ class Context:
# only set on the callee side # only set on the callee side
_scope_nursery: Optional[trio.Nursery] = None _scope_nursery: Optional[trio.Nursery] = None
_backpressure: bool = False _backpressure: bool = True
async def send_yield(self, data: Any) -> None: async def send_yield(self, data: Any) -> None:
@ -435,21 +443,26 @@ class Context:
# (currently) that other portal APIs (``Portal.run()``, # (currently) that other portal APIs (``Portal.run()``,
# ``.run_in_actor()``) do their own error checking at the point # ``.run_in_actor()``) do their own error checking at the point
# of the call and result processing. # of the call and result processing.
log.error(
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{msg["error"]["tb_str"]}'
)
error = unpack_error(msg, self.chan) error = unpack_error(msg, self.chan)
if ( if (
isinstance(error, ContextCancelled) and isinstance(error, ContextCancelled)
self._cancel_called
): ):
# this is an expected cancel request response message log.cancel(
# and we don't need to raise it in scope since it will f'Remote context error for {self.chan.uid}:{self.cid}:\n'
# potentially override a real error f'{msg["error"]["tb_str"]}'
return )
if self._cancel_called:
# this is an expected cancel request response message
# and we don't need to raise it in scope since it will
# potentially override a real error
return
else:
log.error(
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{msg["error"]["tb_str"]}'
)
self._error = error self._remote_ctx_error = error
# TODO: tempted to **not** do this by-reraising in a # TODO: tempted to **not** do this by-reraising in a
# nursery and instead cancel a surrounding scope, detect # nursery and instead cancel a surrounding scope, detect
@ -457,7 +470,7 @@ class Context:
if self._scope_nursery: if self._scope_nursery:
async def raiser(): async def raiser():
raise self._error from None raise self._remote_ctx_error from None
# from trio.testing import wait_all_tasks_blocked # from trio.testing import wait_all_tasks_blocked
# await wait_all_tasks_blocked() # await wait_all_tasks_blocked()
@ -483,6 +496,7 @@ class Context:
log.cancel(f'Cancelling {side} side of context to {self.chan.uid}') log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')
self._cancel_called = True self._cancel_called = True
ipc_broken: bool = False
if side == 'caller': if side == 'caller':
if not self._portal: if not self._portal:
@ -500,7 +514,14 @@ class Context:
# NOTE: we're telling the far end actor to cancel a task # NOTE: we're telling the far end actor to cancel a task
# corresponding to *this actor*. The far end local channel # corresponding to *this actor*. The far end local channel
# instance is passed to `Actor._cancel_task()` implicitly. # instance is passed to `Actor._cancel_task()` implicitly.
await self._portal.run_from_ns('self', '_cancel_task', cid=cid) try:
await self._portal.run_from_ns(
'self',
'_cancel_task',
cid=cid,
)
except trio.BrokenResourceError:
ipc_broken = True
if cs.cancelled_caught: if cs.cancelled_caught:
# XXX: there's no way to know if the remote task was indeed # XXX: there's no way to know if the remote task was indeed
@ -516,7 +537,10 @@ class Context:
"Timed out on cancelling remote task " "Timed out on cancelling remote task "
f"{cid} for {self._portal.channel.uid}") f"{cid} for {self._portal.channel.uid}")
# callee side remote task elif ipc_broken:
log.cancel(
"Transport layer was broken before cancel request "
f"{cid} for {self._portal.channel.uid}")
else: else:
self._cancel_msg = msg self._cancel_msg = msg
@ -604,6 +628,7 @@ class Context:
ctx=self, ctx=self,
rx_chan=ctx._recv_chan, rx_chan=ctx._recv_chan,
) as stream: ) as stream:
self._stream = stream
if self._portal: if self._portal:
self._portal._streams.add(stream) self._portal._streams.add(stream)
@ -645,25 +670,22 @@ class Context:
if not self._recv_chan._closed: # type: ignore if not self._recv_chan._closed: # type: ignore
# wait for a final context result consuming def consume(
# and discarding any bi dir stream msgs still msg: dict,
# in transit from the far end.
while True:
msg = await self._recv_chan.receive() ) -> Optional[dict]:
try: try:
self._result = msg['return'] return msg['return']
break
except KeyError as msgerr: except KeyError as msgerr:
if 'yield' in msg: if 'yield' in msg:
# far end task is still streaming to us so discard # far end task is still streaming to us so discard
log.warning(f'Discarding stream delivered {msg}') log.warning(f'Discarding stream delivered {msg}')
continue return
elif 'stop' in msg: elif 'stop' in msg:
log.debug('Remote stream terminated') log.debug('Remote stream terminated')
continue return
# internal error should never get here # internal error should never get here
assert msg.get('cid'), ( assert msg.get('cid'), (
@ -673,6 +695,25 @@ class Context:
msg, self._portal.channel msg, self._portal.channel
) from msgerr ) from msgerr
# wait for a final context result consuming
# and discarding any bi dir stream msgs still
# in transit from the far end.
if self._stream:
async with self._stream.subscribe() as bstream:
async for msg in bstream:
result = consume(msg)
if result:
self._result = result
break
if not self._result:
while True:
msg = await self._recv_chan.receive()
result = consume(msg)
if result:
self._result = result
break
return self._result return self._result
async def started( async def started(

View File

@ -302,7 +302,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
) -> typing.AsyncGenerator[ActorNursery, None]: ) -> typing.AsyncGenerator[ActorNursery, None]:
# TODO: yay or nay? # TODO: yay or nay?
__tracebackhide__ = True # __tracebackhide__ = True
# the collection of errors retreived from spawned sub-actors # the collection of errors retreived from spawned sub-actors
errors: dict[tuple[str, str], BaseException] = {} errors: dict[tuple[str, str], BaseException] = {}