Compare commits

..

No commits in common. "master" and "eg_backup" have entirely different histories.

35 changed files with 443 additions and 1247 deletions

View File

@ -6,14 +6,8 @@
``tractor`` is a `structured concurrent`_, multi-processing_ runtime
built on trio_.
Fundamentally, ``tractor`` gives you parallelism via
``trio``-"*actors*": independent Python processes (aka
non-shared-memory threads) which maintain structured
concurrency (SC) *end-to-end* inside a *supervision tree*.
Cross-process (and thus cross-host) SC is accomplished through the
combined use of our "actor nurseries_" and an "SC-transitive IPC
protocol" constructed on top of multiple Pythons each running a ``trio``
Fundamentally ``tractor`` gives you parallelism via ``trio``-"*actors*":
our nurseries_ let you spawn new Python processes which each run a ``trio``
scheduled runtime - a call to ``trio.run()``.
We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
@ -29,8 +23,7 @@ Features
- **It's just** a ``trio`` API
- *Infinitely nesteable* process trees
- Builtin IPC streaming APIs with task fan-out broadcasting
- A "native" multi-core debugger REPL using `pdbp`_ (a fork & fix of
`pdb++`_ thanks to @mdmintz!)
- A (first ever?) "native" multi-core debugger UX for Python using `pdb++`_
- Support for a swappable, OS specific, process spawning layer
- A modular transport stack, allowing for custom serialization (eg. with
`msgspec`_), communications protocols, and environment specific IPC
@ -156,7 +149,7 @@ it **is a bug**.
"Native" multi-process debugging
--------------------------------
Using the magic of `pdbp`_ and our internal IPC, we've
Using the magic of `pdb++`_ and our internal IPC, we've
been able to create a native feeling debugging experience for
any (sub-)process in your ``tractor`` tree.
@ -604,7 +597,6 @@ channel`_!
.. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s
.. _trio gitter channel: https://gitter.im/python-trio/general
.. _matrix channel: https://matrix.to/#/!tractor:matrix.org
.. _pdbp: https://github.com/mdmintz/pdbp
.. _pdb++: https://github.com/pdbpp/pdbpp
.. _guest mode: https://trio.readthedocs.io/en/stable/reference-lowlevel.html?highlight=guest%20mode#using-guest-mode-to-run-trio-on-top-of-other-event-loops
.. _messages: https://en.wikipedia.org/wiki/Message_passing

View File

@ -1,151 +0,0 @@
'''
Complex edge case where during real-time streaming the IPC tranport
channels are wiped out (purposely in this example though it could have
been an outage) and we want to ensure that despite being in debug mode
(or not) the user can sent SIGINT once they notice the hang and the
actor tree will eventually be cancelled without leaving any zombies.
'''
import trio
from tractor import (
open_nursery,
context,
Context,
MsgStream,
)
async def break_channel_silently_then_error(
stream: MsgStream,
):
async for msg in stream:
await stream.send(msg)
# XXX: close the channel right after an error is raised
# purposely breaking the IPC transport to make sure the parent
# doesn't get stuck in debug or hang on the connection join.
# this more or less simulates an infinite msg-receive hang on
# the other end.
await stream._ctx.chan.send(None)
assert 0
async def close_stream_and_error(
stream: MsgStream,
):
async for msg in stream:
await stream.send(msg)
# wipe out channel right before raising
await stream._ctx.chan.send(None)
await stream.aclose()
assert 0
@context
async def recv_and_spawn_net_killers(
ctx: Context,
break_ipc_after: bool | int = False,
) -> None:
'''
Receive stream msgs and spawn some IPC killers mid-stream.
'''
await ctx.started()
async with (
ctx.open_stream() as stream,
trio.open_nursery() as n,
):
async for i in stream:
print(f'child echoing {i}')
await stream.send(i)
if (
break_ipc_after
and i > break_ipc_after
):
'#################################\n'
'Simulating child-side IPC BREAK!\n'
'#################################'
n.start_soon(break_channel_silently_then_error, stream)
n.start_soon(close_stream_and_error, stream)
async def main(
debug_mode: bool = False,
start_method: str = 'trio',
# by default we break the parent IPC first (if configured to break
# at all), but this can be changed so the child does first (even if
# both are set to break).
break_parent_ipc_after: int | bool = False,
break_child_ipc_after: int | bool = False,
) -> None:
async with (
open_nursery(
start_method=start_method,
# NOTE: even debugger is used we shouldn't get
# a hang since it never engages due to broken IPC
debug_mode=debug_mode,
loglevel='warning',
) as an,
):
portal = await an.start_actor(
'chitty_hijo',
enable_modules=[__name__],
)
async with portal.open_context(
recv_and_spawn_net_killers,
break_ipc_after=break_child_ipc_after,
) as (ctx, sent):
async with ctx.open_stream() as stream:
for i in range(1000):
if (
break_parent_ipc_after
and i > break_parent_ipc_after
):
print(
'#################################\n'
'Simulating parent-side IPC BREAK!\n'
'#################################'
)
await stream._ctx.chan.send(None)
# it actually breaks right here in the
# mp_spawn/forkserver backends and thus the zombie
# reaper never even kicks in?
print(f'parent sending {i}')
await stream.send(i)
with trio.move_on_after(2) as cs:
# NOTE: in the parent side IPC failure case this
# will raise an ``EndOfChannel`` after the child
# is killed and sends a stop msg back to it's
# caller/this-parent.
rx = await stream.receive()
print(f"I'm a happy user and echoed to me is {rx}")
if cs.cancelled_caught:
# pretend to be a user seeing no streaming action
# thinking it's a hang, and then hitting ctl-c..
print("YOO i'm a user anddd thingz hangin..")
print(
"YOO i'm mad send side dun but thingz hangin..\n"
'MASHING CTlR-C Ctl-c..'
)
raise KeyboardInterrupt
if __name__ == '__main__':
trio.run(main)

View File

@ -1,24 +0,0 @@
import os
import sys
import trio
import tractor
async def main() -> None:
async with tractor.open_nursery(debug_mode=True) as an:
assert os.environ['PYTHONBREAKPOINT'] == 'tractor._debug._set_trace'
# TODO: an assert that verifies the hook has indeed been, hooked
# XD
assert sys.breakpointhook is not tractor._debug._set_trace
breakpoint()
# TODO: an assert that verifies the hook is unhooked..
assert sys.breakpointhook
breakpoint()
if __name__ == '__main__':
trio.run(main)

View File

@ -1,19 +0,0 @@
Rework our ``.trionics.BroadcastReceiver`` internals to avoid method
recursion and approach a design and interface closer to ``trio``'s
``MemoryReceiveChannel``.
The details of the internal changes include:
- implementing a ``BroadcastReceiver.receive_nowait()`` and using it
within the async ``.receive()`` thus avoiding recursion from
``.receive()``.
- failing over to an internal ``._receive_from_underlying()`` when the
``_nowait()`` call raises ``trio.WouldBlock``
- adding ``BroadcastState.statistics()`` for debugging and testing both
internals and by users.
- add an internal ``BroadcastReceiver._raise_on_lag: bool`` which can be
set to avoid ``Lagged`` raising for possible use cases where a user
wants to choose between a [cheap or nasty
pattern](https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern)
the the particular stream (we use this in ``piker``'s dark clearing
engine to avoid fast feeds breaking during HFT periods).

View File

@ -1,11 +0,0 @@
Always ``list``-cast the ``mngrs`` input to
``.trionics.gather_contexts()`` and ensure its size otherwise raise
a ``ValueError``.
Turns out that trying to pass an inline-style generator comprehension
doesn't seem to work inside the ``async with`` expression? Further, in
such a case we can get a hang waiting on the all-entered event
completion when the internal mngrs iteration is a noop. Instead we
always greedily check a size and error on empty input; the lazy
iteration of a generator input is not beneficial anyway since we're
entering all manager instances in concurrent tasks.

View File

@ -1,15 +0,0 @@
Fixes to ensure IPC (channel) breakage doesn't result in hung actor
trees; the zombie reaping and general supervision machinery will always
clean up and terminate.
This includes not only the (mostly minor) fixes to solve these cases but
also a new extensive test suite in `test_advanced_faults.py` with an
accompanying highly configurable example module-script in
`examples/advanced_faults/ipc_failure_during_stream.py`. Tests ensure we
never get hang or zombies despite operating in debug mode and attempt to
simulate all possible IPC transport failure cases for a local-host actor
tree.
Further we simplify `Context.open_stream.__aexit__()` to just call
`MsgStream.aclose()` directly more or less avoiding a pure duplicate
code path.

View File

@ -1,10 +0,0 @@
Always redraw the `pdbpp` prompt on `SIGINT` during REPL use.
There was recent changes todo with Python 3.10 that required us to pin
to a specific commit in `pdbpp` which have recently been fixed minus
this last issue with `SIGINT` shielding: not clobbering or not
showing the `(Pdb++)` prompt on ctlr-c by the user. This repairs all
that by firstly removing the standard KBI intercepting of the std lib's
`pdb.Pdb._cmdloop()` as well as ensuring that only the actor with REPL
control ever reports `SIGINT` handler log msgs and prompt redraws. With
this we move back to using pypi `pdbpp` release.

View File

@ -1,7 +0,0 @@
Drop `trio.Process.aclose()` usage, copy into our spawning code.
The details are laid out in https://github.com/goodboy/tractor/issues/330.
`trio` changed is process running quite some time ago, this just copies
out the small bit we needed (from the old `.aclose()`) for hard kills
where a soft runtime cancel request fails and our "zombie killer"
implementation kicks in.

View File

@ -1,15 +0,0 @@
Switch to using the fork & fix of `pdb++`, `pdbp`:
https://github.com/mdmintz/pdbp
Allows us to sidestep a variety of issues that aren't being maintained
in the upstream project thanks to the hard work of @mdmintz!
We also include some default settings adjustments as per recent
development on the fork:
- sticky mode is still turned on by default but now activates when
a using the `ll` repl command.
- turn off line truncation by default to avoid inter-line gaps when
resizing the terimnal during use.
- when using the backtrace cmd either by `w` or `bt`, the config
automatically switches to non-sticky mode.

View File

@ -1,7 +1,7 @@
pytest
pytest-trio
pytest-timeout
pdbp
pdbpp
mypy
trio_typing
pexpect

View File

@ -26,12 +26,12 @@ with open('docs/README.rst', encoding='utf-8') as f:
setup(
name="tractor",
version='0.1.0a6dev0', # alpha zone
description='structured concurrrent `trio`-"actors"',
description='structured concurrrent "actors"',
long_description=readme,
license='AGPLv3',
author='Tyler Goodlet',
maintainer='Tyler Goodlet',
maintainer_email='goodboy_foss@protonmail.com',
maintainer_email='jgbt@protonmail.com',
url='https://github.com/goodboy/tractor',
platforms=['linux', 'windows'],
packages=[
@ -52,15 +52,14 @@ setup(
# tooling
'tricycle',
'trio_typing',
# tooling
'colorlog',
'wrapt',
# IPC serialization
# serialization
'msgspec',
# debug mode REPL
'pdbp',
# pip ref docs on these specs:
# https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples
# and pep:
@ -71,9 +70,14 @@ setup(
# https://github.com/pdbpp/fancycompleter/issues/37
'pyreadline3 ; platform_system == "Windows"',
# 3.10 has an outstanding unreleased issue and `pdbpp` itself
# pins to patched forks of its own dependencies as well..and
# we need a specific patch on master atm.
'pdbpp @ git+https://github.com/pdbpp/pdbpp@76c4be5#egg=pdbpp ; python_version > "3.9"', # noqa: E501
],
tests_require=['pytest'],
python_requires=">=3.10",
python_requires=">=3.9",
keywords=[
'trio',
'async',

View File

@ -7,7 +7,6 @@ import os
import random
import signal
import platform
import pathlib
import time
import inspect
from functools import partial, wraps
@ -114,21 +113,14 @@ no_windows = pytest.mark.skipif(
)
def repodir() -> pathlib.Path:
'''
Return the abspath to the repo directory.
'''
# 2 parents up to step up through tests/<repo_dir>
return pathlib.Path(__file__).parent.parent.absolute()
def examples_dir() -> pathlib.Path:
'''
Return the abspath to the examples directory as `pathlib.Path`.
'''
return repodir() / 'examples'
def repodir():
"""Return the abspath to the repo directory.
"""
dirname = os.path.dirname
dirpath = os.path.abspath(
dirname(dirname(os.path.realpath(__file__)))
)
return dirpath
def pytest_addoption(parser):
@ -159,7 +151,7 @@ def loglevel(request):
@pytest.fixture(scope='session')
def spawn_backend(request) -> str:
def spawn_backend(request):
return request.config.option.spawn_backend
@ -213,22 +205,16 @@ def sig_prog(proc, sig):
@pytest.fixture
def daemon(
loglevel: str,
testdir,
arb_addr: tuple[str, int],
):
'''
Run a daemon actor as a "remote arbiter".
'''
def daemon(loglevel, testdir, arb_addr):
"""Run a daemon actor as a "remote arbiter".
"""
if loglevel in ('trace', 'debug'):
# too much logging will lock up the subproc (smh)
loglevel = 'info'
cmdargs = [
sys.executable, '-c',
"import tractor; tractor.run_daemon([], registry_addr={}, loglevel={})"
"import tractor; tractor.run_daemon([], arbiter_addr={}, loglevel={})"
.format(
arb_addr,
"'{}'".format(loglevel) if loglevel else None)

View File

@ -1,193 +0,0 @@
'''
Sketchy network blackoutz, ugly byzantine gens, puedes eschuchar la
cancelacion?..
'''
from functools import partial
import pytest
from _pytest.pathlib import import_path
import trio
import tractor
from conftest import (
examples_dir,
)
@pytest.mark.parametrize(
'debug_mode',
[False, True],
ids=['no_debug_mode', 'debug_mode'],
)
@pytest.mark.parametrize(
'ipc_break',
[
# no breaks
{
'break_parent_ipc_after': False,
'break_child_ipc_after': False,
},
# only parent breaks
{
'break_parent_ipc_after': 500,
'break_child_ipc_after': False,
},
# only child breaks
{
'break_parent_ipc_after': False,
'break_child_ipc_after': 500,
},
# both: break parent first
{
'break_parent_ipc_after': 500,
'break_child_ipc_after': 800,
},
# both: break child first
{
'break_parent_ipc_after': 800,
'break_child_ipc_after': 500,
},
],
ids=[
'no_break',
'break_parent',
'break_child',
'break_both_parent_first',
'break_both_child_first',
],
)
def test_ipc_channel_break_during_stream(
debug_mode: bool,
spawn_backend: str,
ipc_break: dict | None,
):
'''
Ensure we can have an IPC channel break its connection during
streaming and it's still possible for the (simulated) user to kill
the actor tree using SIGINT.
We also verify the type of connection error expected in the parent
depending on which side if the IPC breaks first.
'''
if spawn_backend != 'trio':
if debug_mode:
pytest.skip('`debug_mode` only supported on `trio` spawner')
# non-`trio` spawners should never hit the hang condition that
# requires the user to do ctl-c to cancel the actor tree.
expect_final_exc = trio.ClosedResourceError
mod = import_path(
examples_dir() / 'advanced_faults' / 'ipc_failure_during_stream.py',
root=examples_dir(),
)
expect_final_exc = KeyboardInterrupt
# when ONLY the child breaks we expect the parent to get a closed
# resource error on the next `MsgStream.receive()` and then fail out
# and cancel the child from there.
if (
# only child breaks
(
ipc_break['break_child_ipc_after']
and ipc_break['break_parent_ipc_after'] is False
)
# both break but, parent breaks first
or (
ipc_break['break_child_ipc_after'] is not False
and (
ipc_break['break_parent_ipc_after']
> ipc_break['break_child_ipc_after']
)
)
):
expect_final_exc = trio.ClosedResourceError
# when the parent IPC side dies (even if the child's does as well
# but the child fails BEFORE the parent) we expect the channel to be
# sent a stop msg from the child at some point which will signal the
# parent that the stream has been terminated.
# NOTE: when the parent breaks "after" the child you get this same
# case as well, the child breaks the IPC channel with a stop msg
# before any closure takes place.
elif (
# only parent breaks
(
ipc_break['break_parent_ipc_after']
and ipc_break['break_child_ipc_after'] is False
)
# both break but, child breaks first
or (
ipc_break['break_parent_ipc_after'] is not False
and (
ipc_break['break_child_ipc_after']
> ipc_break['break_parent_ipc_after']
)
)
):
expect_final_exc = trio.EndOfChannel
with pytest.raises(expect_final_exc):
trio.run(
partial(
mod.main,
debug_mode=debug_mode,
start_method=spawn_backend,
**ipc_break,
)
)
@tractor.context
async def break_ipc_after_started(
ctx: tractor.Context,
) -> None:
await ctx.started()
async with ctx.open_stream() as stream:
await stream.aclose()
await trio.sleep(0.2)
await ctx.chan.send(None)
print('child broke IPC and terminating')
def test_stream_closed_right_after_ipc_break_and_zombie_lord_engages():
'''
Verify that is a subactor's IPC goes down just after bringing up a stream
the parent can trigger a SIGINT and the child will be reaped out-of-IPC by
the localhost process supervision machinery: aka "zombie lord".
'''
async def main():
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'ipc_breaker',
enable_modules=[__name__],
)
with trio.move_on_after(1):
async with (
portal.open_context(
break_ipc_after_started
) as (ctx, sent),
):
async with ctx.open_stream():
await trio.sleep(0.5)
print('parent waiting on context')
print('parent exited context')
raise KeyboardInterrupt
with pytest.raises(KeyboardInterrupt):
trio.run(main)

View File

@ -14,7 +14,7 @@ def is_win():
return platform.system() == 'Windows'
_registry: dict[str, set[tractor.MsgStream]] = {
_registry: dict[str, set[tractor.ReceiveMsgStream]] = {
'even': set(),
'odd': set(),
}

View File

@ -1,6 +1,5 @@
import itertools
import pytest
import trio
import tractor
from tractor import open_actor_cluster
@ -12,72 +11,26 @@ from conftest import tractor_test
MESSAGE = 'tractoring at full speed'
def test_empty_mngrs_input_raises() -> None:
async def main():
with trio.fail_after(1):
async with (
open_actor_cluster(
modules=[__name__],
# NOTE: ensure we can passthrough runtime opts
loglevel='info',
# debug_mode=True,
) as portals,
gather_contexts(
# NOTE: it's the use of inline-generator syntax
# here that causes the empty input.
mngrs=(
p.open_context(worker) for p in portals.values()
),
),
):
assert 0
with pytest.raises(ValueError):
trio.run(main)
@tractor.context
async def worker(
ctx: tractor.Context,
) -> None:
async def worker(ctx: tractor.Context) -> None:
await ctx.started()
async with ctx.open_stream(
backpressure=True,
) as stream:
# TODO: this with the below assert causes a hang bug?
# with trio.move_on_after(1):
async with ctx.open_stream(backpressure=True) as stream:
async for msg in stream:
# do something with msg
print(msg)
assert msg == MESSAGE
# TODO: does this ever cause a hang
# assert 0
@tractor_test
async def test_streaming_to_actor_cluster() -> None:
async with (
open_actor_cluster(modules=[__name__]) as portals,
gather_contexts(
mngrs=[p.open_context(worker) for p in portals.values()],
) as contexts,
gather_contexts(
mngrs=[ctx[0].open_stream() for ctx in contexts],
) as streams,
):
with trio.move_on_after(1):
for stream in itertools.cycle(streams):

View File

@ -14,7 +14,6 @@ import itertools
from os import path
from typing import Optional
import platform
import pathlib
import sys
import time
@ -25,10 +24,7 @@ from pexpect.exceptions import (
EOF,
)
from conftest import (
examples_dir,
_ci_env,
)
from conftest import repodir, _ci_env
# TODO: The next great debugger audit could be done by you!
# - recurrent entry to breakpoint() from single actor *after* and an
@ -47,13 +43,19 @@ if platform.system() == 'Windows':
)
def mk_cmd(ex_name: str) -> str:
'''
Generate a command suitable to pass to ``pexpect.spawn()``.
def examples_dir():
"""Return the abspath to the examples directory.
"""
return path.join(repodir(), 'examples', 'debugging/')
'''
script_path: pathlib.Path = examples_dir() / 'debugging' / f'{ex_name}.py'
return ' '.join(['python', str(script_path)])
def mk_cmd(ex_name: str) -> str:
"""Generate a command suitable to pass to ``pexpect.spawn()``.
"""
return ' '.join(
['python',
path.join(examples_dir(), f'{ex_name}.py')]
)
# TODO: was trying to this xfail style but some weird bug i see in CI
@ -95,7 +97,7 @@ def spawn(
return _spawn
PROMPT = r"\(Pdb\+\)"
PROMPT = r"\(Pdb\+\+\)"
def expect(
@ -151,14 +153,27 @@ def ctlc(
use_ctlc = request.param
if (
sys.version_info <= (3, 10)
and use_ctlc
):
# on 3.9 it seems the REPL UX
# is highly unreliable and frankly annoying
# to test for. It does work from manual testing
# but i just don't think it's wroth it to try
# and get this working especially since we want to
# be 3.10+ mega-asap.
pytest.skip('Py3.9 and `pdbpp` son no bueno..')
if ci_env:
node = request.node
markers = node.own_markers
for mark in markers:
if mark.name == 'has_nested_actors':
pytest.skip(
f'Test {node} has nested actors and fails with Ctrl-C.\n'
f'The test can sometimes run fine locally but until'
' we solve' 'this issue this CI test will be xfail:\n'
f'Test for {node} uses nested actors and fails in CI\n'
f'The test seems to run fine locally but until we solve'
'this issue this CI test will be xfail:\n'
'https://github.com/goodboy/tractor/issues/320'
)
@ -181,15 +196,13 @@ def ctlc(
ids=lambda item: f'{item[0]} -> {item[1]}',
)
def test_root_actor_error(spawn, user_in_out):
'''
Demonstrate crash handler entering pdb from basic error in root actor.
'''
"""Demonstrate crash handler entering pdbpp from basic error in root actor.
"""
user_input, expect_err_str = user_in_out
child = spawn('root_actor_error')
# scan for the prompt
# scan for the pdbpp prompt
expect(child, PROMPT)
before = str(child.before.decode())
@ -220,8 +233,8 @@ def test_root_actor_bp(spawn, user_in_out):
user_input, expect_err_str = user_in_out
child = spawn('root_actor_breakpoint')
# scan for the prompt
child.expect(PROMPT)
# scan for the pdbpp prompt
child.expect(r"\(Pdb\+\+\)")
assert 'Error' not in str(child.before)
@ -262,7 +275,7 @@ def do_ctlc(
if expect_prompt:
before = str(child.before.decode())
time.sleep(delay)
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
time.sleep(delay)
if patt:
@ -281,7 +294,7 @@ def test_root_actor_bp_forever(
# entries
for _ in range(10):
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
if ctlc:
do_ctlc(child)
@ -291,7 +304,7 @@ def test_root_actor_bp_forever(
# do one continue which should trigger a
# new task to lock the tty
child.sendline('continue')
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
# seems that if we hit ctrl-c too fast the
# sigint guard machinery might not kick in..
@ -302,10 +315,10 @@ def test_root_actor_bp_forever(
# XXX: this previously caused a bug!
child.sendline('n')
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
child.sendline('n')
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
# quit out of the loop
child.sendline('q')
@ -328,8 +341,8 @@ def test_subactor_error(
'''
child = spawn('subactor_error')
# scan for the prompt
child.expect(PROMPT)
# scan for the pdbpp prompt
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error'" in before
@ -349,7 +362,7 @@ def test_subactor_error(
# creating actor
child.sendline('continue')
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
# root actor gets debugger engaged
@ -376,8 +389,8 @@ def test_subactor_breakpoint(
child = spawn('subactor_breakpoint')
# scan for the prompt
child.expect(PROMPT)
# scan for the pdbpp prompt
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -386,7 +399,7 @@ def test_subactor_breakpoint(
# entries
for _ in range(10):
child.sendline('next')
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
if ctlc:
do_ctlc(child)
@ -394,7 +407,7 @@ def test_subactor_breakpoint(
# now run some "continues" to show re-entries
for _ in range(5):
child.sendline('continue')
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -405,7 +418,7 @@ def test_subactor_breakpoint(
child.sendline('q')
# child process should exit but parent will capture pdb.BdbQuit
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "RemoteActorError: ('breakpoint_forever'" in before
@ -437,8 +450,8 @@ def test_multi_subactors(
'''
child = spawn(r'multi_subactors')
# scan for the prompt
child.expect(PROMPT)
# scan for the pdbpp prompt
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -450,7 +463,7 @@ def test_multi_subactors(
# entries
for _ in range(10):
child.sendline('next')
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
if ctlc:
do_ctlc(child)
@ -459,7 +472,7 @@ def test_multi_subactors(
child.sendline('c')
# first name_error failure
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error'" in before
assert "NameError" in before
@ -471,7 +484,7 @@ def test_multi_subactors(
child.sendline('c')
# 2nd name_error failure
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
# TODO: will we ever get the race where this crash will show up?
# blocklist strat now prevents this crash
@ -485,7 +498,7 @@ def test_multi_subactors(
# breakpoint loop should re-engage
child.sendline('c')
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -501,7 +514,7 @@ def test_multi_subactors(
):
child.sendline('c')
time.sleep(0.1)
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
if ctlc:
@ -520,11 +533,11 @@ def test_multi_subactors(
# now run some "continues" to show re-entries
for _ in range(5):
child.sendline('c')
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
# quit the loop and expect parent to attach
child.sendline('q')
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert_before(child, [
@ -568,7 +581,7 @@ def test_multi_daemon_subactors(
'''
child = spawn('multi_daemon_subactors')
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
# there can be a race for which subactor will acquire
# the root's tty lock first so anticipate either crash
@ -598,7 +611,7 @@ def test_multi_daemon_subactors(
# second entry by `bp_forever`.
child.sendline('c')
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
assert_before(child, [next_msg])
# XXX: hooray the root clobbering the child here was fixed!
@ -620,14 +633,9 @@ def test_multi_daemon_subactors(
# expect another breakpoint actor entry
child.sendline('c')
child.expect(PROMPT)
try:
child.expect(r"\(Pdb\+\+\)")
assert_before(child, [bp_forever_msg])
except AssertionError:
assert_before(child, [name_error_msg])
else:
if ctlc:
do_ctlc(child)
@ -636,7 +644,7 @@ def test_multi_daemon_subactors(
# after 1 or more further bp actor entries.
child.sendline('c')
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
assert_before(child, [name_error_msg])
# wait for final error in root
@ -644,7 +652,7 @@ def test_multi_daemon_subactors(
while True:
try:
child.sendline('c')
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
assert_before(
child,
[bp_forever_msg]
@ -652,6 +660,10 @@ def test_multi_daemon_subactors(
except AssertionError:
break
# child.sendline('c')
# assert_before(
# child.sendline('c')
assert_before(
child,
[
@ -677,8 +689,8 @@ def test_multi_subactors_root_errors(
'''
child = spawn('multi_subactor_root_errors')
# scan for the prompt
child.expect(PROMPT)
# scan for the pdbpp prompt
child.expect(r"\(Pdb\+\+\)")
# at most one subactor should attach before the root is cancelled
before = str(child.before.decode())
@ -693,7 +705,7 @@ def test_multi_subactors_root_errors(
# due to block list strat from #337, this will no longer
# propagate before the root errors and cancels the spawner sub-tree.
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
# only if the blocking condition doesn't kick in fast enough
before = str(child.before.decode())
@ -708,7 +720,7 @@ def test_multi_subactors_root_errors(
do_ctlc(child)
child.sendline('c')
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
# check if the spawner crashed or was blocked from debug
# and if this intermediary attached check the boxed error
@ -725,7 +737,7 @@ def test_multi_subactors_root_errors(
do_ctlc(child)
child.sendline('c')
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
# expect a root actor crash
assert_before(child, [
@ -774,7 +786,7 @@ def test_multi_nested_subactors_error_through_nurseries(
for send_char in itertools.cycle(['c', 'q']):
try:
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
child.sendline(send_char)
time.sleep(0.01)
@ -816,7 +828,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
child = spawn('root_cancelled_but_child_is_in_tty_lock')
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "NameError: name 'doggypants' is not defined" in before
@ -831,7 +843,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
for i in range(4):
time.sleep(0.5)
try:
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
except (
EOF,
@ -888,7 +900,7 @@ def test_root_cancels_child_context_during_startup(
'''
child = spawn('fast_error_in_root_after_spawn')
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "AssertionError" in before
@ -905,7 +917,7 @@ def test_different_debug_mode_per_actor(
ctlc: bool,
):
child = spawn('per_actor_debug')
child.expect(PROMPT)
child.expect(r"\(Pdb\+\+\)")
# only one actor should enter the debugger
before = str(child.before.decode())

View File

@ -12,17 +12,17 @@ import shutil
import pytest
from conftest import (
examples_dir,
)
from conftest import repodir
def examples_dir():
"""Return the abspath to the examples directory.
"""
return os.path.join(repodir(), 'examples')
@pytest.fixture
def run_example_in_subproc(
loglevel: str,
testdir,
arb_addr: tuple[str, int],
):
def run_example_in_subproc(loglevel, testdir, arb_addr):
@contextmanager
def run(script_code):
@ -32,8 +32,8 @@ def run_example_in_subproc(
# on windows we need to create a special __main__.py which will
# be executed with ``python -m <modulename>`` on windows..
shutil.copyfile(
examples_dir() / '__main__.py',
str(testdir / '__main__.py'),
os.path.join(examples_dir(), '__main__.py'),
os.path.join(str(testdir), '__main__.py')
)
# drop the ``if __name__ == '__main__'`` guard onwards from
@ -88,7 +88,6 @@ def run_example_in_subproc(
and f[0] != '_'
and 'debugging' not in p[0]
and 'integration' not in p[0]
and 'advanced_faults' not in p[0]
],
ids=lambda t: t[1],

View File

@ -251,7 +251,7 @@ def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend):
results, diff = time_quad_ex
assert results
this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 3
this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 2.666
assert diff < this_fast

View File

@ -12,10 +12,7 @@ import pytest
import trio
from trio.lowlevel import current_task
import tractor
from tractor.trionics import (
broadcast_receiver,
Lagged,
)
from tractor.trionics import broadcast_receiver, Lagged
@tractor.context
@ -40,7 +37,7 @@ async def echo_sequences(
async def ensure_sequence(
stream: tractor.MsgStream,
stream: tractor.ReceiveMsgStream,
sequence: list,
delay: Optional[float] = None,
@ -214,8 +211,7 @@ def test_faster_task_to_recv_is_cancelled_by_slower(
arb_addr,
start_method,
):
'''
Ensure that if a faster task consuming from a stream is cancelled
'''Ensure that if a faster task consuming from a stream is cancelled
the slower task can continue to receive all expected values.
'''
@ -464,51 +460,3 @@ def test_first_recver_is_cancelled():
assert value == 1
trio.run(main)
def test_no_raise_on_lag():
'''
Run a simple 2-task broadcast where one task is slow but configured
so that it does not raise `Lagged` on overruns using
`raise_on_lasg=False` and verify that the task does not raise.
'''
size = 100
tx, rx = trio.open_memory_channel(size)
brx = broadcast_receiver(rx, size)
async def slow():
async with brx.subscribe(
raise_on_lag=False,
) as br:
async for msg in br:
print(f'slow task got: {msg}')
await trio.sleep(0.1)
async def fast():
async with brx.subscribe() as br:
async for msg in br:
print(f'fast task got: {msg}')
async def main():
async with (
tractor.open_root_actor(
# NOTE: so we see the warning msg emitted by the bcaster
# internals when the no raise flag is set.
loglevel='warning',
),
trio.open_nursery() as n,
):
n.start_soon(slow)
n.start_soon(fast)
for i in range(1000):
await tx.send(i)
# simulate user nailing ctl-c after realizing
# there's a lag in the slow task.
await trio.sleep(1)
raise KeyboardInterrupt
with pytest.raises(KeyboardInterrupt):
trio.run(main)

View File

@ -24,6 +24,7 @@ from ._clustering import open_actor_cluster
from ._ipc import Channel
from ._streaming import (
Context,
ReceiveMsgStream,
MsgStream,
stream,
context,
@ -44,10 +45,7 @@ from ._exceptions import (
ModuleNotExposed,
ContextCancelled,
)
from ._debug import (
breakpoint,
post_mortem,
)
from ._debug import breakpoint, post_mortem
from . import msg
from ._root import (
run_daemon,
@ -66,6 +64,7 @@ __all__ = [
'MsgStream',
'BaseExceptionGroup',
'Portal',
'ReceiveMsgStream',
'RemoteActorError',
'breakpoint',
'context',

View File

@ -32,12 +32,9 @@ import tractor
async def open_actor_cluster(
modules: list[str],
count: int = cpu_count(),
names: list[str] | None = None,
names: Optional[list[str]] = None,
start_method: Optional[str] = None,
hard_kill: bool = False,
# passed through verbatim to ``open_root_actor()``
**runtime_kwargs,
) -> AsyncGenerator[
dict[str, tractor.Portal],
None,
@ -52,9 +49,7 @@ async def open_actor_cluster(
raise ValueError(
'Number of names is {len(names)} but count it {count}')
async with tractor.open_nursery(
**runtime_kwargs,
) as an:
async with tractor.open_nursery(start_method=start_method) as an:
async with trio.open_nursery() as n:
uid = tractor.current_actor().uid

View File

@ -20,13 +20,9 @@ Multi-core debugging for da peeps!
"""
from __future__ import annotations
import bdb
import os
import sys
import signal
from functools import (
partial,
cached_property,
)
from functools import partial
from contextlib import asynccontextmanager as acm
from typing import (
Any,
@ -37,7 +33,6 @@ from typing import (
)
from types import FrameType
import pdbp
import tractor
import trio
from trio_typing import TaskStatus
@ -54,6 +49,17 @@ from ._exceptions import (
)
from ._ipc import Channel
try:
# wtf: only exported when installed in dev mode?
import pdbpp
except ImportError:
# pdbpp is installed in regular mode...it monkey patches stuff
import pdb
xpm = getattr(pdb, 'xpm', None)
assert xpm, "pdbpp is not installed?" # type: ignore
pdbpp = pdb
log = get_logger(__name__)
@ -67,7 +73,6 @@ class Lock:
Mostly to avoid a lot of ``global`` declarations for now XD.
'''
repl: MultiActorPdb | None = None
# placeholder for function to set a ``trio.Event`` on debugger exit
# pdb_release_hook: Optional[Callable] = None
@ -106,7 +111,7 @@ class Lock:
def shield_sigint(cls):
cls._orig_sigint_handler = signal.signal(
signal.SIGINT,
shield_sigint_handler,
shield_sigint,
)
@classmethod
@ -141,29 +146,24 @@ class Lock:
finally:
# restore original sigint handler
cls.unshield_sigint()
cls.repl = None
class TractorConfig(pdbp.DefaultConfig):
class TractorConfig(pdbpp.DefaultConfig):
'''
Custom ``pdbp`` goodness :surfer:
Custom ``pdbpp`` goodness.
'''
use_pygments: bool = True
sticky_by_default: bool = False
enable_hidden_frames: bool = False
# much thanks @mdmintz for the hot tip!
# fixes line spacing issue when resizing terminal B)
truncate_long_lines: bool = False
# use_pygments = True
# sticky_by_default = True
enable_hidden_frames = False
class MultiActorPdb(pdbp.Pdb):
class MultiActorPdb(pdbpp.Pdb):
'''
Add teardown hooks to the regular ``pdbp.Pdb``.
Add teardown hooks to the regular ``pdbpp.Pdb``.
'''
# override the pdbp config with our coolio one
# override the pdbpp config with our coolio one
DefaultConfig = TractorConfig
# def preloop(self):
@ -184,35 +184,6 @@ class MultiActorPdb(pdbp.Pdb):
finally:
Lock.release()
# XXX NOTE: we only override this because apparently the stdlib pdb
# bois likes to touch the SIGINT handler as much as i like to touch
# my d$%&.
def _cmdloop(self):
self.cmdloop()
@cached_property
def shname(self) -> str | None:
'''
Attempt to return the login shell name with a special check for
the infamous `xonsh` since it seems to have some issues much
different from std shells when it comes to flushing the prompt?
'''
# SUPER HACKY and only really works if `xonsh` is not used
# before spawning further sub-shells..
shpath = os.getenv('SHELL', None)
if shpath:
if (
os.getenv('XONSH_LOGIN', default=False)
or 'xonsh' in shpath
):
return 'xonsh'
return os.path.basename(shpath)
return None
@acm
async def _acquire_debug_lock_from_root_task(
@ -307,7 +278,7 @@ async def lock_tty_for_child(
) -> str:
'''
Lock the TTY in the root process of an actor tree in a new
inter-actor-context-task such that the ``pdbp`` debugger console
inter-actor-context-task such that the ``pdbpp`` debugger console
can be mutex-allocated to the calling sub-actor for REPL control
without interference by other processes / threads.
@ -417,7 +388,6 @@ async def wait_for_parent_stdin_hijack(
except ContextCancelled:
log.warning('Root actor cancelled debug lock')
raise
finally:
Lock.local_task_in_debug = None
@ -427,7 +397,7 @@ async def wait_for_parent_stdin_hijack(
def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
pdb = MultiActorPdb()
# signal.signal = pdbp.hideframe(signal.signal)
# signal.signal = pdbpp.hideframe(signal.signal)
Lock.shield_sigint()
@ -465,10 +435,7 @@ async def _breakpoint(
# with trio.CancelScope(shield=shield):
# await trio.lowlevel.checkpoint()
if (
not Lock.local_pdb_complete
or Lock.local_pdb_complete.is_set()
):
if not Lock.local_pdb_complete or Lock.local_pdb_complete.is_set():
Lock.local_pdb_complete = trio.Event()
# TODO: need a more robust check for the "root" actor
@ -517,7 +484,6 @@ async def _breakpoint(
wait_for_parent_stdin_hijack,
actor.uid,
)
Lock.repl = pdb
except RuntimeError:
Lock.release()
@ -556,7 +522,6 @@ async def _breakpoint(
Lock.global_actor_in_debug = actor.uid
Lock.local_task_in_debug = task_name
Lock.repl = pdb
try:
# block here one (at the appropriate frame *up*) where
@ -577,13 +542,13 @@ async def _breakpoint(
# # frame = sys._getframe()
# # last_f = frame.f_back
# # last_f.f_globals['__tracebackhide__'] = True
# # signal.signal = pdbp.hideframe(signal.signal)
# # signal.signal = pdbpp.hideframe(signal.signal)
def shield_sigint_handler(
def shield_sigint(
signum: int,
frame: 'frame', # type: ignore # noqa
# pdb_obj: Optional[MultiActorPdb] = None,
pdb_obj: Optional[MultiActorPdb] = None,
*args,
) -> None:
@ -600,7 +565,6 @@ def shield_sigint_handler(
uid_in_debug = Lock.global_actor_in_debug
actor = tractor.current_actor()
# print(f'{actor.uid} in HANDLER with ')
def do_cancel():
# If we haven't tried to cancel the runtime then do that instead
@ -634,9 +598,6 @@ def shield_sigint_handler(
)
return do_cancel()
# only set in the actor actually running the REPL
pdb_obj = Lock.repl
# root actor branch that reports whether or not a child
# has locked debugger.
if (
@ -651,12 +612,10 @@ def shield_sigint_handler(
):
# we are root and some actor is in debug mode
# if uid_in_debug is not None:
if pdb_obj:
name = uid_in_debug[0]
if name != 'root':
log.pdb(
f"Ignoring SIGINT, child in debug mode: `{uid_in_debug}`"
f"Ignoring SIGINT while child in debug mode: `{uid_in_debug}`"
)
else:
@ -666,19 +625,19 @@ def shield_sigint_handler(
elif (
is_root_process()
):
if pdb_obj:
log.pdb(
"Ignoring SIGINT since debug mode is enabled"
)
# revert back to ``trio`` handler asap!
Lock.unshield_sigint()
if (
Lock._root_local_task_cs_in_debug
and not Lock._root_local_task_cs_in_debug.cancel_called
):
Lock._root_local_task_cs_in_debug.cancel()
# revert back to ``trio`` handler asap!
Lock.unshield_sigint()
# raise KeyboardInterrupt
# child actor that has locked the debugger
elif not is_root_process():
@ -694,10 +653,7 @@ def shield_sigint_handler(
return do_cancel()
task = Lock.local_task_in_debug
if (
task
and pdb_obj
):
if task:
log.pdb(
f"Ignoring SIGINT while task in debug mode: `{task}`"
)
@ -712,21 +668,14 @@ def shield_sigint_handler(
raise KeyboardInterrupt
# NOTE: currently (at least on ``fancycompleter`` 0.9.2)
# it looks to be that the last command that was run (eg. ll)
# it lookks to be that the last command that was run (eg. ll)
# will be repeated by default.
# maybe redraw/print last REPL output to console since
# we want to alert the user that more input is expect since
# nothing has been done dur to ignoring sigint.
# TODO: maybe redraw/print last REPL output to console
if (
pdb_obj # only when this actor has a REPL engaged
pdb_obj
and sys.version_info <= (3, 10)
):
# XXX: yah, mega hack, but how else do we catch this madness XD
if pdb_obj.shname == 'xonsh':
pdb_obj.stdout.write(pdb_obj.prompt)
pdb_obj.stdout.flush()
# TODO: make this work like sticky mode where if there is output
# detected as written to the tty we redraw this part underneath
# and erase the past draw of this same bit above?
@ -737,13 +686,21 @@ def shield_sigint_handler(
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
# XXX LEGACY: lol, see ``pdbpp`` issue:
# XXX: lol, see ``pdbpp`` issue:
# https://github.com/pdbpp/pdbpp/issues/496
# TODO: pretty sure this is what we should expect to have to run
# in total but for now we're just going to wait until `pdbpp`
# figures out it's own stuff on 3.10 (and maybe we'll help).
# pdb_obj.do_longlist(None)
# XXX: we were doing this but it shouldn't be required..
print(pdb_obj.prompt, end='', flush=True)
def _set_trace(
actor: tractor.Actor | None = None,
pdb: MultiActorPdb | None = None,
actor: Optional[tractor.Actor] = None,
pdb: Optional[MultiActorPdb] = None,
):
__tracebackhide__ = True
actor = actor or tractor.current_actor()
@ -753,11 +710,7 @@ def _set_trace(
if frame:
frame = frame.f_back # type: ignore
if (
frame
and pdb
and actor is not None
):
if frame and pdb and actor is not None:
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n")
# no f!#$&* idea, but when we're in async land
# we need 2x frames up?
@ -766,8 +719,7 @@ def _set_trace(
else:
pdb, undo_sigint = mk_mpdb()
# we entered the global ``breakpoint()`` built-in from sync
# code?
# we entered the global ``breakpoint()`` built-in from sync code?
Lock.local_task_in_debug = 'sync'
pdb.set_trace(frame=frame)
@ -797,7 +749,7 @@ def _post_mortem(
# https://github.com/pdbpp/pdbpp/issues/480
# TODO: help with a 3.10+ major release if/when it arrives.
pdbp.xpm(Pdb=lambda: pdb)
pdbpp.xpm(Pdb=lambda: pdb)
post_mortem = partial(
@ -868,10 +820,7 @@ async def maybe_wait_for_debugger(
) -> None:
if (
not debug_mode()
and not child_in_debug
):
if not debug_mode() and not child_in_debug:
return
if (

View File

@ -108,7 +108,7 @@ async def query_actor(
@acm
async def find_actor(
name: str,
arbiter_sockaddr: tuple[str, int] | None = None
arbiter_sockaddr: tuple[str, int] = None
) -> AsyncGenerator[Optional[Portal], None]:
'''
@ -134,7 +134,7 @@ async def find_actor(
@acm
async def wait_for_actor(
name: str,
arbiter_sockaddr: tuple[str, int] | None = None
arbiter_sockaddr: tuple[str, int] = None
) -> AsyncGenerator[Portal, None]:
"""Wait on an actor to register with the arbiter.

View File

@ -51,7 +51,7 @@ def _mp_main(
accept_addr: tuple[str, int],
forkserver_info: tuple[Any, Any, Any, Any, Any],
start_method: SpawnMethodKey,
parent_addr: tuple[str, int] | None = None,
parent_addr: tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None:
@ -98,7 +98,7 @@ def _trio_main(
actor: Actor, # type: ignore
*,
parent_addr: tuple[str, int] | None = None,
parent_addr: tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None:

View File

@ -341,7 +341,7 @@ class Channel:
async def connect(
self,
destaddr: tuple[Any, ...] | None = None,
destaddr: tuple[Any, ...] = None,
**kwargs
) -> MsgTransport:

View File

@ -45,10 +45,7 @@ from ._exceptions import (
NoResult,
ContextCancelled,
)
from ._streaming import (
Context,
MsgStream,
)
from ._streaming import Context, ReceiveMsgStream
log = get_logger(__name__)
@ -104,7 +101,7 @@ class Portal:
# it is expected that ``result()`` will be awaited at some
# point.
self._expect_result: Optional[Context] = None
self._streams: set[MsgStream] = set()
self._streams: set[ReceiveMsgStream] = set()
self.actor = current_actor()
async def _submit_for_result(
@ -189,7 +186,7 @@ class Portal:
async def cancel_actor(
self,
timeout: float | None = None,
timeout: float = None,
) -> bool:
'''
@ -319,7 +316,7 @@ class Portal:
async_gen_func: Callable, # typing: ignore
**kwargs,
) -> AsyncGenerator[MsgStream, None]:
) -> AsyncGenerator[ReceiveMsgStream, None]:
if not inspect.isasyncgenfunction(async_gen_func):
if not (
@ -344,7 +341,7 @@ class Portal:
try:
# deliver receive only stream
async with MsgStream(
async with ReceiveMsgStream(
ctx, ctx._recv_chan,
) as rchan:
self._streams.add(rchan)
@ -500,10 +497,6 @@ class Portal:
f'actor: {uid}'
)
result = await ctx.result()
log.runtime(
f'Context {fn_name} returned '
f'value from callee `{result}`'
)
# though it should be impossible for any tasks
# operating *in* this scope to have survived
@ -525,6 +518,12 @@ class Portal:
f'task:{cid}\n'
f'actor:{uid}'
)
else:
log.runtime(
f'Context {fn_name} returned '
f'value from callee `{result}`'
)
# XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the
# context from the runtime msg loop otherwise inside
@ -537,10 +536,7 @@ class Portal:
await maybe_wait_for_debugger()
# remove the context from runtime tracking
self.actor._contexts.pop(
(self.channel.uid, ctx.cid),
None,
)
self.actor._contexts.pop((self.channel.uid, ctx.cid))
@dataclass

View File

@ -22,9 +22,11 @@ from contextlib import asynccontextmanager
from functools import partial
import importlib
import logging
import signal
import sys
import os
import signal
from typing import (
Optional,
)
import typing
import warnings
@ -56,28 +58,27 @@ logger = log.get_logger('tractor')
@asynccontextmanager
async def open_root_actor(
*,
# defaults are above
arbiter_addr: tuple[str, int] | None = None,
arbiter_addr: Optional[tuple[str, int]] = (
_default_arbiter_host,
_default_arbiter_port,
),
# defaults are above
registry_addr: tuple[str, int] | None = None,
name: str | None = 'root',
name: Optional[str] = 'root',
# either the `multiprocessing` start method:
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
# OR `trio` (the new default).
start_method: _spawn.SpawnMethodKey | None = None,
start_method: Optional[_spawn.SpawnMethodKey] = None,
# enables the multi-process debugger support
debug_mode: bool = False,
# internal logging
loglevel: str | None = None,
loglevel: Optional[str] = None,
enable_modules: list | None = None,
rpc_module_paths: list | None = None,
enable_modules: Optional[list] = None,
rpc_module_paths: Optional[list] = None,
) -> typing.Any:
'''
@ -85,10 +86,8 @@ async def open_root_actor(
'''
# Override the global debugger hook to make it play nice with
# ``trio``, see much discussion in:
# ``trio``, see:
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
builtin_bp_handler = sys.breakpointhook
orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None)
os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace'
# attempt to retreive ``trio``'s sigint handler and stash it
@ -113,22 +112,10 @@ async def open_root_actor(
if start_method is not None:
_spawn.try_set_start_method(start_method)
if arbiter_addr is not None:
warnings.warn(
'`arbiter_addr` is now deprecated and has been renamed to'
'`registry_addr`.\nUse that instead..',
DeprecationWarning,
stacklevel=2,
)
registry_addr = (host, port) = (
registry_addr
or arbiter_addr
or (
arbiter_addr = (host, port) = arbiter_addr or (
_default_arbiter_host,
_default_arbiter_port,
)
)
loglevel = (loglevel or log._default_loglevel).upper()
@ -173,7 +160,7 @@ async def open_root_actor(
except OSError:
# TODO: make this a "discovery" log level?
logger.warning(f"No actor registry found @ {host}:{port}")
logger.warning(f"No actor could be found @ {host}:{port}")
# create a local actor and start up its main routine/task
if arbiter_found:
@ -183,7 +170,7 @@ async def open_root_actor(
actor = Actor(
name or 'anonymous',
arbiter_addr=registry_addr,
arbiter_addr=arbiter_addr,
loglevel=loglevel,
enable_modules=enable_modules,
)
@ -199,7 +186,7 @@ async def open_root_actor(
actor = Arbiter(
name or 'arbiter',
arbiter_addr=registry_addr,
arbiter_addr=arbiter_addr,
loglevel=loglevel,
enable_modules=enable_modules,
)
@ -257,15 +244,6 @@ async def open_root_actor(
await actor.cancel()
finally:
_state._current_actor = None
# restore breakpoint hook state
sys.breakpointhook = builtin_bp_handler
if orig_bp_path is not None:
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
else:
# clear env back to having no entry
os.environ.pop('PYTHONBREAKPOINT')
logger.runtime("Root actor terminated")
@ -273,13 +251,13 @@ def run_daemon(
enable_modules: list[str],
# runtime kwargs
name: str | None = 'root',
registry_addr: tuple[str, int] = (
name: Optional[str] = 'root',
arbiter_addr: tuple[str, int] = (
_default_arbiter_host,
_default_arbiter_port,
),
start_method: str | None = None,
start_method: Optional[str] = None,
debug_mode: bool = False,
**kwargs
@ -301,7 +279,7 @@ def run_daemon(
async def _main():
async with open_root_actor(
registry_addr=registry_addr,
arbiter_addr=arbiter_addr,
name=name,
start_method=start_method,
debug_mode=debug_mode,

View File

@ -228,11 +228,11 @@ async def _invoke(
fname = func.__name__
if ctx._cancel_called:
msg = f'`{fname}()` cancelled itself'
msg = f'{fname} cancelled itself'
elif cs.cancel_called:
msg = (
f'`{fname}()` was remotely cancelled by its caller '
f'{fname} was remotely cancelled by its caller '
f'{ctx.chan.uid}'
)
@ -319,7 +319,7 @@ async def _invoke(
BrokenPipeError,
):
# if we can't propagate the error that's a big boo boo
log.exception(
log.error(
f"Failed to ship error to caller @ {chan.uid} !?"
)
@ -423,8 +423,8 @@ class Actor:
name: str,
*,
enable_modules: list[str] = [],
uid: str | None = None,
loglevel: str | None = None,
uid: str = None,
loglevel: str = None,
arbiter_addr: Optional[tuple[str, int]] = None,
spawn_method: Optional[str] = None
) -> None:
@ -455,7 +455,7 @@ class Actor:
self._mods: dict[str, ModuleType] = {}
self.loglevel = loglevel
self._arb_addr: tuple[str, int] | None = (
self._arb_addr = (
str(arbiter_addr[0]),
int(arbiter_addr[1])
) if arbiter_addr else None
@ -488,10 +488,7 @@ class Actor:
self._parent_chan: Optional[Channel] = None
self._forkserver_info: Optional[
tuple[Any, Any, Any, Any, Any]] = None
self._actoruid2nursery: dict[
tuple[str, str],
ActorNursery | None,
] = {} # type: ignore # noqa
self._actoruid2nursery: dict[Optional[tuple[str, str]], 'ActorNursery'] = {} # type: ignore # noqa
async def wait_for_peer(
self, uid: tuple[str, str]
@ -829,12 +826,7 @@ class Actor:
if ctx._backpressure:
log.warning(text)
try:
await send_chan.send(msg)
except trio.BrokenResourceError:
# XXX: local consumer has closed their side
# so cancel the far end streaming task
log.warning(f"{chan} is already closed")
else:
try:
raise StreamOverrun(text) from None
@ -988,7 +980,7 @@ class Actor:
handler_nursery: trio.Nursery,
*,
# (host, port) to bind for channel server
accept_host: tuple[str, int] | None = None,
accept_host: tuple[str, int] = None,
accept_port: int = 0,
task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
) -> None:
@ -1379,12 +1371,10 @@ async def async_main(
actor.lifetime_stack.close()
# Unregister actor from the arbiter
if (
registered_with_arbiter
and not actor.is_arbiter
if registered_with_arbiter and (
actor._arb_addr is not None
):
failed = False
assert isinstance(actor._arb_addr, tuple)
with trio.move_on_after(0.5) as cs:
cs.shield = True
try:
@ -1609,10 +1599,7 @@ async def process_messages(
# handshake for them (yet) and instead we simply bail out of
# the message loop and expect the teardown sequence to clean
# up.
log.runtime(
f'channel from {chan.uid} closed abruptly:\n'
f'-> {chan.raddr}\n'
)
log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}')
# transport **was** disconnected
return True
@ -1661,28 +1648,17 @@ class Arbiter(Actor):
'''
is_arbiter = True
def __init__(self, *args, **kwargs) -> None:
def __init__(self, *args, **kwargs):
self._registry: dict[
tuple[str, str],
tuple[str, int],
] = {}
self._waiters: dict[
str,
# either an event to sync to receiving an actor uid (which
# is filled in once the actor has sucessfully registered),
# or that uid after registry is complete.
list[trio.Event | tuple[str, str]]
] = {}
self._waiters = {}
super().__init__(*args, **kwargs)
async def find_actor(
self,
name: str,
) -> tuple[str, int] | None:
async def find_actor(self, name: str) -> Optional[tuple[str, int]]:
for uid, sockaddr in self._registry.items():
if name in uid:
return sockaddr
@ -1717,8 +1693,7 @@ class Arbiter(Actor):
registered.
'''
sockaddrs: list[tuple[str, int]] = []
sockaddr: tuple[str, int]
sockaddrs = []
for (aname, _), sockaddr in self._registry.items():
if name == aname:
@ -1728,9 +1703,7 @@ class Arbiter(Actor):
waiter = trio.Event()
self._waiters.setdefault(name, []).append(waiter)
await waiter.wait()
for uid in self._waiters[name]:
if not isinstance(uid, trio.Event):
sockaddrs.append(self._registry[uid])
return sockaddrs
@ -1741,11 +1714,11 @@ class Arbiter(Actor):
sockaddr: tuple[str, int]
) -> None:
uid = name, _ = (str(uid[0]), str(uid[1]))
uid = name, uuid = (str(uid[0]), str(uid[1]))
self._registry[uid] = (str(sockaddr[0]), int(sockaddr[1]))
# pop and signal all waiter events
events = self._waiters.pop(name, [])
events = self._waiters.pop(name, ())
self._waiters.setdefault(name, []).append(uid)
for event in events:
if isinstance(event, trio.Event):

View File

@ -23,12 +23,13 @@ import sys
import platform
from typing import (
Any,
Awaitable,
Literal,
Optional,
Callable,
TypeVar,
TYPE_CHECKING,
)
from collections.abc import Awaitable
from exceptiongroup import BaseExceptionGroup
import trio
@ -59,7 +60,7 @@ if TYPE_CHECKING:
log = get_logger('tractor')
# placeholder for an mp start context if so using that backend
_ctx: mp.context.BaseContext | None = None
_ctx: Optional[mp.context.BaseContext] = None
SpawnMethodKey = Literal[
'trio', # supported on all platforms
'mp_spawn',
@ -85,7 +86,7 @@ else:
def try_set_start_method(
key: SpawnMethodKey
) -> mp.context.BaseContext | None:
) -> Optional[mp.context.BaseContext]:
'''
Attempt to set the method for process starting, aka the "actor
spawning backend".
@ -199,37 +200,16 @@ async def cancel_on_completion(
async def do_hard_kill(
proc: trio.Process,
terminate_after: int = 3,
) -> None:
# NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as
# a hard-kill time ultimatum.
log.debug(f"Terminating {proc}")
with trio.move_on_after(terminate_after) as cs:
# NOTE: code below was copied verbatim from the now deprecated
# (in 0.20.0) ``trio._subrocess.Process.aclose()``, orig doc
# string:
#
# Close any pipes we have to the process (both input and output)
# and wait for it to exit. If cancelled, kills the process and
# waits for it to finish exiting before propagating the
# cancellation.
with trio.CancelScope(shield=True):
if proc.stdin is not None:
await proc.stdin.aclose()
if proc.stdout is not None:
await proc.stdout.aclose()
if proc.stderr is not None:
await proc.stderr.aclose()
try:
await proc.wait()
finally:
if proc.returncode is None:
proc.kill()
with trio.CancelScope(shield=True):
await proc.wait()
# NOTE: This ``__aexit__()`` shields internally.
async with proc: # calls ``trio.Process.aclose()``
log.debug(f"Terminating {proc}")
if cs.cancelled_caught:
# XXX: should pretty much never get here unless we have
@ -280,9 +260,7 @@ async def soft_wait(
if proc.poll() is None: # type: ignore
log.warning(
'Actor still alive after cancel request:\n'
f'{uid}'
)
f'Process still alive after cancel request:\n{uid}')
n.cancel_scope.cancel()
raise
@ -375,11 +353,12 @@ async def trio_proc(
spawn_cmd.append("--asyncio")
cancelled_during_spawn: bool = False
proc: trio.Process | None = None
proc: Optional[trio.Process] = None
try:
try:
# TODO: needs ``trio_typing`` patch?
proc = await trio.lowlevel.open_process(spawn_cmd)
proc = await trio.lowlevel.open_process( # type: ignore
spawn_cmd)
log.runtime(f"Started {proc}")
@ -463,8 +442,8 @@ async def trio_proc(
nursery.cancel_scope.cancel()
finally:
# XXX NOTE XXX: The "hard" reap since no actor zombies are
# allowed! Do this **after** cancellation/teardown to avoid
# The "hard" reap since no actor zombies are allowed!
# XXX: do this **after** cancellation/tearfown to avoid
# killing the process too early.
if proc:
log.cancel(f'Hard reap sequence starting for {subactor.uid}')
@ -478,13 +457,6 @@ async def trio_proc(
await proc.wait()
if is_root_process():
# TODO: solve the following issue where we need
# to do a similar wait like this but in an
# "intermediary" parent actor that itself isn't
# in debug but has a child that is, and we need
# to hold off on relaying SIGINT until that child
# is complete.
# https://github.com/goodboy/tractor/issues/320
await maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get(
'_debug_mode', False),

View File

@ -50,13 +50,12 @@ log = get_logger(__name__)
# - use __slots__ on ``Context``?
class MsgStream(trio.abc.Channel):
class ReceiveMsgStream(trio.abc.ReceiveChannel):
'''
A bidirectional message stream for receiving logically sequenced
values over an inter-actor IPC ``Channel``.
This is the type returned to a local task which entered either
``Portal.open_stream_from()`` or ``Context.open_stream()``.
A IPC message stream for receiving logically sequenced values over
an inter-actor ``Channel``. This is the type returned to a local
task which entered either ``Portal.open_stream_from()`` or
``Context.open_stream()``.
Termination rules:
@ -98,9 +97,6 @@ class MsgStream(trio.abc.Channel):
if self._eoc:
raise trio.EndOfChannel
if self._closed:
raise trio.ClosedResourceError('This stream was closed')
try:
msg = await self._rx_chan.receive()
return msg['yield']
@ -114,9 +110,6 @@ class MsgStream(trio.abc.Channel):
# - 'error'
# possibly just handle msg['stop'] here!
if self._closed:
raise trio.ClosedResourceError('This stream was closed')
if msg.get('stop') or self._eoc:
log.debug(f"{self} was stopped at remote end")
@ -196,6 +189,7 @@ class MsgStream(trio.abc.Channel):
return
self._eoc = True
self._closed = True
# NOTE: this is super subtle IPC messaging stuff:
# Relay stop iteration to far end **iff** we're
@ -212,8 +206,12 @@ class MsgStream(trio.abc.Channel):
# In the bidirectional case, `Context.open_stream()` will create
# the `Actor._cids2qs` entry from a call to
# `Actor.get_context()` and will call us here to send the stop
# msg in ``__aexit__()`` on teardown.
# `Actor.get_context()` and will send the stop message in
# ``__aexit__()`` on teardown so it **does not** need to be
# called here.
if not self._ctx._portal:
# Only for 2 way streams can we can send stop from the
# caller side.
try:
# NOTE: if this call is cancelled we expect this end to
# handle as though the stop was never sent (though if it
@ -230,14 +228,7 @@ class MsgStream(trio.abc.Channel):
# the underlying channel may already have been pulled
# in which case our stop message is meaningless since
# it can't traverse the transport.
ctx = self._ctx
log.warning(
f'Stream was already destroyed?\n'
f'actor: {ctx.chan.uid}\n'
f'ctx id: {ctx.cid}'
)
self._closed = True
log.debug(f'Channel for {self} was already closed')
# Do we close the local mem chan ``self._rx_chan`` ??!?
@ -280,8 +271,7 @@ class MsgStream(trio.abc.Channel):
self,
) -> AsyncIterator[BroadcastReceiver]:
'''
Allocate and return a ``BroadcastReceiver`` which delegates
'''Allocate and return a ``BroadcastReceiver`` which delegates
to this message stream.
This allows multiple local tasks to receive each their own copy
@ -318,15 +308,15 @@ class MsgStream(trio.abc.Channel):
async with self._broadcaster.subscribe() as bstream:
assert bstream.key != self._broadcaster.key
assert bstream._recv == self._broadcaster._recv
# NOTE: we patch on a `.send()` to the bcaster so that the
# caller can still conduct 2-way streaming using this
# ``bstream`` handle transparently as though it was the msg
# stream instance.
bstream.send = self.send # type: ignore
yield bstream
class MsgStream(ReceiveMsgStream, trio.abc.Channel):
'''
Bidirectional message stream for use within an inter-actor actor
``Context```.
'''
async def send(
self,
data: Any
@ -603,23 +593,23 @@ class Context:
async with MsgStream(
ctx=self,
rx_chan=ctx._recv_chan,
) as stream:
) as rchan:
if self._portal:
self._portal._streams.add(stream)
self._portal._streams.add(rchan)
try:
self._stream_opened = True
# XXX: do we need this?
# ensure we aren't cancelled before yielding the stream
# ensure we aren't cancelled before delivering
# the stream
# await trio.lowlevel.checkpoint()
yield stream
yield rchan
# NOTE: Make the stream "one-shot use". On exit, signal
# XXX: Make the stream "one-shot use". On exit, signal
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the
# far end.
await stream.aclose()
await self.send_stop()
finally:
if self._portal:

View File

@ -111,11 +111,11 @@ class ActorNursery:
name: str,
*,
bind_addr: tuple[str, int] = _default_bind_addr,
rpc_module_paths: list[str] | None = None,
enable_modules: list[str] | None = None,
loglevel: str | None = None, # set log level per subactor
nursery: trio.Nursery | None = None,
debug_mode: Optional[bool] | None = None,
rpc_module_paths: list[str] = None,
enable_modules: list[str] = None,
loglevel: str = None, # set log level per subactor
nursery: trio.Nursery = None,
debug_mode: Optional[bool] = None,
infect_asyncio: bool = False,
) -> Portal:
'''
@ -182,9 +182,9 @@ class ActorNursery:
name: Optional[str] = None,
bind_addr: tuple[str, int] = _default_bind_addr,
rpc_module_paths: list[str] | None = None,
enable_modules: list[str] | None = None,
loglevel: str | None = None, # set log level per subactor
rpc_module_paths: Optional[list[str]] = None,
enable_modules: list[str] = None,
loglevel: str = None, # set log level per subactor
infect_asyncio: bool = False,
**kwargs, # explicit args to ``fn``
@ -302,7 +302,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
) -> typing.AsyncGenerator[ActorNursery, None]:
# TODO: yay or nay?
__tracebackhide__ = True
# __tracebackhide__ = True
# the collection of errors retreived from spawned sub-actors
errors: dict[tuple[str, str], BaseException] = {}

View File

@ -48,7 +48,7 @@ log = get_logger('messaging')
async def fan_out_to_ctxs(
pub_async_gen_func: typing.Callable, # it's an async gen ... gd mypy
topics2ctxs: dict[str, list],
packetizer: typing.Callable | None = None,
packetizer: typing.Callable = None,
) -> None:
'''
Request and fan out quotes to each subscribed actor channel.
@ -144,7 +144,7 @@ _pubtask2lock: dict[str, trio.StrictFIFOLock] = {}
def pub(
wrapped: typing.Callable | None = None,
wrapped: typing.Callable = None,
*,
tasks: set[str] = set(),
):
@ -249,8 +249,8 @@ def pub(
topics: set[str],
*args,
# *,
task_name: str | None = None, # default: only one task allocated
packetizer: Callable | None = None,
task_name: str = None, # default: only one task allocated
packetizer: Callable = None,
**kwargs,
):
if task_name is None:

View File

@ -172,7 +172,7 @@ class ActorContextInfo(Mapping):
def get_logger(
name: str | None = None,
name: str = None,
_root_name: str = _proj_name,
) -> StackLevelAdapter:
@ -207,7 +207,7 @@ def get_logger(
def get_console_log(
level: str | None = None,
level: str = None,
**kwargs,
) -> logging.LoggerAdapter:
'''Get the package logger and enable a handler which writes to stderr.

View File

@ -23,6 +23,7 @@ from __future__ import annotations
from abc import abstractmethod
from collections import deque
from contextlib import asynccontextmanager
from dataclasses import dataclass
from functools import partial
from operator import ne
from typing import Optional, Callable, Awaitable, Any, AsyncIterator, Protocol
@ -32,10 +33,7 @@ import trio
from trio._core._run import Task
from trio.abc import ReceiveChannel
from trio.lowlevel import current_task
from msgspec import Struct
from tractor.log import get_logger
log = get_logger(__name__)
# A regular invariant generic type
T = TypeVar("T")
@ -88,7 +86,8 @@ class Lagged(trio.TooSlowError):
'''
class BroadcastState(Struct):
@dataclass
class BroadcastState:
'''
Common state to all receivers of a broadcast.
@ -111,35 +110,7 @@ class BroadcastState(Struct):
eoc: bool = False
# If the broadcaster was cancelled, we might as well track it
cancelled: dict[int, Task] = {}
def statistics(self) -> dict[str, Any]:
'''
Return broadcast receiver group "statistics" like many of
``trio``'s internal task-sync primitives.
'''
key: int | None
ev: trio.Event | None
subs = self.subs
if self.recv_ready is not None:
key, ev = self.recv_ready
else:
key = ev = None
qlens: dict[int, int] = {}
for tid, sz in subs.items():
qlens[tid] = sz if sz != -1 else 0
return {
'open_consumers': len(subs),
'queued_len_by_task': qlens,
'max_buffer_size': self.maxlen,
'tasks_waiting': ev.statistics().tasks_waiting if ev else 0,
'tasks_cancelled': self.cancelled,
'next_value_receiver_id': key,
}
cancelled: bool = False
class BroadcastReceiver(ReceiveChannel):
@ -157,40 +128,23 @@ class BroadcastReceiver(ReceiveChannel):
rx_chan: AsyncReceiver,
state: BroadcastState,
receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None,
raise_on_lag: bool = True,
) -> None:
# register the original underlying (clone)
self.key = id(self)
self._state = state
# each consumer has an int count which indicates
# which index contains the next value that the task has not yet
# consumed and thus should read. In the "up-to-date" case the
# consumer task must wait for a new value from the underlying
# receiver and we use ``-1`` as the sentinel for this state.
state.subs[self.key] = -1
# underlying for this receiver
self._rx = rx_chan
self._recv = receive_afunc or rx_chan.receive
self._closed: bool = False
self._raise_on_lag = raise_on_lag
def receive_nowait(
self,
_key: int | None = None,
_state: BroadcastState | None = None,
async def receive(self) -> ReceiveType:
) -> Any:
'''
Sync version of `.receive()` which does all the low level work
of receiving from the underlying/wrapped receive channel.
'''
key = _key or self.key
state = _state or self._state
key = self.key
state = self._state
# TODO: ideally we can make some way to "lock out" the
# underlying receive channel in some way such that if some task
@ -223,47 +177,32 @@ class BroadcastReceiver(ReceiveChannel):
# return this value."
# https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html#lagging
mxln = state.maxlen
lost = seq - mxln
# decrement to the last value and expect
# consumer to either handle the ``Lagged`` and come back
# or bail out on its own (thus un-subscribing)
state.subs[key] = mxln - 1
state.subs[key] = state.maxlen - 1
# this task was overrun by the producer side
task: Task = current_task()
msg = f'Task `{task.name}` overrun and dropped `{lost}` values'
if self._raise_on_lag:
raise Lagged(msg)
else:
log.warning(msg)
return self.receive_nowait(_key, _state)
raise Lagged(f'Task {task.name} was overrun')
state.subs[key] -= 1
return value
raise trio.WouldBlock
async def _receive_from_underlying(
self,
key: int,
state: BroadcastState,
) -> ReceiveType:
# current task already has the latest value **and** is the
# first task to begin waiting for a new one
if state.recv_ready is None:
if self._closed:
raise trio.ClosedResourceError
event = trio.Event()
assert state.recv_ready is None
state.recv_ready = key, event
try:
# if we're cancelled here it should be
# fine to bail without affecting any other consumers
# right?
try:
value = await self._recv()
# items with lower indices are "newer"
@ -281,6 +220,7 @@ class BroadcastReceiver(ReceiveChannel):
# already retreived the last value
# XXX: which of these impls is fastest?
# subs = state.subs.copy()
# subs.pop(key)
@ -311,85 +251,54 @@ class BroadcastReceiver(ReceiveChannel):
# consumers will be awoken with a sequence of -1
# and will potentially try to rewait the underlying
# receiver instead of just cancelling immediately.
self._state.cancelled[key] = current_task()
self._state.cancelled = True
if event.statistics().tasks_waiting:
event.set()
raise
finally:
# Reset receiver waiter task event for next blocking condition.
# this MUST be reset even if the above ``.recv()`` call
# was cancelled to avoid the next consumer from blocking on
# an event that won't be set!
state.recv_ready = None
async def receive(self) -> ReceiveType:
key = self.key
state = self._state
try:
return self.receive_nowait(
_key=key,
_state=state,
)
except trio.WouldBlock:
pass
# current task already has the latest value **and** is the
# first task to begin waiting for a new one so we begin blocking
# until rescheduled with the a new value from the underlying.
if state.recv_ready is None:
return await self._receive_from_underlying(key, state)
# This task is all caught up and ready to receive the latest
# value, so queue/schedule it to be woken on the next internal
# event.
# value, so queue sched it on the internal event.
else:
while state.recv_ready is not None:
# seq = state.subs[key]
# assert seq == -1 # sanity
seq = state.subs[key]
assert seq == -1 # sanity
_, ev = state.recv_ready
await ev.wait()
try:
return self.receive_nowait(
_key=key,
_state=state,
)
except trio.WouldBlock:
if self._closed:
raise trio.ClosedResourceError
subs = state.subs
if (
len(subs) == 1
and key in subs
# or cancelled
):
# XXX: we are the last and only user of this BR so
# likely it makes sense to unwind back to the
# underlying?
# import tractor
# await tractor.breakpoint()
log.warning(
f'Only one sub left for {self}?\n'
'We can probably unwind from breceiver?'
)
# NOTE: if we ever would like the behaviour where if the
# first task to recv on the underlying is cancelled but it
# still DOES trigger the ``.recv_ready``, event we'll likely need
# this logic:
if seq > -1:
# stuff from above..
seq = state.subs[key]
value = state.queue[seq]
state.subs[key] -= 1
return value
elif seq == -1:
# XXX: In the case where the first task to allocate the
# ``.recv_ready`` event is cancelled we will be woken
# with a non-incremented sequence number (the ``-1``
# sentinel) and thus will read the oldest value if we
# use that. Instead we need to detect if we have not
# been incremented and then receive again.
# return await self.receive()
# ``.recv_ready`` event is cancelled we will be woken with
# a non-incremented sequence number and thus will read the
# oldest value if we use that. Instead we need to detect if
# we have not been incremented and then receive again.
return await self.receive()
return await self._receive_from_underlying(key, state)
else:
raise ValueError(f'Invalid sequence {seq}!?')
@asynccontextmanager
async def subscribe(
self,
raise_on_lag: bool = True,
) -> AsyncIterator[BroadcastReceiver]:
'''
Subscribe for values from this broadcast receiver.
@ -407,7 +316,6 @@ class BroadcastReceiver(ReceiveChannel):
rx_chan=self._rx,
state=state,
receive_afunc=self._recv,
raise_on_lag=raise_on_lag,
)
# assert clone in state.subs
assert br.key in state.subs
@ -444,8 +352,7 @@ def broadcast_receiver(
recv_chan: AsyncReceiver,
max_buffer_size: int,
receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None,
raise_on_lag: bool = True,
**kwargs,
) -> BroadcastReceiver:
@ -456,6 +363,5 @@ def broadcast_receiver(
maxlen=max_buffer_size,
subs={},
),
receive_afunc=receive_afunc,
raise_on_lag=raise_on_lag,
**kwargs,
)

View File

@ -47,7 +47,7 @@ T = TypeVar("T")
@acm
async def maybe_open_nursery(
nursery: trio.Nursery | None = None,
nursery: trio.Nursery = None,
shield: bool = False,
) -> AsyncGenerator[trio.Nursery, Any]:
'''
@ -109,17 +109,6 @@ async def gather_contexts(
all_entered = trio.Event()
parent_exit = trio.Event()
# XXX: ensure greedy sequence of manager instances
# since a lazy inline generator doesn't seem to work
# with `async with` syntax.
mngrs = list(mngrs)
if not mngrs:
raise ValueError(
'input mngrs is empty?\n'
'Did try to use inline generator syntax?'
)
async with trio.open_nursery() as n:
for mngr in mngrs:
n.start_soon(
@ -133,12 +122,12 @@ async def gather_contexts(
# deliver control once all managers have started up
await all_entered.wait()
try:
# NOTE: order *should* be preserved in the output values
# since ``dict``s are now implicitly ordered.
yield tuple(unwrapped.values())
finally:
# NOTE: this is ABSOLUTELY REQUIRED to avoid
# the following wacky bug:
# <tractorbugurlhere>
# we don't need a try/finally since cancellation will be triggered
# by the surrounding nursery on error.
parent_exit.set()