Compare commits
6 Commits
master
...
ctx_result
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 6120e99d7e | |
Tyler Goodlet | 33b0e36ad6 | |
Tyler Goodlet | a71a958f54 | |
Tyler Goodlet | c9eb466d76 | |
Tyler Goodlet | f7a1f3832f | |
Tyler Goodlet | 3f2e33a120 |
|
@ -6,14 +6,8 @@
|
||||||
``tractor`` is a `structured concurrent`_, multi-processing_ runtime
|
``tractor`` is a `structured concurrent`_, multi-processing_ runtime
|
||||||
built on trio_.
|
built on trio_.
|
||||||
|
|
||||||
Fundamentally, ``tractor`` gives you parallelism via
|
Fundamentally ``tractor`` gives you parallelism via ``trio``-"*actors*":
|
||||||
``trio``-"*actors*": independent Python processes (aka
|
our nurseries_ let you spawn new Python processes which each run a ``trio``
|
||||||
non-shared-memory threads) which maintain structured
|
|
||||||
concurrency (SC) *end-to-end* inside a *supervision tree*.
|
|
||||||
|
|
||||||
Cross-process (and thus cross-host) SC is accomplished through the
|
|
||||||
combined use of our "actor nurseries_" and an "SC-transitive IPC
|
|
||||||
protocol" constructed on top of multiple Pythons each running a ``trio``
|
|
||||||
scheduled runtime - a call to ``trio.run()``.
|
scheduled runtime - a call to ``trio.run()``.
|
||||||
|
|
||||||
We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
|
We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
|
||||||
|
@ -29,8 +23,7 @@ Features
|
||||||
- **It's just** a ``trio`` API
|
- **It's just** a ``trio`` API
|
||||||
- *Infinitely nesteable* process trees
|
- *Infinitely nesteable* process trees
|
||||||
- Builtin IPC streaming APIs with task fan-out broadcasting
|
- Builtin IPC streaming APIs with task fan-out broadcasting
|
||||||
- A "native" multi-core debugger REPL using `pdbp`_ (a fork & fix of
|
- A (first ever?) "native" multi-core debugger UX for Python using `pdb++`_
|
||||||
`pdb++`_ thanks to @mdmintz!)
|
|
||||||
- Support for a swappable, OS specific, process spawning layer
|
- Support for a swappable, OS specific, process spawning layer
|
||||||
- A modular transport stack, allowing for custom serialization (eg. with
|
- A modular transport stack, allowing for custom serialization (eg. with
|
||||||
`msgspec`_), communications protocols, and environment specific IPC
|
`msgspec`_), communications protocols, and environment specific IPC
|
||||||
|
@ -125,7 +118,7 @@ Zombie safe: self-destruct a process tree
|
||||||
f"running in pid {os.getpid()}"
|
f"running in pid {os.getpid()}"
|
||||||
)
|
)
|
||||||
|
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
|
@ -156,7 +149,7 @@ it **is a bug**.
|
||||||
|
|
||||||
"Native" multi-process debugging
|
"Native" multi-process debugging
|
||||||
--------------------------------
|
--------------------------------
|
||||||
Using the magic of `pdbp`_ and our internal IPC, we've
|
Using the magic of `pdb++`_ and our internal IPC, we've
|
||||||
been able to create a native feeling debugging experience for
|
been able to create a native feeling debugging experience for
|
||||||
any (sub-)process in your ``tractor`` tree.
|
any (sub-)process in your ``tractor`` tree.
|
||||||
|
|
||||||
|
@ -604,7 +597,6 @@ channel`_!
|
||||||
.. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s
|
.. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s
|
||||||
.. _trio gitter channel: https://gitter.im/python-trio/general
|
.. _trio gitter channel: https://gitter.im/python-trio/general
|
||||||
.. _matrix channel: https://matrix.to/#/!tractor:matrix.org
|
.. _matrix channel: https://matrix.to/#/!tractor:matrix.org
|
||||||
.. _pdbp: https://github.com/mdmintz/pdbp
|
|
||||||
.. _pdb++: https://github.com/pdbpp/pdbpp
|
.. _pdb++: https://github.com/pdbpp/pdbpp
|
||||||
.. _guest mode: https://trio.readthedocs.io/en/stable/reference-lowlevel.html?highlight=guest%20mode#using-guest-mode-to-run-trio-on-top-of-other-event-loops
|
.. _guest mode: https://trio.readthedocs.io/en/stable/reference-lowlevel.html?highlight=guest%20mode#using-guest-mode-to-run-trio-on-top-of-other-event-loops
|
||||||
.. _messages: https://en.wikipedia.org/wiki/Message_passing
|
.. _messages: https://en.wikipedia.org/wiki/Message_passing
|
||||||
|
|
|
@ -1,24 +0,0 @@
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
|
|
||||||
import trio
|
|
||||||
import tractor
|
|
||||||
|
|
||||||
|
|
||||||
async def main() -> None:
|
|
||||||
async with tractor.open_nursery(debug_mode=True) as an:
|
|
||||||
|
|
||||||
assert os.environ['PYTHONBREAKPOINT'] == 'tractor._debug._set_trace'
|
|
||||||
|
|
||||||
# TODO: an assert that verifies the hook has indeed been, hooked
|
|
||||||
# XD
|
|
||||||
assert sys.breakpointhook is not tractor._debug._set_trace
|
|
||||||
|
|
||||||
breakpoint()
|
|
||||||
|
|
||||||
# TODO: an assert that verifies the hook is unhooked..
|
|
||||||
assert sys.breakpointhook
|
|
||||||
breakpoint()
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
trio.run(main)
|
|
|
@ -1,7 +0,0 @@
|
||||||
Drop `trio.Process.aclose()` usage, copy into our spawning code.
|
|
||||||
|
|
||||||
The details are laid out in https://github.com/goodboy/tractor/issues/330.
|
|
||||||
`trio` changed is process running quite some time ago, this just copies
|
|
||||||
out the small bit we needed (from the old `.aclose()`) for hard kills
|
|
||||||
where a soft runtime cancel request fails and our "zombie killer"
|
|
||||||
implementation kicks in.
|
|
|
@ -1,15 +0,0 @@
|
||||||
Switch to using the fork & fix of `pdb++`, `pdbp`:
|
|
||||||
https://github.com/mdmintz/pdbp
|
|
||||||
|
|
||||||
Allows us to sidestep a variety of issues that aren't being maintained
|
|
||||||
in the upstream project thanks to the hard work of @mdmintz!
|
|
||||||
|
|
||||||
We also include some default settings adjustments as per recent
|
|
||||||
development on the fork:
|
|
||||||
|
|
||||||
- sticky mode is still turned on by default but now activates when
|
|
||||||
a using the `ll` repl command.
|
|
||||||
- turn off line truncation by default to avoid inter-line gaps when
|
|
||||||
resizing the terimnal during use.
|
|
||||||
- when using the backtrace cmd either by `w` or `bt`, the config
|
|
||||||
automatically switches to non-sticky mode.
|
|
|
@ -1,7 +1,7 @@
|
||||||
pytest
|
pytest
|
||||||
pytest-trio
|
pytest-trio
|
||||||
pytest-timeout
|
pytest-timeout
|
||||||
pdbp
|
pdbpp
|
||||||
mypy
|
mypy
|
||||||
trio_typing
|
trio_typing
|
||||||
pexpect
|
pexpect
|
||||||
|
|
13
setup.py
13
setup.py
|
@ -26,12 +26,12 @@ with open('docs/README.rst', encoding='utf-8') as f:
|
||||||
setup(
|
setup(
|
||||||
name="tractor",
|
name="tractor",
|
||||||
version='0.1.0a6dev0', # alpha zone
|
version='0.1.0a6dev0', # alpha zone
|
||||||
description='structured concurrrent `trio`-"actors"',
|
description='structured concurrrent "actors"',
|
||||||
long_description=readme,
|
long_description=readme,
|
||||||
license='AGPLv3',
|
license='AGPLv3',
|
||||||
author='Tyler Goodlet',
|
author='Tyler Goodlet',
|
||||||
maintainer='Tyler Goodlet',
|
maintainer='Tyler Goodlet',
|
||||||
maintainer_email='goodboy_foss@protonmail.com',
|
maintainer_email='jgbt@protonmail.com',
|
||||||
url='https://github.com/goodboy/tractor',
|
url='https://github.com/goodboy/tractor',
|
||||||
platforms=['linux', 'windows'],
|
platforms=['linux', 'windows'],
|
||||||
packages=[
|
packages=[
|
||||||
|
@ -52,14 +52,16 @@ setup(
|
||||||
# tooling
|
# tooling
|
||||||
'tricycle',
|
'tricycle',
|
||||||
'trio_typing',
|
'trio_typing',
|
||||||
|
|
||||||
|
# tooling
|
||||||
'colorlog',
|
'colorlog',
|
||||||
'wrapt',
|
'wrapt',
|
||||||
|
|
||||||
# IPC serialization
|
# serialization
|
||||||
'msgspec',
|
'msgspec',
|
||||||
|
|
||||||
# debug mode REPL
|
# debug mode REPL
|
||||||
'pdbp',
|
'pdbpp',
|
||||||
|
|
||||||
# pip ref docs on these specs:
|
# pip ref docs on these specs:
|
||||||
# https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples
|
# https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples
|
||||||
|
@ -71,9 +73,10 @@ setup(
|
||||||
# https://github.com/pdbpp/fancycompleter/issues/37
|
# https://github.com/pdbpp/fancycompleter/issues/37
|
||||||
'pyreadline3 ; platform_system == "Windows"',
|
'pyreadline3 ; platform_system == "Windows"',
|
||||||
|
|
||||||
|
|
||||||
],
|
],
|
||||||
tests_require=['pytest'],
|
tests_require=['pytest'],
|
||||||
python_requires=">=3.10",
|
python_requires=">=3.9",
|
||||||
keywords=[
|
keywords=[
|
||||||
'trio',
|
'trio',
|
||||||
'async',
|
'async',
|
||||||
|
|
|
@ -95,7 +95,7 @@ def spawn(
|
||||||
return _spawn
|
return _spawn
|
||||||
|
|
||||||
|
|
||||||
PROMPT = r"\(Pdb\+\)"
|
PROMPT = r"\(Pdb\+\+\)"
|
||||||
|
|
||||||
|
|
||||||
def expect(
|
def expect(
|
||||||
|
@ -151,6 +151,18 @@ def ctlc(
|
||||||
|
|
||||||
use_ctlc = request.param
|
use_ctlc = request.param
|
||||||
|
|
||||||
|
if (
|
||||||
|
sys.version_info <= (3, 10)
|
||||||
|
and use_ctlc
|
||||||
|
):
|
||||||
|
# on 3.9 it seems the REPL UX
|
||||||
|
# is highly unreliable and frankly annoying
|
||||||
|
# to test for. It does work from manual testing
|
||||||
|
# but i just don't think it's wroth it to try
|
||||||
|
# and get this working especially since we want to
|
||||||
|
# be 3.10+ mega-asap.
|
||||||
|
pytest.skip('Py3.9 and `pdbpp` son no bueno..')
|
||||||
|
|
||||||
node = request.node
|
node = request.node
|
||||||
markers = node.own_markers
|
markers = node.own_markers
|
||||||
for mark in markers:
|
for mark in markers:
|
||||||
|
@ -181,15 +193,13 @@ def ctlc(
|
||||||
ids=lambda item: f'{item[0]} -> {item[1]}',
|
ids=lambda item: f'{item[0]} -> {item[1]}',
|
||||||
)
|
)
|
||||||
def test_root_actor_error(spawn, user_in_out):
|
def test_root_actor_error(spawn, user_in_out):
|
||||||
'''
|
"""Demonstrate crash handler entering pdbpp from basic error in root actor.
|
||||||
Demonstrate crash handler entering pdb from basic error in root actor.
|
"""
|
||||||
|
|
||||||
'''
|
|
||||||
user_input, expect_err_str = user_in_out
|
user_input, expect_err_str = user_in_out
|
||||||
|
|
||||||
child = spawn('root_actor_error')
|
child = spawn('root_actor_error')
|
||||||
|
|
||||||
# scan for the prompt
|
# scan for the pdbpp prompt
|
||||||
expect(child, PROMPT)
|
expect(child, PROMPT)
|
||||||
|
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
|
@ -220,8 +230,8 @@ def test_root_actor_bp(spawn, user_in_out):
|
||||||
user_input, expect_err_str = user_in_out
|
user_input, expect_err_str = user_in_out
|
||||||
child = spawn('root_actor_breakpoint')
|
child = spawn('root_actor_breakpoint')
|
||||||
|
|
||||||
# scan for the prompt
|
# scan for the pdbpp prompt
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
assert 'Error' not in str(child.before)
|
assert 'Error' not in str(child.before)
|
||||||
|
|
||||||
|
@ -262,7 +272,7 @@ def do_ctlc(
|
||||||
if expect_prompt:
|
if expect_prompt:
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
time.sleep(delay)
|
time.sleep(delay)
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
time.sleep(delay)
|
time.sleep(delay)
|
||||||
|
|
||||||
if patt:
|
if patt:
|
||||||
|
@ -281,7 +291,7 @@ def test_root_actor_bp_forever(
|
||||||
# entries
|
# entries
|
||||||
for _ in range(10):
|
for _ in range(10):
|
||||||
|
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
if ctlc:
|
if ctlc:
|
||||||
do_ctlc(child)
|
do_ctlc(child)
|
||||||
|
@ -291,7 +301,7 @@ def test_root_actor_bp_forever(
|
||||||
# do one continue which should trigger a
|
# do one continue which should trigger a
|
||||||
# new task to lock the tty
|
# new task to lock the tty
|
||||||
child.sendline('continue')
|
child.sendline('continue')
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
# seems that if we hit ctrl-c too fast the
|
# seems that if we hit ctrl-c too fast the
|
||||||
# sigint guard machinery might not kick in..
|
# sigint guard machinery might not kick in..
|
||||||
|
@ -302,10 +312,10 @@ def test_root_actor_bp_forever(
|
||||||
|
|
||||||
# XXX: this previously caused a bug!
|
# XXX: this previously caused a bug!
|
||||||
child.sendline('n')
|
child.sendline('n')
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
child.sendline('n')
|
child.sendline('n')
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
# quit out of the loop
|
# quit out of the loop
|
||||||
child.sendline('q')
|
child.sendline('q')
|
||||||
|
@ -328,8 +338,8 @@ def test_subactor_error(
|
||||||
'''
|
'''
|
||||||
child = spawn('subactor_error')
|
child = spawn('subactor_error')
|
||||||
|
|
||||||
# scan for the prompt
|
# scan for the pdbpp prompt
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
assert "Attaching to pdb in crashed actor: ('name_error'" in before
|
assert "Attaching to pdb in crashed actor: ('name_error'" in before
|
||||||
|
@ -349,7 +359,7 @@ def test_subactor_error(
|
||||||
# creating actor
|
# creating actor
|
||||||
child.sendline('continue')
|
child.sendline('continue')
|
||||||
|
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
|
|
||||||
# root actor gets debugger engaged
|
# root actor gets debugger engaged
|
||||||
|
@ -376,8 +386,8 @@ def test_subactor_breakpoint(
|
||||||
|
|
||||||
child = spawn('subactor_breakpoint')
|
child = spawn('subactor_breakpoint')
|
||||||
|
|
||||||
# scan for the prompt
|
# scan for the pdbpp prompt
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
|
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
|
||||||
|
@ -386,7 +396,7 @@ def test_subactor_breakpoint(
|
||||||
# entries
|
# entries
|
||||||
for _ in range(10):
|
for _ in range(10):
|
||||||
child.sendline('next')
|
child.sendline('next')
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
if ctlc:
|
if ctlc:
|
||||||
do_ctlc(child)
|
do_ctlc(child)
|
||||||
|
@ -394,7 +404,7 @@ def test_subactor_breakpoint(
|
||||||
# now run some "continues" to show re-entries
|
# now run some "continues" to show re-entries
|
||||||
for _ in range(5):
|
for _ in range(5):
|
||||||
child.sendline('continue')
|
child.sendline('continue')
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
|
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
|
||||||
|
|
||||||
|
@ -405,7 +415,7 @@ def test_subactor_breakpoint(
|
||||||
child.sendline('q')
|
child.sendline('q')
|
||||||
|
|
||||||
# child process should exit but parent will capture pdb.BdbQuit
|
# child process should exit but parent will capture pdb.BdbQuit
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
assert "RemoteActorError: ('breakpoint_forever'" in before
|
assert "RemoteActorError: ('breakpoint_forever'" in before
|
||||||
|
@ -437,8 +447,8 @@ def test_multi_subactors(
|
||||||
'''
|
'''
|
||||||
child = spawn(r'multi_subactors')
|
child = spawn(r'multi_subactors')
|
||||||
|
|
||||||
# scan for the prompt
|
# scan for the pdbpp prompt
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
|
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
|
||||||
|
@ -450,7 +460,7 @@ def test_multi_subactors(
|
||||||
# entries
|
# entries
|
||||||
for _ in range(10):
|
for _ in range(10):
|
||||||
child.sendline('next')
|
child.sendline('next')
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
if ctlc:
|
if ctlc:
|
||||||
do_ctlc(child)
|
do_ctlc(child)
|
||||||
|
@ -459,7 +469,7 @@ def test_multi_subactors(
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
|
|
||||||
# first name_error failure
|
# first name_error failure
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
assert "Attaching to pdb in crashed actor: ('name_error'" in before
|
assert "Attaching to pdb in crashed actor: ('name_error'" in before
|
||||||
assert "NameError" in before
|
assert "NameError" in before
|
||||||
|
@ -471,7 +481,7 @@ def test_multi_subactors(
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
|
|
||||||
# 2nd name_error failure
|
# 2nd name_error failure
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
# TODO: will we ever get the race where this crash will show up?
|
# TODO: will we ever get the race where this crash will show up?
|
||||||
# blocklist strat now prevents this crash
|
# blocklist strat now prevents this crash
|
||||||
|
@ -485,7 +495,7 @@ def test_multi_subactors(
|
||||||
|
|
||||||
# breakpoint loop should re-engage
|
# breakpoint loop should re-engage
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
|
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
|
||||||
|
|
||||||
|
@ -501,7 +511,7 @@ def test_multi_subactors(
|
||||||
):
|
):
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
|
|
||||||
if ctlc:
|
if ctlc:
|
||||||
|
@ -520,11 +530,11 @@ def test_multi_subactors(
|
||||||
# now run some "continues" to show re-entries
|
# now run some "continues" to show re-entries
|
||||||
for _ in range(5):
|
for _ in range(5):
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
# quit the loop and expect parent to attach
|
# quit the loop and expect parent to attach
|
||||||
child.sendline('q')
|
child.sendline('q')
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
|
|
||||||
assert_before(child, [
|
assert_before(child, [
|
||||||
|
@ -568,7 +578,7 @@ def test_multi_daemon_subactors(
|
||||||
'''
|
'''
|
||||||
child = spawn('multi_daemon_subactors')
|
child = spawn('multi_daemon_subactors')
|
||||||
|
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
# there can be a race for which subactor will acquire
|
# there can be a race for which subactor will acquire
|
||||||
# the root's tty lock first so anticipate either crash
|
# the root's tty lock first so anticipate either crash
|
||||||
|
@ -598,7 +608,7 @@ def test_multi_daemon_subactors(
|
||||||
# second entry by `bp_forever`.
|
# second entry by `bp_forever`.
|
||||||
|
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
assert_before(child, [next_msg])
|
assert_before(child, [next_msg])
|
||||||
|
|
||||||
# XXX: hooray the root clobbering the child here was fixed!
|
# XXX: hooray the root clobbering the child here was fixed!
|
||||||
|
@ -620,7 +630,7 @@ def test_multi_daemon_subactors(
|
||||||
|
|
||||||
# expect another breakpoint actor entry
|
# expect another breakpoint actor entry
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
assert_before(child, [bp_forever_msg])
|
assert_before(child, [bp_forever_msg])
|
||||||
|
@ -636,7 +646,7 @@ def test_multi_daemon_subactors(
|
||||||
# after 1 or more further bp actor entries.
|
# after 1 or more further bp actor entries.
|
||||||
|
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
assert_before(child, [name_error_msg])
|
assert_before(child, [name_error_msg])
|
||||||
|
|
||||||
# wait for final error in root
|
# wait for final error in root
|
||||||
|
@ -644,7 +654,7 @@ def test_multi_daemon_subactors(
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
assert_before(
|
assert_before(
|
||||||
child,
|
child,
|
||||||
[bp_forever_msg]
|
[bp_forever_msg]
|
||||||
|
@ -677,8 +687,8 @@ def test_multi_subactors_root_errors(
|
||||||
'''
|
'''
|
||||||
child = spawn('multi_subactor_root_errors')
|
child = spawn('multi_subactor_root_errors')
|
||||||
|
|
||||||
# scan for the prompt
|
# scan for the pdbpp prompt
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
# at most one subactor should attach before the root is cancelled
|
# at most one subactor should attach before the root is cancelled
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
|
@ -693,7 +703,7 @@ def test_multi_subactors_root_errors(
|
||||||
|
|
||||||
# due to block list strat from #337, this will no longer
|
# due to block list strat from #337, this will no longer
|
||||||
# propagate before the root errors and cancels the spawner sub-tree.
|
# propagate before the root errors and cancels the spawner sub-tree.
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
# only if the blocking condition doesn't kick in fast enough
|
# only if the blocking condition doesn't kick in fast enough
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
|
@ -708,7 +718,7 @@ def test_multi_subactors_root_errors(
|
||||||
do_ctlc(child)
|
do_ctlc(child)
|
||||||
|
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
# check if the spawner crashed or was blocked from debug
|
# check if the spawner crashed or was blocked from debug
|
||||||
# and if this intermediary attached check the boxed error
|
# and if this intermediary attached check the boxed error
|
||||||
|
@ -725,7 +735,7 @@ def test_multi_subactors_root_errors(
|
||||||
do_ctlc(child)
|
do_ctlc(child)
|
||||||
|
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
# expect a root actor crash
|
# expect a root actor crash
|
||||||
assert_before(child, [
|
assert_before(child, [
|
||||||
|
@ -774,7 +784,7 @@ def test_multi_nested_subactors_error_through_nurseries(
|
||||||
|
|
||||||
for send_char in itertools.cycle(['c', 'q']):
|
for send_char in itertools.cycle(['c', 'q']):
|
||||||
try:
|
try:
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
child.sendline(send_char)
|
child.sendline(send_char)
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
@ -816,7 +826,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
|
||||||
|
|
||||||
child = spawn('root_cancelled_but_child_is_in_tty_lock')
|
child = spawn('root_cancelled_but_child_is_in_tty_lock')
|
||||||
|
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
assert "NameError: name 'doggypants' is not defined" in before
|
assert "NameError: name 'doggypants' is not defined" in before
|
||||||
|
@ -831,7 +841,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
|
||||||
for i in range(4):
|
for i in range(4):
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
try:
|
try:
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
except (
|
except (
|
||||||
EOF,
|
EOF,
|
||||||
|
@ -888,7 +898,7 @@ def test_root_cancels_child_context_during_startup(
|
||||||
'''
|
'''
|
||||||
child = spawn('fast_error_in_root_after_spawn')
|
child = spawn('fast_error_in_root_after_spawn')
|
||||||
|
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
assert "AssertionError" in before
|
assert "AssertionError" in before
|
||||||
|
@ -905,7 +915,7 @@ def test_different_debug_mode_per_actor(
|
||||||
ctlc: bool,
|
ctlc: bool,
|
||||||
):
|
):
|
||||||
child = spawn('per_actor_debug')
|
child = spawn('per_actor_debug')
|
||||||
child.expect(PROMPT)
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
# only one actor should enter the debugger
|
# only one actor should enter the debugger
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
|
|
|
@ -44,10 +44,7 @@ from ._exceptions import (
|
||||||
ModuleNotExposed,
|
ModuleNotExposed,
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
)
|
)
|
||||||
from ._debug import (
|
from ._debug import breakpoint, post_mortem
|
||||||
breakpoint,
|
|
||||||
post_mortem,
|
|
||||||
)
|
|
||||||
from . import msg
|
from . import msg
|
||||||
from ._root import (
|
from ._root import (
|
||||||
run_daemon,
|
run_daemon,
|
||||||
|
|
|
@ -37,7 +37,6 @@ from typing import (
|
||||||
)
|
)
|
||||||
from types import FrameType
|
from types import FrameType
|
||||||
|
|
||||||
import pdbp
|
|
||||||
import tractor
|
import tractor
|
||||||
import trio
|
import trio
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
|
@ -54,6 +53,17 @@ from ._exceptions import (
|
||||||
)
|
)
|
||||||
from ._ipc import Channel
|
from ._ipc import Channel
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
# wtf: only exported when installed in dev mode?
|
||||||
|
import pdbpp
|
||||||
|
except ImportError:
|
||||||
|
# pdbpp is installed in regular mode...it monkey patches stuff
|
||||||
|
import pdb
|
||||||
|
xpm = getattr(pdb, 'xpm', None)
|
||||||
|
assert xpm, "pdbpp is not installed?" # type: ignore
|
||||||
|
pdbpp = pdb
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -144,26 +154,22 @@ class Lock:
|
||||||
cls.repl = None
|
cls.repl = None
|
||||||
|
|
||||||
|
|
||||||
class TractorConfig(pdbp.DefaultConfig):
|
class TractorConfig(pdbpp.DefaultConfig):
|
||||||
'''
|
'''
|
||||||
Custom ``pdbp`` goodness :surfer:
|
Custom ``pdbpp`` goodness.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
use_pygments: bool = True
|
# use_pygments = True
|
||||||
sticky_by_default: bool = False
|
# sticky_by_default = True
|
||||||
enable_hidden_frames: bool = False
|
enable_hidden_frames = False
|
||||||
|
|
||||||
# much thanks @mdmintz for the hot tip!
|
|
||||||
# fixes line spacing issue when resizing terminal B)
|
|
||||||
truncate_long_lines: bool = False
|
|
||||||
|
|
||||||
|
|
||||||
class MultiActorPdb(pdbp.Pdb):
|
class MultiActorPdb(pdbpp.Pdb):
|
||||||
'''
|
'''
|
||||||
Add teardown hooks to the regular ``pdbp.Pdb``.
|
Add teardown hooks to the regular ``pdbpp.Pdb``.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# override the pdbp config with our coolio one
|
# override the pdbpp config with our coolio one
|
||||||
DefaultConfig = TractorConfig
|
DefaultConfig = TractorConfig
|
||||||
|
|
||||||
# def preloop(self):
|
# def preloop(self):
|
||||||
|
@ -307,7 +313,7 @@ async def lock_tty_for_child(
|
||||||
) -> str:
|
) -> str:
|
||||||
'''
|
'''
|
||||||
Lock the TTY in the root process of an actor tree in a new
|
Lock the TTY in the root process of an actor tree in a new
|
||||||
inter-actor-context-task such that the ``pdbp`` debugger console
|
inter-actor-context-task such that the ``pdbpp`` debugger console
|
||||||
can be mutex-allocated to the calling sub-actor for REPL control
|
can be mutex-allocated to the calling sub-actor for REPL control
|
||||||
without interference by other processes / threads.
|
without interference by other processes / threads.
|
||||||
|
|
||||||
|
@ -427,7 +433,7 @@ async def wait_for_parent_stdin_hijack(
|
||||||
def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
|
def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
|
||||||
|
|
||||||
pdb = MultiActorPdb()
|
pdb = MultiActorPdb()
|
||||||
# signal.signal = pdbp.hideframe(signal.signal)
|
# signal.signal = pdbpp.hideframe(signal.signal)
|
||||||
|
|
||||||
Lock.shield_sigint()
|
Lock.shield_sigint()
|
||||||
|
|
||||||
|
@ -577,7 +583,7 @@ async def _breakpoint(
|
||||||
# # frame = sys._getframe()
|
# # frame = sys._getframe()
|
||||||
# # last_f = frame.f_back
|
# # last_f = frame.f_back
|
||||||
# # last_f.f_globals['__tracebackhide__'] = True
|
# # last_f.f_globals['__tracebackhide__'] = True
|
||||||
# # signal.signal = pdbp.hideframe(signal.signal)
|
# # signal.signal = pdbpp.hideframe(signal.signal)
|
||||||
|
|
||||||
|
|
||||||
def shield_sigint_handler(
|
def shield_sigint_handler(
|
||||||
|
@ -737,13 +743,13 @@ def shield_sigint_handler(
|
||||||
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
|
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
|
||||||
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
|
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
|
||||||
|
|
||||||
# XXX LEGACY: lol, see ``pdbpp`` issue:
|
# XXX: lol, see ``pdbpp`` issue:
|
||||||
# https://github.com/pdbpp/pdbpp/issues/496
|
# https://github.com/pdbpp/pdbpp/issues/496
|
||||||
|
|
||||||
|
|
||||||
def _set_trace(
|
def _set_trace(
|
||||||
actor: tractor.Actor | None = None,
|
actor: Optional[tractor.Actor] = None,
|
||||||
pdb: MultiActorPdb | None = None,
|
pdb: Optional[MultiActorPdb] = None,
|
||||||
):
|
):
|
||||||
__tracebackhide__ = True
|
__tracebackhide__ = True
|
||||||
actor = actor or tractor.current_actor()
|
actor = actor or tractor.current_actor()
|
||||||
|
@ -753,11 +759,7 @@ def _set_trace(
|
||||||
if frame:
|
if frame:
|
||||||
frame = frame.f_back # type: ignore
|
frame = frame.f_back # type: ignore
|
||||||
|
|
||||||
if (
|
if frame and pdb and actor is not None:
|
||||||
frame
|
|
||||||
and pdb
|
|
||||||
and actor is not None
|
|
||||||
):
|
|
||||||
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n")
|
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n")
|
||||||
# no f!#$&* idea, but when we're in async land
|
# no f!#$&* idea, but when we're in async land
|
||||||
# we need 2x frames up?
|
# we need 2x frames up?
|
||||||
|
@ -766,8 +768,7 @@ def _set_trace(
|
||||||
else:
|
else:
|
||||||
pdb, undo_sigint = mk_mpdb()
|
pdb, undo_sigint = mk_mpdb()
|
||||||
|
|
||||||
# we entered the global ``breakpoint()`` built-in from sync
|
# we entered the global ``breakpoint()`` built-in from sync code?
|
||||||
# code?
|
|
||||||
Lock.local_task_in_debug = 'sync'
|
Lock.local_task_in_debug = 'sync'
|
||||||
|
|
||||||
pdb.set_trace(frame=frame)
|
pdb.set_trace(frame=frame)
|
||||||
|
@ -797,7 +798,7 @@ def _post_mortem(
|
||||||
# https://github.com/pdbpp/pdbpp/issues/480
|
# https://github.com/pdbpp/pdbpp/issues/480
|
||||||
# TODO: help with a 3.10+ major release if/when it arrives.
|
# TODO: help with a 3.10+ major release if/when it arrives.
|
||||||
|
|
||||||
pdbp.xpm(Pdb=lambda: pdb)
|
pdbpp.xpm(Pdb=lambda: pdb)
|
||||||
|
|
||||||
|
|
||||||
post_mortem = partial(
|
post_mortem = partial(
|
||||||
|
|
|
@ -22,9 +22,8 @@ from contextlib import asynccontextmanager
|
||||||
from functools import partial
|
from functools import partial
|
||||||
import importlib
|
import importlib
|
||||||
import logging
|
import logging
|
||||||
import signal
|
|
||||||
import sys
|
|
||||||
import os
|
import os
|
||||||
|
import signal
|
||||||
import typing
|
import typing
|
||||||
import warnings
|
import warnings
|
||||||
|
|
||||||
|
@ -85,10 +84,8 @@ async def open_root_actor(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# Override the global debugger hook to make it play nice with
|
# Override the global debugger hook to make it play nice with
|
||||||
# ``trio``, see much discussion in:
|
# ``trio``, see:
|
||||||
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
|
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
|
||||||
builtin_bp_handler = sys.breakpointhook
|
|
||||||
orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None)
|
|
||||||
os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace'
|
os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace'
|
||||||
|
|
||||||
# attempt to retreive ``trio``'s sigint handler and stash it
|
# attempt to retreive ``trio``'s sigint handler and stash it
|
||||||
|
@ -256,16 +253,6 @@ async def open_root_actor(
|
||||||
logger.cancel("Shutting down root actor")
|
logger.cancel("Shutting down root actor")
|
||||||
await actor.cancel()
|
await actor.cancel()
|
||||||
finally:
|
finally:
|
||||||
_state._current_actor = None
|
|
||||||
|
|
||||||
# restore breakpoint hook state
|
|
||||||
sys.breakpointhook = builtin_bp_handler
|
|
||||||
if orig_bp_path is not None:
|
|
||||||
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
|
|
||||||
else:
|
|
||||||
# clear env back to having no entry
|
|
||||||
os.environ.pop('PYTHONBREAKPOINT')
|
|
||||||
|
|
||||||
logger.runtime("Root actor terminated")
|
logger.runtime("Root actor terminated")
|
||||||
|
|
||||||
|
|
||||||
|
@ -301,7 +288,7 @@ def run_daemon(
|
||||||
async def _main():
|
async def _main():
|
||||||
|
|
||||||
async with open_root_actor(
|
async with open_root_actor(
|
||||||
registry_addr=registry_addr,
|
arbiter_addr=registry_addr,
|
||||||
name=name,
|
name=name,
|
||||||
start_method=start_method,
|
start_method=start_method,
|
||||||
debug_mode=debug_mode,
|
debug_mode=debug_mode,
|
||||||
|
|
|
@ -199,8 +199,8 @@ async def _invoke(
|
||||||
except BaseExceptionGroup:
|
except BaseExceptionGroup:
|
||||||
# if a context error was set then likely
|
# if a context error was set then likely
|
||||||
# thei multierror was raised due to that
|
# thei multierror was raised due to that
|
||||||
if ctx._error is not None:
|
if ctx._remote_ctx_error is not None:
|
||||||
raise ctx._error from None
|
raise ctx._remote_ctx_error from None
|
||||||
|
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
|
@ -23,12 +23,13 @@ import sys
|
||||||
import platform
|
import platform
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
Awaitable,
|
|
||||||
Literal,
|
Literal,
|
||||||
|
Optional,
|
||||||
Callable,
|
Callable,
|
||||||
TypeVar,
|
TypeVar,
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
|
from collections.abc import Awaitable
|
||||||
|
|
||||||
from exceptiongroup import BaseExceptionGroup
|
from exceptiongroup import BaseExceptionGroup
|
||||||
import trio
|
import trio
|
||||||
|
@ -59,7 +60,7 @@ if TYPE_CHECKING:
|
||||||
log = get_logger('tractor')
|
log = get_logger('tractor')
|
||||||
|
|
||||||
# placeholder for an mp start context if so using that backend
|
# placeholder for an mp start context if so using that backend
|
||||||
_ctx: mp.context.BaseContext | None = None
|
_ctx: Optional[mp.context.BaseContext] = None
|
||||||
SpawnMethodKey = Literal[
|
SpawnMethodKey = Literal[
|
||||||
'trio', # supported on all platforms
|
'trio', # supported on all platforms
|
||||||
'mp_spawn',
|
'mp_spawn',
|
||||||
|
@ -85,7 +86,7 @@ else:
|
||||||
def try_set_start_method(
|
def try_set_start_method(
|
||||||
key: SpawnMethodKey
|
key: SpawnMethodKey
|
||||||
|
|
||||||
) -> mp.context.BaseContext | None:
|
) -> Optional[mp.context.BaseContext]:
|
||||||
'''
|
'''
|
||||||
Attempt to set the method for process starting, aka the "actor
|
Attempt to set the method for process starting, aka the "actor
|
||||||
spawning backend".
|
spawning backend".
|
||||||
|
@ -199,37 +200,16 @@ async def cancel_on_completion(
|
||||||
async def do_hard_kill(
|
async def do_hard_kill(
|
||||||
proc: trio.Process,
|
proc: trio.Process,
|
||||||
terminate_after: int = 3,
|
terminate_after: int = 3,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
# NOTE: this timeout used to do nothing since we were shielding
|
# NOTE: this timeout used to do nothing since we were shielding
|
||||||
# the ``.wait()`` inside ``new_proc()`` which will pretty much
|
# the ``.wait()`` inside ``new_proc()`` which will pretty much
|
||||||
# never release until the process exits, now it acts as
|
# never release until the process exits, now it acts as
|
||||||
# a hard-kill time ultimatum.
|
# a hard-kill time ultimatum.
|
||||||
log.debug(f"Terminating {proc}")
|
|
||||||
with trio.move_on_after(terminate_after) as cs:
|
with trio.move_on_after(terminate_after) as cs:
|
||||||
|
|
||||||
# NOTE: code below was copied verbatim from the now deprecated
|
# NOTE: This ``__aexit__()`` shields internally.
|
||||||
# (in 0.20.0) ``trio._subrocess.Process.aclose()``, orig doc
|
async with proc: # calls ``trio.Process.aclose()``
|
||||||
# string:
|
log.debug(f"Terminating {proc}")
|
||||||
#
|
|
||||||
# Close any pipes we have to the process (both input and output)
|
|
||||||
# and wait for it to exit. If cancelled, kills the process and
|
|
||||||
# waits for it to finish exiting before propagating the
|
|
||||||
# cancellation.
|
|
||||||
with trio.CancelScope(shield=True):
|
|
||||||
if proc.stdin is not None:
|
|
||||||
await proc.stdin.aclose()
|
|
||||||
if proc.stdout is not None:
|
|
||||||
await proc.stdout.aclose()
|
|
||||||
if proc.stderr is not None:
|
|
||||||
await proc.stderr.aclose()
|
|
||||||
try:
|
|
||||||
await proc.wait()
|
|
||||||
finally:
|
|
||||||
if proc.returncode is None:
|
|
||||||
proc.kill()
|
|
||||||
with trio.CancelScope(shield=True):
|
|
||||||
await proc.wait()
|
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
if cs.cancelled_caught:
|
||||||
# XXX: should pretty much never get here unless we have
|
# XXX: should pretty much never get here unless we have
|
||||||
|
@ -375,11 +355,12 @@ async def trio_proc(
|
||||||
spawn_cmd.append("--asyncio")
|
spawn_cmd.append("--asyncio")
|
||||||
|
|
||||||
cancelled_during_spawn: bool = False
|
cancelled_during_spawn: bool = False
|
||||||
proc: trio.Process | None = None
|
proc: Optional[trio.Process] = None
|
||||||
try:
|
try:
|
||||||
try:
|
try:
|
||||||
# TODO: needs ``trio_typing`` patch?
|
# TODO: needs ``trio_typing`` patch?
|
||||||
proc = await trio.lowlevel.open_process(spawn_cmd)
|
proc = await trio.lowlevel.open_process( # type: ignore
|
||||||
|
spawn_cmd)
|
||||||
|
|
||||||
log.runtime(f"Started {proc}")
|
log.runtime(f"Started {proc}")
|
||||||
|
|
||||||
|
@ -463,8 +444,8 @@ async def trio_proc(
|
||||||
nursery.cancel_scope.cancel()
|
nursery.cancel_scope.cancel()
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# XXX NOTE XXX: The "hard" reap since no actor zombies are
|
# The "hard" reap since no actor zombies are allowed!
|
||||||
# allowed! Do this **after** cancellation/teardown to avoid
|
# XXX: do this **after** cancellation/tearfown to avoid
|
||||||
# killing the process too early.
|
# killing the process too early.
|
||||||
if proc:
|
if proc:
|
||||||
log.cancel(f'Hard reap sequence starting for {subactor.uid}')
|
log.cancel(f'Hard reap sequence starting for {subactor.uid}')
|
||||||
|
|
|
@ -27,7 +27,8 @@ from typing import (
|
||||||
Optional,
|
Optional,
|
||||||
Callable,
|
Callable,
|
||||||
AsyncGenerator,
|
AsyncGenerator,
|
||||||
AsyncIterator
|
AsyncIterator,
|
||||||
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
|
|
||||||
import warnings
|
import warnings
|
||||||
|
@ -41,6 +42,10 @@ from .log import get_logger
|
||||||
from .trionics import broadcast_receiver, BroadcastReceiver
|
from .trionics import broadcast_receiver, BroadcastReceiver
|
||||||
|
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from ._portal import Portal
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -70,7 +75,7 @@ class MsgStream(trio.abc.Channel):
|
||||||
'''
|
'''
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
ctx: 'Context', # typing: ignore # noqa
|
ctx: Context, # typing: ignore # noqa
|
||||||
rx_chan: trio.MemoryReceiveChannel,
|
rx_chan: trio.MemoryReceiveChannel,
|
||||||
_broadcaster: Optional[BroadcastReceiver] = None,
|
_broadcaster: Optional[BroadcastReceiver] = None,
|
||||||
|
|
||||||
|
@ -83,6 +88,9 @@ class MsgStream(trio.abc.Channel):
|
||||||
self._eoc: bool = False
|
self._eoc: bool = False
|
||||||
self._closed: bool = False
|
self._closed: bool = False
|
||||||
|
|
||||||
|
def ctx(self) -> Context:
|
||||||
|
return self._ctx
|
||||||
|
|
||||||
# delegate directly to underlying mem channel
|
# delegate directly to underlying mem channel
|
||||||
def receive_nowait(self):
|
def receive_nowait(self):
|
||||||
msg = self._rx_chan.receive_nowait()
|
msg = self._rx_chan.receive_nowait()
|
||||||
|
@ -278,7 +286,6 @@ class MsgStream(trio.abc.Channel):
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def subscribe(
|
async def subscribe(
|
||||||
self,
|
self,
|
||||||
|
|
||||||
) -> AsyncIterator[BroadcastReceiver]:
|
) -> AsyncIterator[BroadcastReceiver]:
|
||||||
'''
|
'''
|
||||||
Allocate and return a ``BroadcastReceiver`` which delegates
|
Allocate and return a ``BroadcastReceiver`` which delegates
|
||||||
|
@ -335,8 +342,8 @@ class MsgStream(trio.abc.Channel):
|
||||||
Send a message over this stream to the far end.
|
Send a message over this stream to the far end.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
if self._ctx._error:
|
if self._ctx._remote_ctx_error:
|
||||||
raise self._ctx._error # from None
|
raise self._ctx._remote_ctx_error # from None
|
||||||
|
|
||||||
if self._closed:
|
if self._closed:
|
||||||
raise trio.ClosedResourceError('This stream was already closed')
|
raise trio.ClosedResourceError('This stream was already closed')
|
||||||
|
@ -375,9 +382,10 @@ class Context:
|
||||||
_remote_func_type: Optional[str] = None
|
_remote_func_type: Optional[str] = None
|
||||||
|
|
||||||
# only set on the caller side
|
# only set on the caller side
|
||||||
_portal: Optional['Portal'] = None # type: ignore # noqa
|
_portal: Optional[Portal] = None # type: ignore # noqa
|
||||||
|
_stream: Optional[MsgStream] = None
|
||||||
_result: Optional[Any] = False
|
_result: Optional[Any] = False
|
||||||
_error: Optional[BaseException] = None
|
_remote_ctx_error: Optional[BaseException] = None
|
||||||
|
|
||||||
# status flags
|
# status flags
|
||||||
_cancel_called: bool = False
|
_cancel_called: bool = False
|
||||||
|
@ -390,7 +398,7 @@ class Context:
|
||||||
# only set on the callee side
|
# only set on the callee side
|
||||||
_scope_nursery: Optional[trio.Nursery] = None
|
_scope_nursery: Optional[trio.Nursery] = None
|
||||||
|
|
||||||
_backpressure: bool = False
|
_backpressure: bool = True
|
||||||
|
|
||||||
async def send_yield(self, data: Any) -> None:
|
async def send_yield(self, data: Any) -> None:
|
||||||
|
|
||||||
|
@ -435,21 +443,26 @@ class Context:
|
||||||
# (currently) that other portal APIs (``Portal.run()``,
|
# (currently) that other portal APIs (``Portal.run()``,
|
||||||
# ``.run_in_actor()``) do their own error checking at the point
|
# ``.run_in_actor()``) do their own error checking at the point
|
||||||
# of the call and result processing.
|
# of the call and result processing.
|
||||||
log.error(
|
|
||||||
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
|
|
||||||
f'{msg["error"]["tb_str"]}'
|
|
||||||
)
|
|
||||||
error = unpack_error(msg, self.chan)
|
error = unpack_error(msg, self.chan)
|
||||||
if (
|
if (
|
||||||
isinstance(error, ContextCancelled) and
|
isinstance(error, ContextCancelled)
|
||||||
self._cancel_called
|
|
||||||
):
|
):
|
||||||
# this is an expected cancel request response message
|
log.cancel(
|
||||||
# and we don't need to raise it in scope since it will
|
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
|
||||||
# potentially override a real error
|
f'{msg["error"]["tb_str"]}'
|
||||||
return
|
)
|
||||||
|
if self._cancel_called:
|
||||||
|
# this is an expected cancel request response message
|
||||||
|
# and we don't need to raise it in scope since it will
|
||||||
|
# potentially override a real error
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
log.error(
|
||||||
|
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
|
||||||
|
f'{msg["error"]["tb_str"]}'
|
||||||
|
)
|
||||||
|
|
||||||
self._error = error
|
self._remote_ctx_error = error
|
||||||
|
|
||||||
# TODO: tempted to **not** do this by-reraising in a
|
# TODO: tempted to **not** do this by-reraising in a
|
||||||
# nursery and instead cancel a surrounding scope, detect
|
# nursery and instead cancel a surrounding scope, detect
|
||||||
|
@ -457,7 +470,7 @@ class Context:
|
||||||
if self._scope_nursery:
|
if self._scope_nursery:
|
||||||
|
|
||||||
async def raiser():
|
async def raiser():
|
||||||
raise self._error from None
|
raise self._remote_ctx_error from None
|
||||||
|
|
||||||
# from trio.testing import wait_all_tasks_blocked
|
# from trio.testing import wait_all_tasks_blocked
|
||||||
# await wait_all_tasks_blocked()
|
# await wait_all_tasks_blocked()
|
||||||
|
@ -483,6 +496,7 @@ class Context:
|
||||||
log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')
|
log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')
|
||||||
|
|
||||||
self._cancel_called = True
|
self._cancel_called = True
|
||||||
|
ipc_broken: bool = False
|
||||||
|
|
||||||
if side == 'caller':
|
if side == 'caller':
|
||||||
if not self._portal:
|
if not self._portal:
|
||||||
|
@ -500,7 +514,14 @@ class Context:
|
||||||
# NOTE: we're telling the far end actor to cancel a task
|
# NOTE: we're telling the far end actor to cancel a task
|
||||||
# corresponding to *this actor*. The far end local channel
|
# corresponding to *this actor*. The far end local channel
|
||||||
# instance is passed to `Actor._cancel_task()` implicitly.
|
# instance is passed to `Actor._cancel_task()` implicitly.
|
||||||
await self._portal.run_from_ns('self', '_cancel_task', cid=cid)
|
try:
|
||||||
|
await self._portal.run_from_ns(
|
||||||
|
'self',
|
||||||
|
'_cancel_task',
|
||||||
|
cid=cid,
|
||||||
|
)
|
||||||
|
except trio.BrokenResourceError:
|
||||||
|
ipc_broken = True
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
if cs.cancelled_caught:
|
||||||
# XXX: there's no way to know if the remote task was indeed
|
# XXX: there's no way to know if the remote task was indeed
|
||||||
|
@ -516,7 +537,10 @@ class Context:
|
||||||
"Timed out on cancelling remote task "
|
"Timed out on cancelling remote task "
|
||||||
f"{cid} for {self._portal.channel.uid}")
|
f"{cid} for {self._portal.channel.uid}")
|
||||||
|
|
||||||
# callee side remote task
|
elif ipc_broken:
|
||||||
|
log.cancel(
|
||||||
|
"Transport layer was broken before cancel request "
|
||||||
|
f"{cid} for {self._portal.channel.uid}")
|
||||||
else:
|
else:
|
||||||
self._cancel_msg = msg
|
self._cancel_msg = msg
|
||||||
|
|
||||||
|
@ -604,6 +628,7 @@ class Context:
|
||||||
ctx=self,
|
ctx=self,
|
||||||
rx_chan=ctx._recv_chan,
|
rx_chan=ctx._recv_chan,
|
||||||
) as stream:
|
) as stream:
|
||||||
|
self._stream = stream
|
||||||
|
|
||||||
if self._portal:
|
if self._portal:
|
||||||
self._portal._streams.add(stream)
|
self._portal._streams.add(stream)
|
||||||
|
@ -645,25 +670,22 @@ class Context:
|
||||||
|
|
||||||
if not self._recv_chan._closed: # type: ignore
|
if not self._recv_chan._closed: # type: ignore
|
||||||
|
|
||||||
# wait for a final context result consuming
|
def consume(
|
||||||
# and discarding any bi dir stream msgs still
|
msg: dict,
|
||||||
# in transit from the far end.
|
|
||||||
while True:
|
|
||||||
|
|
||||||
msg = await self._recv_chan.receive()
|
) -> Optional[dict]:
|
||||||
try:
|
try:
|
||||||
self._result = msg['return']
|
return msg['return']
|
||||||
break
|
|
||||||
except KeyError as msgerr:
|
except KeyError as msgerr:
|
||||||
|
|
||||||
if 'yield' in msg:
|
if 'yield' in msg:
|
||||||
# far end task is still streaming to us so discard
|
# far end task is still streaming to us so discard
|
||||||
log.warning(f'Discarding stream delivered {msg}')
|
log.warning(f'Discarding stream delivered {msg}')
|
||||||
continue
|
return
|
||||||
|
|
||||||
elif 'stop' in msg:
|
elif 'stop' in msg:
|
||||||
log.debug('Remote stream terminated')
|
log.debug('Remote stream terminated')
|
||||||
continue
|
return
|
||||||
|
|
||||||
# internal error should never get here
|
# internal error should never get here
|
||||||
assert msg.get('cid'), (
|
assert msg.get('cid'), (
|
||||||
|
@ -673,6 +695,25 @@ class Context:
|
||||||
msg, self._portal.channel
|
msg, self._portal.channel
|
||||||
) from msgerr
|
) from msgerr
|
||||||
|
|
||||||
|
# wait for a final context result consuming
|
||||||
|
# and discarding any bi dir stream msgs still
|
||||||
|
# in transit from the far end.
|
||||||
|
if self._stream:
|
||||||
|
async with self._stream.subscribe() as bstream:
|
||||||
|
async for msg in bstream:
|
||||||
|
result = consume(msg)
|
||||||
|
if result:
|
||||||
|
self._result = result
|
||||||
|
break
|
||||||
|
|
||||||
|
if not self._result:
|
||||||
|
while True:
|
||||||
|
msg = await self._recv_chan.receive()
|
||||||
|
result = consume(msg)
|
||||||
|
if result:
|
||||||
|
self._result = result
|
||||||
|
break
|
||||||
|
|
||||||
return self._result
|
return self._result
|
||||||
|
|
||||||
async def started(
|
async def started(
|
||||||
|
|
|
@ -302,7 +302,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
) -> typing.AsyncGenerator[ActorNursery, None]:
|
) -> typing.AsyncGenerator[ActorNursery, None]:
|
||||||
|
|
||||||
# TODO: yay or nay?
|
# TODO: yay or nay?
|
||||||
__tracebackhide__ = True
|
# __tracebackhide__ = True
|
||||||
|
|
||||||
# the collection of errors retreived from spawned sub-actors
|
# the collection of errors retreived from spawned sub-actors
|
||||||
errors: dict[tuple[str, str], BaseException] = {}
|
errors: dict[tuple[str, str], BaseException] = {}
|
||||||
|
|
Loading…
Reference in New Issue