Compare commits

35 Commits

Author SHA1 Message Date
goodboy e5ee2e3de8
Merge pull request #358 from goodboy/switch_to_pdbp
Switch to `pdbp` 🏄🏼
2023-05-15 09:58:58 -04:00
Tyler Goodlet 41aa91c8eb Add news file 2023-05-15 09:35:59 -04:00
Tyler Goodlet 6758e4487c Drop lingering `pdbpp` comment-refs in tests 2023-05-15 09:14:42 -04:00
Tyler Goodlet 1c3893a383 Drop commented `pdbpp` import logic 2023-05-15 09:01:55 -04:00
Tyler Goodlet 73befac9bc Switch to `pdbp` in test reqs 2023-05-15 09:01:27 -04:00
Tyler Goodlet 79622bbeea Restore `breakpoint()` hook after runtime exits
Previously we were leaking our (pdb++) override into the Python runtime
which would always result in a runtime error whenever `breakpoint()` is
called outside our runtime, i.e. after exit of the root actor. This
explicitly restores any previous hook override (detected during startup)
or deletes the hook and restores the environment if none existed prior.

Also adds a new WIP debugging example script to ensure breakpointing
works as normal after runtime close; this will be added to the test
suite.
2023-05-15 00:47:29 -04:00
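For reference, a minimal sketch of the save/restore pattern this commit describes; the real change lands in `tractor/_root.py` (see the `open_root_actor()` hunk further down this page):

```python
import os
import sys

# on runtime startup: stash whatever hook state already exists
builtin_bp_handler = sys.breakpointhook
orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None)
os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace'

try:
    ...  # run the actor runtime
finally:
    # on runtime exit: restore (or clear) the prior hook state so that
    # `breakpoint()` outside the runtime behaves as it did before
    sys.breakpointhook = builtin_bp_handler
    if orig_bp_path is not None:
        os.environ['PYTHONBREAKPOINT'] = orig_bp_path
    else:
        os.environ.pop('PYTHONBREAKPOINT', None)
```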
Tyler Goodlet 95535b2226 Some more 3.10+ optional type sigs 2023-05-15 00:47:29 -04:00
Tyler Goodlet 87c6e09d6b Switch readme links to point @ `pdbp` B) 2023-05-14 22:52:24 -04:00
Tyler Goodlet 9ccd3a74b6 More detailed preface description 2023-05-14 22:38:47 -04:00
Tyler Goodlet ae4ff5dc8d pdbp: adding typing to config settings vars 2023-05-14 22:38:46 -04:00
Tyler Goodlet 705538398f `pdbp`: turn off line truncating by default, fixes terminal resizing stuff 2023-05-14 22:38:16 -04:00
Tyler Goodlet 86aef5238d Hide actor nursery exit frame 2023-05-14 21:24:26 -04:00
Tyler Goodlet cc82447db6 First try: switch debug machinery over to `pdbp` B) 2023-05-14 21:24:26 -04:00
Tyler Goodlet 23cffbd940 Use multiline import for debug mod 2023-05-14 21:24:26 -04:00
Tyler Goodlet 3d202272c4 Change over debugger tests to use `PROMPT` var.. 2023-05-14 21:24:26 -04:00
Tyler Goodlet 63cdb0891f Switch to `pdbp` since no one is maintaining `pdbpp` 2023-05-14 21:24:26 -04:00
goodboy 0f7db27b68
Merge pull request #356 from goodboy/drop_proc_actxmngr
`trio.Process.aclose()`?
2023-05-14 20:59:53 -04:00
Tyler Goodlet c53d62d2f7 Add news file 2023-05-14 20:31:26 -04:00
Tyler Goodlet f667d16d66 Copy the now deprecated `trio.Process.aclose()`
Move it into our `_spawn.do_hard_kill()` since we do indeed rely on
the particular process killing sequence on "soft kill" failure cases.
2023-05-14 19:31:50 -04:00
Tyler Goodlet 24a062341e Just call `trio.Process.aclose()` directly for now? 2023-04-02 14:34:41 -04:00
goodboy e714bec8db
Merge pull request #355 from kehrazy/patch-1
fixed the `Zombie` example having wrong indentation
2023-04-01 12:11:47 -04:00
Igor 009cd6552e
fixed the `Zombie` example having wrong indentation 2023-03-31 17:50:46 +03:00
goodboy 649c5e7504
Merge pull request #343 from goodboy/breceiver_internals
Avoid inf recursion in `BroadcastReceiver.receive()`
2023-01-30 14:01:13 -05:00
Tyler Goodlet 203f95615c Add nooz 2023-01-30 12:42:26 -05:00
Tyler Goodlet efb8bec828 Add a basic no-raise-on lag test 2023-01-30 12:26:07 -05:00
Tyler Goodlet 8637778739 Expose `raise_on_lag: bool` flag through factory 2023-01-30 12:18:23 -05:00
Tyler Goodlet 47166e45f0 Be explicit with passthrough kwargs (there's so few) 2023-01-29 17:31:21 -05:00
Tyler Goodlet 4ce2dcd12b Switch back to raising `Lagged` by default
Makes the broadcast test suite not hang xD, and is our expected default
behaviour. Also removes a ton of commented legacy cruft from before the
refactor to remove the `.receive()` recursion and fixes some typing.

Oh right, and in the case where there's only one subscriber left we log
a warning about it since in theory we could entirely unwind the bcaster
back to the original underlying receiver, though it's not clear that's
sane or works for some use cases (like wanting to have some other
subscriber get added dynamically later).
2023-01-29 15:03:34 -05:00
Tyler Goodlet 80f983818f Ignore monkey patched `.send()` type annot 2023-01-29 15:03:34 -05:00
Tyler Goodlet 6ba29f8d56 Recurse and get the last value when in warn mode 2023-01-29 15:03:34 -05:00
Tyler Goodlet 2707a0e971 Add `._raise_on_lag` flag to disable `Lag` raising 2023-01-29 15:03:34 -05:00
Tyler Goodlet c8efcdd0d3 Drop `ReceiveMsgStream` from test suite 2023-01-29 15:03:34 -05:00
Tyler Goodlet 9f9907271b Merge `ReceiveMsgStream` and `MsgStream`
Since one-way streaming can be accomplished by just *not* sending on one
side (and/or thus wrapping such usage in a more restrictive API), we
just drop the recv-only parent type. The only differing method was
`MsgStream.send()`, now merged in. Further, in usage of `.subscribe()`
we monkey patch the underlying stream's `.send()` onto the delivered
broadcast receiver so that subscriber tasks can two-way stream as though
using the stream directly.

This allows us to more definitively drop `tractor.open_stream_from()` in
the longer run if we so choose as well; note currently this will
potentially create an issue if a caller tries to `.send()` on such a one
way stream.
2023-01-29 15:03:34 -05:00
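A hypothetical usage sketch (not from this diff) of what the patched-on `.send()` gives a subscriber task; the endpoint name and wiring below are illustrative only:

```python
import trio
import tractor


@tractor.context
async def fan_out_and_reply(ctx: tractor.Context) -> None:
    await ctx.started()

    async with (
        ctx.open_stream() as stream,
        trio.open_nursery() as tn,
    ):
        async def worker() -> None:
            # each subscriber task gets a fan-out copy of every received
            # msg but can still reply, since the parent stream's `.send()`
            # is monkey patched onto the delivered broadcast receiver.
            async with stream.subscribe() as bstream:
                async for msg in bstream:
                    await bstream.send(msg)

        tn.start_soon(worker)
        tn.start_soon(worker)
```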
Tyler Goodlet c2367c1c5e Better `trio`-ize `BroadcastReceiver` internals
Driven by a bug found in `piker` where we'd get an infinite recursion
error due to `BroadcastReceiver.receive()` being called when consumer
tasks are awoken but no value is ready to `.receive_nowait()`.

This new rework takes an approach closer to the interface and internals
of `trio.MemoryReceiveChannel` particularly in terms of,

- implementing a `BroadcastReceiver.receive_nowait()` and using it
  within the async `.receive()`.
- failing over to an internal `._receive_from_underlying()` when the
  `_nowait()` call raises `trio.WouldBlock`.
- adding `BroadcastState.statistics()` for debugging and testing
  dropping recursion from `.receive()`.
2023-01-29 15:03:34 -05:00
goodboy a777217674
Merge pull request #346 from goodboy/ipc_failure_while_streaming
IPC failure while streaming
2023-01-29 15:02:54 -05:00
18 changed files with 474 additions and 233 deletions

View File

@ -6,8 +6,14 @@
``tractor`` is a `structured concurrent`_, multi-processing_ runtime
built on trio_.
Fundamentally ``tractor`` gives you parallelism via ``trio``-"*actors*":
our nurseries_ let you spawn new Python processes which each run a ``trio``
Fundamentally, ``tractor`` gives you parallelism via
``trio``-"*actors*": independent Python processes (aka
non-shared-memory threads) which maintain structured
concurrency (SC) *end-to-end* inside a *supervision tree*.
Cross-process (and thus cross-host) SC is accomplished through the
combined use of our "actor nurseries_" and an "SC-transitive IPC
protocol" constructed on top of multiple Pythons each running a ``trio``
scheduled runtime - a call to ``trio.run()``.
We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
@ -23,7 +29,8 @@ Features
- **It's just** a ``trio`` API
- *Infinitely nestable* process trees
- Builtin IPC streaming APIs with task fan-out broadcasting
- A (first ever?) "native" multi-core debugger UX for Python using `pdb++`_
- A "native" multi-core debugger REPL using `pdbp`_ (a fork & fix of
`pdb++`_ thanks to @mdmintz!)
- Support for a swappable, OS specific, process spawning layer
- A modular transport stack, allowing for custom serialization (eg. with
`msgspec`_), communications protocols, and environment specific IPC
@ -118,7 +125,7 @@ Zombie safe: self-destruct a process tree
f"running in pid {os.getpid()}"
)
await trio.sleep_forever()
await trio.sleep_forever()
async def main():
@ -149,7 +156,7 @@ it **is a bug**.
"Native" multi-process debugging
--------------------------------
Using the magic of `pdb++`_ and our internal IPC, we've
Using the magic of `pdbp`_ and our internal IPC, we've
been able to create a native feeling debugging experience for
any (sub-)process in your ``tractor`` tree.
@ -597,6 +604,7 @@ channel`_!
.. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s
.. _trio gitter channel: https://gitter.im/python-trio/general
.. _matrix channel: https://matrix.to/#/!tractor:matrix.org
.. _pdbp: https://github.com/mdmintz/pdbp
.. _pdb++: https://github.com/pdbpp/pdbpp
.. _guest mode: https://trio.readthedocs.io/en/stable/reference-lowlevel.html?highlight=guest%20mode#using-guest-mode-to-run-trio-on-top-of-other-event-loops
.. _messages: https://en.wikipedia.org/wiki/Message_passing
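Not part of the diff, but the parallelism model the updated preface describes boils down to roughly this kind of usage (a sketch assuming the `run_in_actor()`/`Portal.result()` API from the project's docs):

```python
import trio
import tractor


async def cellar_door() -> str:
    return 'the answer'


async def main() -> None:
    async with tractor.open_nursery() as an:
        # spawn a subprocess running its own `trio.run()` and schedule
        # `cellar_door()` as its main task
        portal = await an.run_in_actor(cellar_door)
        print(await portal.result())


if __name__ == '__main__':
    trio.run(main)
```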

View File

@ -0,0 +1,24 @@
import os
import sys

import trio
import tractor


async def main() -> None:

    async with tractor.open_nursery(debug_mode=True) as an:
        assert os.environ['PYTHONBREAKPOINT'] == 'tractor._debug._set_trace'

        # TODO: an assert that verifies the hook has indeed been, hooked
        # XD
        assert sys.breakpointhook is not tractor._debug._set_trace

        breakpoint()

    # TODO: an assert that verifies the hook is unhooked..
    assert sys.breakpointhook
    breakpoint()


if __name__ == '__main__':
    trio.run(main)

View File

@ -0,0 +1,19 @@
Rework our ``.trionics.BroadcastReceiver`` internals to avoid method
recursion and approach a design and interface closer to ``trio``'s
``MemoryReceiveChannel``.
The details of the internal changes include:
- implementing a ``BroadcastReceiver.receive_nowait()`` and using it
within the async ``.receive()`` thus avoiding recursion from
``.receive()``.
- failing over to an internal ``._receive_from_underlying()`` when the
``_nowait()`` call raises ``trio.WouldBlock``
- adding ``BroadcastState.statistics()`` for debugging and testing, both of
internals and by users.
- adding an internal ``BroadcastReceiver._raise_on_lag: bool`` which can be
set to avoid ``Lagged`` raising for possible use cases where a user
wants to opt for a [cheap or nasty
pattern](https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern)
on the particular stream (we use this in ``piker``'s dark clearing
engine to avoid fast feeds breaking during HFT periods).
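A rough usage sketch of the new flag (hypothetical producer/consumer names; the real coverage is the `test_no_raise_on_lag()` test added further down this page):

```python
import trio
from tractor.trionics import broadcast_receiver


async def main() -> None:
    size = 4
    tx, rx = trio.open_memory_channel(size)
    brx = broadcast_receiver(rx, size)

    async def slow() -> None:
        # overruns are only warned about; the task fast-forwards to the
        # newest queued value instead of raising `Lagged`.
        async with brx.subscribe(raise_on_lag=False) as sub:
            async for msg in sub:
                await trio.sleep(0.1)

    async def fast() -> None:
        async with brx.subscribe() as sub:
            async for msg in sub:
                print(f'fast task got: {msg}')

    async with trio.open_nursery() as n:
        n.start_soon(slow)
        n.start_soon(fast)
        for i in range(50):
            await tx.send(i)
            await trio.sleep(0.01)
        await tx.aclose()


trio.run(main)
```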

View File

@ -0,0 +1,7 @@
Drop `trio.Process.aclose()` usage, copy into our spawning code.
The details are laid out in https://github.com/goodboy/tractor/issues/330.
`trio` changed its process running API quite some time ago; this just copies
out the small bit we needed (from the old `.aclose()`) for hard kills
where a soft runtime cancel request fails and our "zombie killer"
implementation kicks in.
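Condensed, the copied-in sequence looks roughly like this (a sketch; the full version with logging and the follow-up zombie-killer logic is in the `_spawn.py` hunk below):

```python
import trio


async def do_hard_kill(
    proc: trio.Process,
    terminate_after: float = 3,
) -> None:
    with trio.move_on_after(terminate_after):
        # close any pipes to the child, shielded so a cancel can't
        # interrupt the cleanup half way through
        with trio.CancelScope(shield=True):
            for pipe in (proc.stdin, proc.stdout, proc.stderr):
                if pipe is not None:
                    await pipe.aclose()
        try:
            # wait for a graceful exit; if the timeout above fires this
            # wait is cancelled and the `finally` block nukes the child
            await proc.wait()
        finally:
            if proc.returncode is None:
                proc.kill()
                with trio.CancelScope(shield=True):
                    await proc.wait()
```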

View File

@ -0,0 +1,15 @@
Switch to using the fork & fix of `pdb++`, `pdbp`:
https://github.com/mdmintz/pdbp
Allows us to sidestep a variety of issues that aren't being addressed in
the upstream project, thanks to the hard work of @mdmintz!
We also include some default settings adjustments as per recent
development on the fork:
- sticky mode is still turned on by default but now activates when
using the `ll` repl command.
- turn off line truncation by default to avoid inter-line gaps when
resizing the terminal during use.
- when using the backtrace cmd either by `w` or `bt`, the config
automatically switches to non-sticky mode.
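In `pdbp` terms the adjustments amount to a config subclass roughly like this (values mirror the `TractorConfig` shown in the `_debug.py` hunk further down this page):

```python
import pdbp


class Config(pdbp.DefaultConfig):
    use_pygments: bool = True
    sticky_by_default: bool = False
    enable_hidden_frames: bool = False
    # avoid inter-line gaps when the terminal is resized
    truncate_long_lines: bool = False


class REPL(pdbp.Pdb):
    # hand the custom config to the debugger class
    DefaultConfig = Config
```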

View File

@ -1,7 +1,7 @@
pytest
pytest-trio
pytest-timeout
pdbpp
pdbp
mypy
trio_typing
pexpect

View File

@ -26,12 +26,12 @@ with open('docs/README.rst', encoding='utf-8') as f:
setup(
name="tractor",
version='0.1.0a6dev0', # alpha zone
description='structured concurrrent "actors"',
description='structured concurrrent `trio`-"actors"',
long_description=readme,
license='AGPLv3',
author='Tyler Goodlet',
maintainer='Tyler Goodlet',
maintainer_email='jgbt@protonmail.com',
maintainer_email='goodboy_foss@protonmail.com',
url='https://github.com/goodboy/tractor',
platforms=['linux', 'windows'],
packages=[
@ -52,16 +52,14 @@ setup(
# tooling
'tricycle',
'trio_typing',
# tooling
'colorlog',
'wrapt',
# serialization
# IPC serialization
'msgspec',
# debug mode REPL
'pdbpp',
'pdbp',
# pip ref docs on these specs:
# https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples
@ -73,10 +71,9 @@ setup(
# https://github.com/pdbpp/fancycompleter/issues/37
'pyreadline3 ; platform_system == "Windows"',
],
tests_require=['pytest'],
python_requires=">=3.9",
python_requires=">=3.10",
keywords=[
'trio',
'async',

View File

@ -14,7 +14,7 @@ def is_win():
return platform.system() == 'Windows'
_registry: dict[str, set[tractor.ReceiveMsgStream]] = {
_registry: dict[str, set[tractor.MsgStream]] = {
'even': set(),
'odd': set(),
}

View File

@ -95,7 +95,7 @@ def spawn(
return _spawn
PROMPT = r"\(Pdb\+\+\)"
PROMPT = r"\(Pdb\+\)"
def expect(
@ -151,18 +151,6 @@ def ctlc(
use_ctlc = request.param
if (
sys.version_info <= (3, 10)
and use_ctlc
):
# on 3.9 it seems the REPL UX
# is highly unreliable and frankly annoying
# to test for. It does work from manual testing
# but i just don't think it's wroth it to try
# and get this working especially since we want to
# be 3.10+ mega-asap.
pytest.skip('Py3.9 and `pdbpp` son no bueno..')
node = request.node
markers = node.own_markers
for mark in markers:
@ -193,13 +181,15 @@ def ctlc(
ids=lambda item: f'{item[0]} -> {item[1]}',
)
def test_root_actor_error(spawn, user_in_out):
"""Demonstrate crash handler entering pdbpp from basic error in root actor.
"""
'''
Demonstrate crash handler entering pdb from basic error in root actor.
'''
user_input, expect_err_str = user_in_out
child = spawn('root_actor_error')
# scan for the pdbpp prompt
# scan for the prompt
expect(child, PROMPT)
before = str(child.before.decode())
@ -230,8 +220,8 @@ def test_root_actor_bp(spawn, user_in_out):
user_input, expect_err_str = user_in_out
child = spawn('root_actor_breakpoint')
# scan for the pdbpp prompt
child.expect(r"\(Pdb\+\+\)")
# scan for the prompt
child.expect(PROMPT)
assert 'Error' not in str(child.before)
@ -272,7 +262,7 @@ def do_ctlc(
if expect_prompt:
before = str(child.before.decode())
time.sleep(delay)
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
time.sleep(delay)
if patt:
@ -291,7 +281,7 @@ def test_root_actor_bp_forever(
# entries
for _ in range(10):
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
if ctlc:
do_ctlc(child)
@ -301,7 +291,7 @@ def test_root_actor_bp_forever(
# do one continue which should trigger a
# new task to lock the tty
child.sendline('continue')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
# seems that if we hit ctrl-c too fast the
# sigint guard machinery might not kick in..
@ -312,10 +302,10 @@ def test_root_actor_bp_forever(
# XXX: this previously caused a bug!
child.sendline('n')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
child.sendline('n')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
# quit out of the loop
child.sendline('q')
@ -338,8 +328,8 @@ def test_subactor_error(
'''
child = spawn('subactor_error')
# scan for the pdbpp prompt
child.expect(r"\(Pdb\+\+\)")
# scan for the prompt
child.expect(PROMPT)
before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error'" in before
@ -359,7 +349,7 @@ def test_subactor_error(
# creating actor
child.sendline('continue')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
before = str(child.before.decode())
# root actor gets debugger engaged
@ -386,8 +376,8 @@ def test_subactor_breakpoint(
child = spawn('subactor_breakpoint')
# scan for the pdbpp prompt
child.expect(r"\(Pdb\+\+\)")
# scan for the prompt
child.expect(PROMPT)
before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -396,7 +386,7 @@ def test_subactor_breakpoint(
# entries
for _ in range(10):
child.sendline('next')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
if ctlc:
do_ctlc(child)
@ -404,7 +394,7 @@ def test_subactor_breakpoint(
# now run some "continues" to show re-entries
for _ in range(5):
child.sendline('continue')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -415,7 +405,7 @@ def test_subactor_breakpoint(
child.sendline('q')
# child process should exit but parent will capture pdb.BdbQuit
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
before = str(child.before.decode())
assert "RemoteActorError: ('breakpoint_forever'" in before
@ -447,8 +437,8 @@ def test_multi_subactors(
'''
child = spawn(r'multi_subactors')
# scan for the pdbpp prompt
child.expect(r"\(Pdb\+\+\)")
# scan for the prompt
child.expect(PROMPT)
before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -460,7 +450,7 @@ def test_multi_subactors(
# entries
for _ in range(10):
child.sendline('next')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
if ctlc:
do_ctlc(child)
@ -469,7 +459,7 @@ def test_multi_subactors(
child.sendline('c')
# first name_error failure
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error'" in before
assert "NameError" in before
@ -481,7 +471,7 @@ def test_multi_subactors(
child.sendline('c')
# 2nd name_error failure
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
# TODO: will we ever get the race where this crash will show up?
# blocklist strat now prevents this crash
@ -495,7 +485,7 @@ def test_multi_subactors(
# breakpoint loop should re-engage
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -511,7 +501,7 @@ def test_multi_subactors(
):
child.sendline('c')
time.sleep(0.1)
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
before = str(child.before.decode())
if ctlc:
@ -530,11 +520,11 @@ def test_multi_subactors(
# now run some "continues" to show re-entries
for _ in range(5):
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
# quit the loop and expect parent to attach
child.sendline('q')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
before = str(child.before.decode())
assert_before(child, [
@ -578,7 +568,7 @@ def test_multi_daemon_subactors(
'''
child = spawn('multi_daemon_subactors')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
# there can be a race for which subactor will acquire
# the root's tty lock first so anticipate either crash
@ -608,7 +598,7 @@ def test_multi_daemon_subactors(
# second entry by `bp_forever`.
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
assert_before(child, [next_msg])
# XXX: hooray the root clobbering the child here was fixed!
@ -630,7 +620,7 @@ def test_multi_daemon_subactors(
# expect another breakpoint actor entry
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
try:
assert_before(child, [bp_forever_msg])
@ -646,7 +636,7 @@ def test_multi_daemon_subactors(
# after 1 or more further bp actor entries.
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
assert_before(child, [name_error_msg])
# wait for final error in root
@ -654,7 +644,7 @@ def test_multi_daemon_subactors(
while True:
try:
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
assert_before(
child,
[bp_forever_msg]
@ -687,8 +677,8 @@ def test_multi_subactors_root_errors(
'''
child = spawn('multi_subactor_root_errors')
# scan for the pdbpp prompt
child.expect(r"\(Pdb\+\+\)")
# scan for the prompt
child.expect(PROMPT)
# at most one subactor should attach before the root is cancelled
before = str(child.before.decode())
@ -703,7 +693,7 @@ def test_multi_subactors_root_errors(
# due to block list strat from #337, this will no longer
# propagate before the root errors and cancels the spawner sub-tree.
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
# only if the blocking condition doesn't kick in fast enough
before = str(child.before.decode())
@ -718,7 +708,7 @@ def test_multi_subactors_root_errors(
do_ctlc(child)
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
# check if the spawner crashed or was blocked from debug
# and if this intermediary attached check the boxed error
@ -735,7 +725,7 @@ def test_multi_subactors_root_errors(
do_ctlc(child)
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
# expect a root actor crash
assert_before(child, [
@ -784,7 +774,7 @@ def test_multi_nested_subactors_error_through_nurseries(
for send_char in itertools.cycle(['c', 'q']):
try:
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
child.sendline(send_char)
time.sleep(0.01)
@ -826,7 +816,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
child = spawn('root_cancelled_but_child_is_in_tty_lock')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
before = str(child.before.decode())
assert "NameError: name 'doggypants' is not defined" in before
@ -841,7 +831,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
for i in range(4):
time.sleep(0.5)
try:
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
except (
EOF,
@ -898,7 +888,7 @@ def test_root_cancels_child_context_during_startup(
'''
child = spawn('fast_error_in_root_after_spawn')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
before = str(child.before.decode())
assert "AssertionError" in before
@ -915,7 +905,7 @@ def test_different_debug_mode_per_actor(
ctlc: bool,
):
child = spawn('per_actor_debug')
child.expect(r"\(Pdb\+\+\)")
child.expect(PROMPT)
# only one actor should enter the debugger
before = str(child.before.decode())

View File

@ -12,7 +12,10 @@ import pytest
import trio
from trio.lowlevel import current_task
import tractor
from tractor.trionics import broadcast_receiver, Lagged
from tractor.trionics import (
broadcast_receiver,
Lagged,
)
@tractor.context
@ -37,7 +40,7 @@ async def echo_sequences(
async def ensure_sequence(
stream: tractor.ReceiveMsgStream,
stream: tractor.MsgStream,
sequence: list,
delay: Optional[float] = None,
@ -211,7 +214,8 @@ def test_faster_task_to_recv_is_cancelled_by_slower(
arb_addr,
start_method,
):
'''Ensure that if a faster task consuming from a stream is cancelled
'''
Ensure that if a faster task consuming from a stream is cancelled
the slower task can continue to receive all expected values.
'''
@ -460,3 +464,51 @@ def test_first_recver_is_cancelled():
assert value == 1
trio.run(main)
def test_no_raise_on_lag():
'''
Run a simple 2-task broadcast where one task is slow but configured
so that it does not raise `Lagged` on overruns using
`raise_on_lag=False` and verify that the task does not raise.
'''
size = 100
tx, rx = trio.open_memory_channel(size)
brx = broadcast_receiver(rx, size)
async def slow():
async with brx.subscribe(
raise_on_lag=False,
) as br:
async for msg in br:
print(f'slow task got: {msg}')
await trio.sleep(0.1)
async def fast():
async with brx.subscribe() as br:
async for msg in br:
print(f'fast task got: {msg}')
async def main():
async with (
tractor.open_root_actor(
# NOTE: so we see the warning msg emitted by the bcaster
# internals when the no raise flag is set.
loglevel='warning',
),
trio.open_nursery() as n,
):
n.start_soon(slow)
n.start_soon(fast)
for i in range(1000):
await tx.send(i)
# simulate user nailing ctl-c after realizing
# there's a lag in the slow task.
await trio.sleep(1)
raise KeyboardInterrupt
with pytest.raises(KeyboardInterrupt):
trio.run(main)

View File

@ -24,7 +24,6 @@ from ._clustering import open_actor_cluster
from ._ipc import Channel
from ._streaming import (
Context,
ReceiveMsgStream,
MsgStream,
stream,
context,
@ -45,7 +44,10 @@ from ._exceptions import (
ModuleNotExposed,
ContextCancelled,
)
from ._debug import breakpoint, post_mortem
from ._debug import (
breakpoint,
post_mortem,
)
from . import msg
from ._root import (
run_daemon,
@ -64,7 +66,6 @@ __all__ = [
'MsgStream',
'BaseExceptionGroup',
'Portal',
'ReceiveMsgStream',
'RemoteActorError',
'breakpoint',
'context',

View File

@ -37,6 +37,7 @@ from typing import (
)
from types import FrameType
import pdbp
import tractor
import trio
from trio_typing import TaskStatus
@ -53,17 +54,6 @@ from ._exceptions import (
)
from ._ipc import Channel
try:
# wtf: only exported when installed in dev mode?
import pdbpp
except ImportError:
# pdbpp is installed in regular mode...it monkey patches stuff
import pdb
xpm = getattr(pdb, 'xpm', None)
assert xpm, "pdbpp is not installed?" # type: ignore
pdbpp = pdb
log = get_logger(__name__)
@ -154,22 +144,26 @@ class Lock:
cls.repl = None
class TractorConfig(pdbpp.DefaultConfig):
class TractorConfig(pdbp.DefaultConfig):
'''
Custom ``pdbpp`` goodness.
Custom ``pdbp`` goodness :surfer:
'''
# use_pygments = True
# sticky_by_default = True
enable_hidden_frames = False
use_pygments: bool = True
sticky_by_default: bool = False
enable_hidden_frames: bool = False
# much thanks @mdmintz for the hot tip!
# fixes line spacing issue when resizing terminal B)
truncate_long_lines: bool = False
class MultiActorPdb(pdbpp.Pdb):
class MultiActorPdb(pdbp.Pdb):
'''
Add teardown hooks to the regular ``pdbpp.Pdb``.
Add teardown hooks to the regular ``pdbp.Pdb``.
'''
# override the pdbpp config with our coolio one
# override the pdbp config with our coolio one
DefaultConfig = TractorConfig
# def preloop(self):
@ -313,7 +307,7 @@ async def lock_tty_for_child(
) -> str:
'''
Lock the TTY in the root process of an actor tree in a new
inter-actor-context-task such that the ``pdbpp`` debugger console
inter-actor-context-task such that the ``pdbp`` debugger console
can be mutex-allocated to the calling sub-actor for REPL control
without interference by other processes / threads.
@ -433,7 +427,7 @@ async def wait_for_parent_stdin_hijack(
def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
pdb = MultiActorPdb()
# signal.signal = pdbpp.hideframe(signal.signal)
# signal.signal = pdbp.hideframe(signal.signal)
Lock.shield_sigint()
@ -583,7 +577,7 @@ async def _breakpoint(
# # frame = sys._getframe()
# # last_f = frame.f_back
# # last_f.f_globals['__tracebackhide__'] = True
# # signal.signal = pdbpp.hideframe(signal.signal)
# # signal.signal = pdbp.hideframe(signal.signal)
def shield_sigint_handler(
@ -743,13 +737,13 @@ def shield_sigint_handler(
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
# XXX: lol, see ``pdbpp`` issue:
# XXX LEGACY: lol, see ``pdbpp`` issue:
# https://github.com/pdbpp/pdbpp/issues/496
def _set_trace(
actor: Optional[tractor.Actor] = None,
pdb: Optional[MultiActorPdb] = None,
actor: tractor.Actor | None = None,
pdb: MultiActorPdb | None = None,
):
__tracebackhide__ = True
actor = actor or tractor.current_actor()
@ -759,7 +753,11 @@ def _set_trace(
if frame:
frame = frame.f_back # type: ignore
if frame and pdb and actor is not None:
if (
frame
and pdb
and actor is not None
):
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n")
# no f!#$&* idea, but when we're in async land
# we need 2x frames up?
@ -768,7 +766,8 @@ def _set_trace(
else:
pdb, undo_sigint = mk_mpdb()
# we entered the global ``breakpoint()`` built-in from sync code?
# we entered the global ``breakpoint()`` built-in from sync
# code?
Lock.local_task_in_debug = 'sync'
pdb.set_trace(frame=frame)
@ -798,7 +797,7 @@ def _post_mortem(
# https://github.com/pdbpp/pdbpp/issues/480
# TODO: help with a 3.10+ major release if/when it arrives.
pdbpp.xpm(Pdb=lambda: pdb)
pdbp.xpm(Pdb=lambda: pdb)
post_mortem = partial(

View File

@ -45,7 +45,10 @@ from ._exceptions import (
NoResult,
ContextCancelled,
)
from ._streaming import Context, ReceiveMsgStream
from ._streaming import (
Context,
MsgStream,
)
log = get_logger(__name__)
@ -101,7 +104,7 @@ class Portal:
# it is expected that ``result()`` will be awaited at some
# point.
self._expect_result: Optional[Context] = None
self._streams: set[ReceiveMsgStream] = set()
self._streams: set[MsgStream] = set()
self.actor = current_actor()
async def _submit_for_result(
@ -316,7 +319,7 @@ class Portal:
async_gen_func: Callable, # typing: ignore
**kwargs,
) -> AsyncGenerator[ReceiveMsgStream, None]:
) -> AsyncGenerator[MsgStream, None]:
if not inspect.isasyncgenfunction(async_gen_func):
if not (
@ -341,7 +344,7 @@ class Portal:
try:
# deliver receive only stream
async with ReceiveMsgStream(
async with MsgStream(
ctx, ctx._recv_chan,
) as rchan:
self._streams.add(rchan)

View File

@ -22,8 +22,9 @@ from contextlib import asynccontextmanager
from functools import partial
import importlib
import logging
import os
import signal
import sys
import os
import typing
import warnings
@ -84,8 +85,10 @@ async def open_root_actor(
'''
# Override the global debugger hook to make it play nice with
# ``trio``, see:
# ``trio``, see much discussion in:
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
builtin_bp_handler = sys.breakpointhook
orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None)
os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace'
# attempt to retreive ``trio``'s sigint handler and stash it
@ -254,6 +257,15 @@ async def open_root_actor(
await actor.cancel()
finally:
_state._current_actor = None
# restore breakpoint hook state
sys.breakpointhook = builtin_bp_handler
if orig_bp_path is not None:
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
else:
# clear env back to having no entry
os.environ.pop('PYTHONBREAKPOINT')
logger.runtime("Root actor terminated")
@ -289,7 +301,7 @@ def run_daemon(
async def _main():
async with open_root_actor(
arbiter_addr=registry_addr,
registry_addr=registry_addr,
name=name,
start_method=start_method,
debug_mode=debug_mode,

View File

@ -23,13 +23,12 @@ import sys
import platform
from typing import (
Any,
Awaitable,
Literal,
Optional,
Callable,
TypeVar,
TYPE_CHECKING,
)
from collections.abc import Awaitable
from exceptiongroup import BaseExceptionGroup
import trio
@ -60,7 +59,7 @@ if TYPE_CHECKING:
log = get_logger('tractor')
# placeholder for an mp start context if so using that backend
_ctx: Optional[mp.context.BaseContext] = None
_ctx: mp.context.BaseContext | None = None
SpawnMethodKey = Literal[
'trio', # supported on all platforms
'mp_spawn',
@ -86,7 +85,7 @@ else:
def try_set_start_method(
key: SpawnMethodKey
) -> Optional[mp.context.BaseContext]:
) -> mp.context.BaseContext | None:
'''
Attempt to set the method for process starting, aka the "actor
spawning backend".
@ -200,16 +199,37 @@ async def cancel_on_completion(
async def do_hard_kill(
proc: trio.Process,
terminate_after: int = 3,
) -> None:
# NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as
# a hard-kill time ultimatum.
log.debug(f"Terminating {proc}")
with trio.move_on_after(terminate_after) as cs:
# NOTE: This ``__aexit__()`` shields internally.
async with proc: # calls ``trio.Process.aclose()``
log.debug(f"Terminating {proc}")
# NOTE: code below was copied verbatim from the now deprecated
# (in 0.20.0) ``trio._subprocess.Process.aclose()``, orig doc
# string:
#
# Close any pipes we have to the process (both input and output)
# and wait for it to exit. If cancelled, kills the process and
# waits for it to finish exiting before propagating the
# cancellation.
with trio.CancelScope(shield=True):
if proc.stdin is not None:
await proc.stdin.aclose()
if proc.stdout is not None:
await proc.stdout.aclose()
if proc.stderr is not None:
await proc.stderr.aclose()
try:
await proc.wait()
finally:
if proc.returncode is None:
proc.kill()
with trio.CancelScope(shield=True):
await proc.wait()
if cs.cancelled_caught:
# XXX: should pretty much never get here unless we have
@ -355,12 +375,11 @@ async def trio_proc(
spawn_cmd.append("--asyncio")
cancelled_during_spawn: bool = False
proc: Optional[trio.Process] = None
proc: trio.Process | None = None
try:
try:
# TODO: needs ``trio_typing`` patch?
proc = await trio.lowlevel.open_process( # type: ignore
spawn_cmd)
proc = await trio.lowlevel.open_process(spawn_cmd)
log.runtime(f"Started {proc}")
@ -444,8 +463,8 @@ async def trio_proc(
nursery.cancel_scope.cancel()
finally:
# The "hard" reap since no actor zombies are allowed!
# XXX: do this **after** cancellation/tearfown to avoid
# XXX NOTE XXX: The "hard" reap since no actor zombies are
# allowed! Do this **after** cancellation/teardown to avoid
# killing the process too early.
if proc:
log.cancel(f'Hard reap sequence starting for {subactor.uid}')

View File

@ -50,12 +50,13 @@ log = get_logger(__name__)
# - use __slots__ on ``Context``?
class ReceiveMsgStream(trio.abc.ReceiveChannel):
class MsgStream(trio.abc.Channel):
'''
A IPC message stream for receiving logically sequenced values over
an inter-actor ``Channel``. This is the type returned to a local
task which entered either ``Portal.open_stream_from()`` or
``Context.open_stream()``.
A bidirectional message stream for receiving logically sequenced
values over an inter-actor IPC ``Channel``.
This is the type returned to a local task which entered either
``Portal.open_stream_from()`` or ``Context.open_stream()``.
Termination rules:
@ -317,15 +318,15 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
async with self._broadcaster.subscribe() as bstream:
assert bstream.key != self._broadcaster.key
assert bstream._recv == self._broadcaster._recv
# NOTE: we patch on a `.send()` to the bcaster so that the
# caller can still conduct 2-way streaming using this
# ``bstream`` handle transparently as though it was the msg
# stream instance.
bstream.send = self.send # type: ignore
yield bstream
class MsgStream(ReceiveMsgStream, trio.abc.Channel):
'''
Bidirectional message stream for use within an inter-actor actor
``Context```.
'''
async def send(
self,
data: Any

View File

@ -302,7 +302,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
) -> typing.AsyncGenerator[ActorNursery, None]:
# TODO: yay or nay?
# __tracebackhide__ = True
__tracebackhide__ = True
# the collection of errors retreived from spawned sub-actors
errors: dict[tuple[str, str], BaseException] = {}

View File

@ -23,7 +23,6 @@ from __future__ import annotations
from abc import abstractmethod
from collections import deque
from contextlib import asynccontextmanager
from dataclasses import dataclass
from functools import partial
from operator import ne
from typing import Optional, Callable, Awaitable, Any, AsyncIterator, Protocol
@ -33,7 +32,10 @@ import trio
from trio._core._run import Task
from trio.abc import ReceiveChannel
from trio.lowlevel import current_task
from msgspec import Struct
from tractor.log import get_logger
log = get_logger(__name__)
# A regular invariant generic type
T = TypeVar("T")
@ -86,8 +88,7 @@ class Lagged(trio.TooSlowError):
'''
@dataclass
class BroadcastState:
class BroadcastState(Struct):
'''
Common state to all receivers of a broadcast.
@ -110,7 +111,35 @@ class BroadcastState:
eoc: bool = False
# If the broadcaster was cancelled, we might as well track it
cancelled: bool = False
cancelled: dict[int, Task] = {}
def statistics(self) -> dict[str, Any]:
'''
Return broadcast receiver group "statistics" like many of
``trio``'s internal task-sync primitives.
'''
key: int | None
ev: trio.Event | None
subs = self.subs
if self.recv_ready is not None:
key, ev = self.recv_ready
else:
key = ev = None
qlens: dict[int, int] = {}
for tid, sz in subs.items():
qlens[tid] = sz if sz != -1 else 0
return {
'open_consumers': len(subs),
'queued_len_by_task': qlens,
'max_buffer_size': self.maxlen,
'tasks_waiting': ev.statistics().tasks_waiting if ev else 0,
'tasks_cancelled': self.cancelled,
'next_value_receiver_id': key,
}
class BroadcastReceiver(ReceiveChannel):
@ -128,23 +157,40 @@ class BroadcastReceiver(ReceiveChannel):
rx_chan: AsyncReceiver,
state: BroadcastState,
receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None,
raise_on_lag: bool = True,
) -> None:
# register the original underlying (clone)
self.key = id(self)
self._state = state
# each consumer has an int count which indicates
# which index contains the next value that the task has not yet
# consumed and thus should read. In the "up-to-date" case the
# consumer task must wait for a new value from the underlying
# receiver and we use ``-1`` as the sentinel for this state.
state.subs[self.key] = -1
# underlying for this receiver
self._rx = rx_chan
self._recv = receive_afunc or rx_chan.receive
self._closed: bool = False
self._raise_on_lag = raise_on_lag
async def receive(self) -> ReceiveType:
def receive_nowait(
self,
_key: int | None = None,
_state: BroadcastState | None = None,
key = self.key
state = self._state
) -> Any:
'''
Sync version of `.receive()` which does all the low level work
of receiving from the underlying/wrapped receive channel.
'''
key = _key or self.key
state = _state or self._state
# TODO: ideally we can make some way to "lock out" the
# underlying receive channel in some way such that if some task
@ -177,128 +223,173 @@ class BroadcastReceiver(ReceiveChannel):
# return this value."
# https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html#lagging
mxln = state.maxlen
lost = seq - mxln
# decrement to the last value and expect
# consumer to either handle the ``Lagged`` and come back
# or bail out on its own (thus un-subscribing)
state.subs[key] = state.maxlen - 1
state.subs[key] = mxln - 1
# this task was overrun by the producer side
task: Task = current_task()
raise Lagged(f'Task {task.name} was overrun')
msg = f'Task `{task.name}` overrun and dropped `{lost}` values'
if self._raise_on_lag:
raise Lagged(msg)
else:
log.warning(msg)
return self.receive_nowait(_key, _state)
state.subs[key] -= 1
return value
# current task already has the latest value **and** is the
# first task to begin waiting for a new one
if state.recv_ready is None:
raise trio.WouldBlock
if self._closed:
raise trio.ClosedResourceError
async def _receive_from_underlying(
self,
key: int,
state: BroadcastState,
event = trio.Event()
state.recv_ready = key, event
) -> ReceiveType:
if self._closed:
raise trio.ClosedResourceError
event = trio.Event()
assert state.recv_ready is None
state.recv_ready = key, event
try:
# if we're cancelled here it should be
# fine to bail without affecting any other consumers
# right?
try:
value = await self._recv()
value = await self._recv()
# items with lower indices are "newer"
# NOTE: ``collections.deque`` implicitly takes care of
# trucating values outside our ``state.maxlen``. In the
# alt-backend-array-case we'll need to make sure this is
# implemented in similar ringer-buffer-ish style.
state.queue.appendleft(value)
# items with lower indices are "newer"
# NOTE: ``collections.deque`` implicitly takes care of
# trucating values outside our ``state.maxlen``. In the
# alt-backend-array-case we'll need to make sure this is
# implemented in similar ringer-buffer-ish style.
state.queue.appendleft(value)
# broadcast new value to all subscribers by increasing
# all sequence numbers that will point in the queue to
# their latest available value.
# broadcast new value to all subscribers by increasing
# all sequence numbers that will point in the queue to
# their latest available value.
# don't decrement the sequence for this task since we
# already retreived the last value
# don't decrement the sequence for this task since we
# already retreived the last value
# XXX: which of these impls is fastest?
# XXX: which of these impls is fastest?
# subs = state.subs.copy()
# subs.pop(key)
# subs = state.subs.copy()
# subs.pop(key)
for sub_key in filter(
# lambda k: k != key, state.subs,
partial(ne, key), state.subs,
):
state.subs[sub_key] += 1
# NOTE: this should ONLY be set if the above task was *NOT*
# cancelled on the `._recv()` call.
event.set()
return value
except trio.EndOfChannel:
# if any one consumer gets an EOC from the underlying
# receiver we need to unblock and send that signal to
# all other consumers.
self._state.eoc = True
if event.statistics().tasks_waiting:
event.set()
raise
except (
trio.Cancelled,
for sub_key in filter(
# lambda k: k != key, state.subs,
partial(ne, key), state.subs,
):
# handle cancelled specially otherwise sibling
# consumers will be awoken with a sequence of -1
# and will potentially try to rewait the underlying
# receiver instead of just cancelling immediately.
self._state.cancelled = True
if event.statistics().tasks_waiting:
event.set()
raise
state.subs[sub_key] += 1
finally:
# NOTE: this should ONLY be set if the above task was *NOT*
# cancelled on the `._recv()` call.
event.set()
return value
# Reset receiver waiter task event for next blocking condition.
# this MUST be reset even if the above ``.recv()`` call
# was cancelled to avoid the next consumer from blocking on
# an event that won't be set!
state.recv_ready = None
except trio.EndOfChannel:
# if any one consumer gets an EOC from the underlying
# receiver we need to unblock and send that signal to
# all other consumers.
self._state.eoc = True
if event.statistics().tasks_waiting:
event.set()
raise
except (
trio.Cancelled,
):
# handle cancelled specially otherwise sibling
# consumers will be awoken with a sequence of -1
# and will potentially try to rewait the underlying
# receiver instead of just cancelling immediately.
self._state.cancelled[key] = current_task()
if event.statistics().tasks_waiting:
event.set()
raise
finally:
# Reset receiver waiter task event for next blocking condition.
# this MUST be reset even if the above ``.recv()`` call
# was cancelled to avoid the next consumer from blocking on
# an event that won't be set!
state.recv_ready = None
async def receive(self) -> ReceiveType:
key = self.key
state = self._state
try:
return self.receive_nowait(
_key=key,
_state=state,
)
except trio.WouldBlock:
pass
# current task already has the latest value **and** is the
# first task to begin waiting for a new one so we begin blocking
# until rescheduled with the a new value from the underlying.
if state.recv_ready is None:
return await self._receive_from_underlying(key, state)
# This task is all caught up and ready to receive the latest
# value, so queue sched it on the internal event.
# value, so queue/schedule it to be woken on the next internal
# event.
else:
seq = state.subs[key]
assert seq == -1 # sanity
_, ev = state.recv_ready
await ev.wait()
while state.recv_ready is not None:
# seq = state.subs[key]
# assert seq == -1 # sanity
_, ev = state.recv_ready
await ev.wait()
try:
return self.receive_nowait(
_key=key,
_state=state,
)
except trio.WouldBlock:
if self._closed:
raise trio.ClosedResourceError
# NOTE: if we ever would like the behaviour where if the
# first task to recv on the underlying is cancelled but it
# still DOES trigger the ``.recv_ready``, event we'll likely need
# this logic:
subs = state.subs
if (
len(subs) == 1
and key in subs
# or cancelled
):
# XXX: we are the last and only user of this BR so
# likely it makes sense to unwind back to the
# underlying?
# import tractor
# await tractor.breakpoint()
log.warning(
f'Only one sub left for {self}?\n'
'We can probably unwind from breceiver?'
)
if seq > -1:
# stuff from above..
seq = state.subs[key]
# XXX: In the case where the first task to allocate the
# ``.recv_ready`` event is cancelled we will be woken
# with a non-incremented sequence number (the ``-1``
# sentinel) and thus will read the oldest value if we
# use that. Instead we need to detect if we have not
# been incremented and then receive again.
# return await self.receive()
value = state.queue[seq]
state.subs[key] -= 1
return value
elif seq == -1:
# XXX: In the case where the first task to allocate the
# ``.recv_ready`` event is cancelled we will be woken with
# a non-incremented sequence number and thus will read the
# oldest value if we use that. Instead we need to detect if
# we have not been incremented and then receive again.
return await self.receive()
else:
raise ValueError(f'Invalid sequence {seq}!?')
return await self._receive_from_underlying(key, state)
@asynccontextmanager
async def subscribe(
self,
raise_on_lag: bool = True,
) -> AsyncIterator[BroadcastReceiver]:
'''
Subscribe for values from this broadcast receiver.
@ -316,6 +407,7 @@ class BroadcastReceiver(ReceiveChannel):
rx_chan=self._rx,
state=state,
receive_afunc=self._recv,
raise_on_lag=raise_on_lag,
)
# assert clone in state.subs
assert br.key in state.subs
@ -352,7 +444,8 @@ def broadcast_receiver(
recv_chan: AsyncReceiver,
max_buffer_size: int,
**kwargs,
receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None,
raise_on_lag: bool = True,
) -> BroadcastReceiver:
@ -363,5 +456,6 @@ def broadcast_receiver(
maxlen=max_buffer_size,
subs={},
),
**kwargs,
receive_afunc=receive_afunc,
raise_on_lag=raise_on_lag,
)