Compare commits


No commits in common. "master" and "callable_key_maybe_open_context" have entirely different histories.

47 changed files with 721 additions and 1846 deletions

View File

@ -6,14 +6,8 @@
``tractor`` is a `structured concurrent`_, multi-processing_ runtime
built on trio_.
Fundamentally, ``tractor`` gives you parallelism via
``trio``-"*actors*": independent Python processes (aka
non-shared-memory threads) which maintain structured
concurrency (SC) *end-to-end* inside a *supervision tree*.
Cross-process (and thus cross-host) SC is accomplished through the
combined use of our "actor nurseries_" and an "SC-transitive IPC
protocol" constructed on top of multiple Pythons each running a ``trio``
Fundamentally ``tractor`` gives you parallelism via ``trio``-"*actors*":
our nurseries_ let you spawn new Python processes which each run a ``trio``
scheduled runtime - a call to ````.
We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
@ -29,8 +23,7 @@ Features
- **It's just** a ``trio`` API
- *Infinitely nesteable* process trees
- Builtin IPC streaming APIs with task fan-out broadcasting
- A "native" multi-core debugger REPL using `pdbp`_ (a fork & fix of
`pdb++`_ thanks to @mdmintz!)
- A (first ever?) "native" multi-core debugger UX for Python using `pdb++`_
- Support for a swappable, OS specific, process spawning layer
- A modular transport stack, allowing for custom serialization (eg. with
`msgspec`_), communications protocols, and environment specific IPC
@ -156,7 +149,7 @@ it **is a bug**.
"Native" multi-process debugging
Using the magic of `pdbp`_ and our internal IPC, we've
Using the magic of `pdb++`_ and our internal IPC, we've
been able to create a native feeling debugging experience for
any (sub-)process in your ``tractor`` tree.
@ -604,7 +597,6 @@ channel`_!
.. _adherance to:
.. _trio gitter channel:
.. _matrix channel:!
.. _pdbp:
.. _pdb++:
.. _guest mode:
.. _messages:

View File

@ -1,151 +0,0 @@
Complex edge case where during real-time streaming the IPC tranport
channels are wiped out (purposely in this example though it could have
been an outage) and we want to ensure that despite being in debug mode
(or not) the user can sent SIGINT once they notice the hang and the
actor tree will eventually be cancelled without leaving any zombies.
import trio
from tractor import (
async def break_channel_silently_then_error(
stream: MsgStream,
async for msg in stream:
await stream.send(msg)
# XXX: close the channel right after an error is raised
# purposely breaking the IPC transport to make sure the parent
# doesn't get stuck in debug or hang on the connection join.
# this more or less simulates an infinite msg-receive hang on
# the other end.
await stream._ctx.chan.send(None)
assert 0
async def close_stream_and_error(
stream: MsgStream,
async for msg in stream:
await stream.send(msg)
# wipe out channel right before raising
await stream._ctx.chan.send(None)
await stream.aclose()
assert 0
async def recv_and_spawn_net_killers(
ctx: Context,
break_ipc_after: bool | int = False,
) -> None:
Receive stream msgs and spawn some IPC killers mid-stream.
await ctx.started()
async with (
ctx.open_stream() as stream,
trio.open_nursery() as n,
async for i in stream:
print(f'child echoing {i}')
await stream.send(i)
if (
and i > break_ipc_after
'Simulating child-side IPC BREAK!\n'
n.start_soon(break_channel_silently_then_error, stream)
n.start_soon(close_stream_and_error, stream)
async def main(
debug_mode: bool = False,
start_method: str = 'trio',
# by default we break the parent IPC first (if configured to break
# at all), but this can be changed so the child does first (even if
# both are set to break).
break_parent_ipc_after: int | bool = False,
break_child_ipc_after: int | bool = False,
) -> None:
async with (
# NOTE: even debugger is used we shouldn't get
# a hang since it never engages due to broken IPC
) as an,
portal = await an.start_actor(
async with portal.open_context(
) as (ctx, sent):
async with ctx.open_stream() as stream:
for i in range(1000):
if (
and i > break_parent_ipc_after
'Simulating parent-side IPC BREAK!\n'
await stream._ctx.chan.send(None)
# it actually breaks right here in the
# mp_spawn/forkserver backends and thus the zombie
# reaper never even kicks in?
print(f'parent sending {i}')
await stream.send(i)
with trio.move_on_after(2) as cs:
# NOTE: in the parent side IPC failure case this
# will raise an ``EndOfChannel`` after the child
# is killed and sends a stop msg back to it's
# caller/this-parent.
rx = await stream.receive()
print(f"I'm a happy user and echoed to me is {rx}")
if cs.cancelled_caught:
# pretend to be a user seeing no streaming action
# thinking it's a hang, and then hitting ctl-c..
print("YOO i'm a user anddd thingz hangin..")
"YOO i'm mad send side dun but thingz hangin..\n"
'MASHING CTlR-C Ctl-c..'
raise KeyboardInterrupt
if __name__ == '__main__':

View File

@ -27,17 +27,6 @@ async def main():
# retreive results
async with p0.open_stream_from(breakpoint_forever) as stream:
# triggers the first name error
except tractor.RemoteActorError as rae:
assert rae.type is NameError
async for i in stream:
# a second time try the failing subactor and this tie
# let error propagate up to the parent/nursery.

View File

@ -12,31 +12,18 @@ async def breakpoint_forever():
while True:
await tractor.breakpoint()
# NOTE: if the test never sent 'q'/'quit' commands
# on the pdb repl, without this checkpoint line the
# repl would spin in this actor forever.
# await trio.sleep(0)
async def spawn_until(depth=0):
""""A nested nursery that triggers another ``NameError``.
async with tractor.open_nursery() as n:
if depth < 1:
await n.run_in_actor(breakpoint_forever)
p = await n.run_in_actor(
# await n.run_in_actor('breakpoint_forever', breakpoint_forever)
await n.run_in_actor(
await trio.sleep(0.5)
# rx and propagate error from child
await p.result()
# recusrive call to spawn another process branching layer of
# the tree
depth -= 1
await n.run_in_actor(
@ -66,7 +53,6 @@ async def main():
async with tractor.open_nursery(
# loglevel='cancel',
) as n:
# spawn both actors
@ -81,16 +67,8 @@ async def main():
# TODO: test this case as well where the parent don't see
# the sub-actor errors by default and instead expect a user
# ctrl-c to kill the root.
with trio.move_on_after(3):
await trio.sleep_forever()
# gah still an issue here.
await portal.result()
# should never get here
await portal1.result()

View File

@ -1,24 +0,0 @@
import os
import sys
import trio
import tractor
async def main() -> None:
async with tractor.open_nursery(debug_mode=True) as an:
assert os.environ['PYTHONBREAKPOINT'] == 'tractor._debug._set_trace'
# TODO: an assert that verifies the hook has indeed been, hooked
# XD
assert sys.breakpointhook is not tractor._debug._set_trace
# TODO: an assert that verifies the hook is unhooked..
assert sys.breakpointhook
if __name__ == '__main__':

View File

@ -1,25 +0,0 @@
Add support for ``trio >= 0.22`` and support for the new Python 3.11
``[Base]ExceptionGroup`` from `pep 654`_ via the backported
`exceptiongroup`_ package and some final fixes to the debug mode
This port ended up driving some (hopefully) final fixes to our debugger
subsystem including the solution to all lingering stdstreams locking
race-conditions and deadlock scenarios. This includes extending the
debugger tests suite as well as cancellation and ``asyncio`` mode cases.
Some of the notable details:
- always reverting to the ``trio`` SIGINT handler when leaving debug
- bypassing child attempts to acquire the debug lock when detected
to be amdist actor-runtime-cancellation.
- allowing the root actor to cancel local but IPC-stale subactor
requests-tasks for the debug lock when in a "no IPC peers" state.
Further we refined our ``ActorNursery`` semantics to be more similar to
``trio`` in the sense that parent task errors are always packed into the
actor-nursery emitted exception group and adjusted all tests and
examples accordingly.
.. _pep 654:
.. _exceptiongroup:

View File

@ -1,41 +0,0 @@
Add support for debug-lock blocking using a ``._debug.Lock._blocked:
set[tuple]`` and add ids when no-more IPC connections with the
root actor are detected.
This is an enhancement which (mostly) solves a lingering debugger
locking race case we needed to handle:
- child crashes acquires TTY lock in root and attaches to ``pdb``
- child IPC goes down such that all channels to the root are broken
/ non-functional.
- root is stuck thinking the child is still in debug even though it
can't be contacted and the child actor machinery hasn't been
cancelled by its parent.
- root get's stuck in deadlock with child since it won't send a cancel
request until the child is finished debugging (to avoid clobbering
a child that is actually using the debugger), but the child can't
unlock the debugger bc IPC is down and it can't contact the root.
To avoid this scenario add debug lock blocking list via
`._debug.Lock._blocked: set[tuple]` which holds actor uids for any actor
that is detected by the root as having no transport channel connections
(of which at least one should exist if this sub-actor at some point
acquired the debug lock). The root consequently checks this list for any
actor that tries to (re)acquire the lock and blocks with
a ``ContextCancelled``. Further, when a debug condition is tested in
``._runtime._invoke``, the context's ``._enter_debugger_on_cancel`` is
set to `False` if the actor was put on the block list then all
post-mortem / crash handling will be bypassed for that task.
In theory this approach to block list management may cause problems
where some nested child actor acquires and releases the lock multiple
times and it gets stuck on the block list after the first use? If this
turns out to be an issue we can try changing the strat so blocks are
only added when the root has zero IPC peers left?
Further, this adds a root-locking-task side cancel scope,
``Lock._root_local_task_cs_in_debug``, which can be ``.cancel()``-ed by the root
runtime when a stale lock is detected during the IPC channel testing.
However, right now we're NOT using this since it seems to cause test
failures likely due to causing pre-mature cancellation and maybe needs
a bit more experimenting?

View File

@ -1,19 +0,0 @@
Rework our ``.trionics.BroadcastReceiver`` internals to avoid method
recursion and approach a design and interface closer to ``trio``'s
The details of the internal changes include:
- implementing a ``BroadcastReceiver.receive_nowait()`` and using it
within the async ``.receive()`` thus avoiding recursion from
- failing over to an internal ``._receive_from_underlying()`` when the
``_nowait()`` call raises ``trio.WouldBlock``
- adding ``BroadcastState.statistics()`` for debugging and testing both
internals and by users.
- add an internal ``BroadcastReceiver._raise_on_lag: bool`` which can be
set to avoid ``Lagged`` raising for possible use cases where a user
wants to choose between a [cheap or nasty
the the particular stream (we use this in ``piker``'s dark clearing
engine to avoid fast feeds breaking during HFT periods).

View File

@ -1,11 +0,0 @@
Always ``list``-cast the ``mngrs`` input to
``.trionics.gather_contexts()`` and ensure its size otherwise raise
a ``ValueError``.
Turns out that trying to pass an inline-style generator comprehension
doesn't seem to work inside the ``async with`` expression? Further, in
such a case we can get a hang waiting on the all-entered event
completion when the internal mngrs iteration is a noop. Instead we
always greedily check a size and error on empty input; the lazy
iteration of a generator input is not beneficial anyway since we're
entering all manager instances in concurrent tasks.

View File

@ -1,15 +0,0 @@
Fixes to ensure IPC (channel) breakage doesn't result in hung actor
trees; the zombie reaping and general supervision machinery will always
clean up and terminate.
This includes not only the (mostly minor) fixes to solve these cases but
also a new extensive test suite in `` with an
accompanying highly configurable example module-script in
`examples/advanced_faults/`. Tests ensure we
never get hang or zombies despite operating in debug mode and attempt to
simulate all possible IPC transport failure cases for a local-host actor
Further we simplify `Context.open_stream.__aexit__()` to just call
`MsgStream.aclose()` directly more or less avoiding a pure duplicate
code path.

View File

@ -1,10 +0,0 @@
Always redraw the `pdbpp` prompt on `SIGINT` during REPL use.
There was recent changes todo with Python 3.10 that required us to pin
to a specific commit in `pdbpp` which have recently been fixed minus
this last issue with `SIGINT` shielding: not clobbering or not
showing the `(Pdb++)` prompt on ctlr-c by the user. This repairs all
that by firstly removing the standard KBI intercepting of the std lib's
`pdb.Pdb._cmdloop()` as well as ensuring that only the actor with REPL
control ever reports `SIGINT` handler log msgs and prompt redraws. With
this we move back to using pypi `pdbpp` release.

View File

@ -1,7 +0,0 @@
Drop `trio.Process.aclose()` usage, copy into our spawning code.
The details are laid out in
`trio` changed is process running quite some time ago, this just copies
out the small bit we needed (from the old `.aclose()`) for hard kills
where a soft runtime cancel request fails and our "zombie killer"
implementation kicks in.

View File

@ -1,15 +0,0 @@
Switch to using the fork & fix of `pdb++`, `pdbp`:
Allows us to sidestep a variety of issues that aren't being maintained
in the upstream project thanks to the hard work of @mdmintz!
We also include some default settings adjustments as per recent
development on the fork:
- sticky mode is still turned on by default but now activates when
a using the `ll` repl command.
- turn off line truncation by default to avoid inter-line gaps when
resizing the terimnal during use.
- when using the backtrace cmd either by `w` or `bt`, the config
automatically switches to non-sticky mode.

View File

@ -2,7 +2,7 @@
package = "tractor"
filename = "NEWS.rst"
directory = "nooz/"
version = "0.1.0a6"
version = "0.1.0a5"
title_format = "tractor {version} ({project_date})"
template = "nooz/_template.rst"
all_bullets = true

View File

@ -1,7 +1,7 @@

View File

@ -26,12 +26,12 @@ with open('docs/README.rst', encoding='utf-8') as f:
version='0.1.0a6dev0', # alpha zone
description='structured concurrrent `trio`-"actors"',
description='structured concurrrent "actors"',
author='Tyler Goodlet',
maintainer='Tyler Goodlet',
platforms=['linux', 'windows'],
@ -44,23 +44,21 @@ setup(
# trio related
# proper range spec:
'trio >= 0.22',
'trio >= 0.20, < 0.22',
# tooling
# tooling
# IPC serialization
# serialization
# debug mode REPL
# pip ref docs on these specs:
# and pep:
@ -71,9 +69,14 @@ setup(
'pyreadline3 ; platform_system == "Windows"',
# 3.10 has an outstanding unreleased issue and `pdbpp` itself
# pins to patched forks of its own dependencies as well..and
# we need a specific patch on master atm.
'pdbpp @ git+ ; python_version > "3.9"', # noqa: E501

View File

@ -7,7 +7,6 @@ import os
import random
import signal
import platform
import pathlib
import time
import inspect
from functools import partial, wraps
@ -114,21 +113,14 @@ no_windows = pytest.mark.skipif(
def repodir() -> pathlib.Path:
Return the abspath to the repo directory.
# 2 parents up to step up through tests/<repo_dir>
return pathlib.Path(__file__).parent.parent.absolute()
def examples_dir() -> pathlib.Path:
Return the abspath to the examples directory as `pathlib.Path`.
return repodir() / 'examples'
def repodir():
"""Return the abspath to the repo directory.
dirname = os.path.dirname
dirpath = os.path.abspath(
return dirpath
def pytest_addoption(parser):
@ -159,7 +151,7 @@ def loglevel(request):
def spawn_backend(request) -> str:
def spawn_backend(request):
return request.config.option.spawn_backend
@ -213,22 +205,16 @@ def sig_prog(proc, sig):
def daemon(
loglevel: str,
arb_addr: tuple[str, int],
Run a daemon actor as a "remote arbiter".
def daemon(loglevel, testdir, arb_addr):
"""Run a daemon actor as a "remote arbiter".
if loglevel in ('trace', 'debug'):
# too much logging will lock up the subproc (smh)
loglevel = 'info'
cmdargs = [
sys.executable, '-c',
"import tractor; tractor.run_daemon([], registry_addr={}, loglevel={})"
"import tractor; tractor.run_daemon([], arbiter_addr={}, loglevel={})"
"'{}'".format(loglevel) if loglevel else None)

View File

@ -1,193 +0,0 @@
Sketchy network blackoutz, ugly byzantine gens, puedes eschuchar la
from functools import partial
import pytest
from _pytest.pathlib import import_path
import trio
import tractor
from conftest import (
[False, True],
ids=['no_debug_mode', 'debug_mode'],
# no breaks
'break_parent_ipc_after': False,
'break_child_ipc_after': False,
# only parent breaks
'break_parent_ipc_after': 500,
'break_child_ipc_after': False,
# only child breaks
'break_parent_ipc_after': False,
'break_child_ipc_after': 500,
# both: break parent first
'break_parent_ipc_after': 500,
'break_child_ipc_after': 800,
# both: break child first
'break_parent_ipc_after': 800,
'break_child_ipc_after': 500,
def test_ipc_channel_break_during_stream(
debug_mode: bool,
spawn_backend: str,
ipc_break: dict | None,
Ensure we can have an IPC channel break its connection during
streaming and it's still possible for the (simulated) user to kill
the actor tree using SIGINT.
We also verify the type of connection error expected in the parent
depending on which side if the IPC breaks first.
if spawn_backend != 'trio':
if debug_mode:
pytest.skip('`debug_mode` only supported on `trio` spawner')
# non-`trio` spawners should never hit the hang condition that
# requires the user to do ctl-c to cancel the actor tree.
expect_final_exc = trio.ClosedResourceError
mod = import_path(
examples_dir() / 'advanced_faults' / '',
expect_final_exc = KeyboardInterrupt
# when ONLY the child breaks we expect the parent to get a closed
# resource error on the next `MsgStream.receive()` and then fail out
# and cancel the child from there.
if (
# only child breaks
and ipc_break['break_parent_ipc_after'] is False
# both break but, parent breaks first
or (
ipc_break['break_child_ipc_after'] is not False
and (
> ipc_break['break_child_ipc_after']
expect_final_exc = trio.ClosedResourceError
# when the parent IPC side dies (even if the child's does as well
# but the child fails BEFORE the parent) we expect the channel to be
# sent a stop msg from the child at some point which will signal the
# parent that the stream has been terminated.
# NOTE: when the parent breaks "after" the child you get this same
# case as well, the child breaks the IPC channel with a stop msg
# before any closure takes place.
elif (
# only parent breaks
and ipc_break['break_child_ipc_after'] is False
# both break but, child breaks first
or (
ipc_break['break_parent_ipc_after'] is not False
and (
> ipc_break['break_parent_ipc_after']
expect_final_exc = trio.EndOfChannel
with pytest.raises(expect_final_exc):
async def break_ipc_after_started(
ctx: tractor.Context,
) -> None:
await ctx.started()
async with ctx.open_stream() as stream:
await stream.aclose()
await trio.sleep(0.2)
await ctx.chan.send(None)
print('child broke IPC and terminating')
def test_stream_closed_right_after_ipc_break_and_zombie_lord_engages():
Verify that is a subactor's IPC goes down just after bringing up a stream
the parent can trigger a SIGINT and the child will be reaped out-of-IPC by
the localhost process supervision machinery: aka "zombie lord".
async def main():
async with tractor.open_nursery() as n:
portal = await n.start_actor(
with trio.move_on_after(1):
async with (
) as (ctx, sent),
async with ctx.open_stream():
await trio.sleep(0.5)
print('parent waiting on context')
print('parent exited context')
raise KeyboardInterrupt
with pytest.raises(KeyboardInterrupt):

View File

@ -14,7 +14,7 @@ def is_win():
return platform.system() == 'Windows'
_registry: dict[str, set[tractor.MsgStream]] = {
_registry: dict[str, set[tractor.ReceiveMsgStream]] = {
'even': set(),
'odd': set(),

View File

@ -8,10 +8,6 @@ import platform
import time
from itertools import repeat
from exceptiongroup import (
import pytest
import trio
import tractor
@ -60,49 +56,29 @@ def test_remote_error(arb_addr, args_err):
) as nursery:
# on a remote type error caused by bad input args
# this should raise directly which means we **don't** get
# an exception group outside the nursery since the error
# here and the far end task error are one in the same?
portal = await nursery.run_in_actor(
assert_err, name='errorer', **args
# get result(s) from main task
# this means the root actor will also raise a local
# parent task error and thus an eg will propagate out
# of this actor nursery.
await portal.result()
except tractor.RemoteActorError as err:
assert err.type == errtype
print("Look Maa that actor failed hard, hehh")
# ensure boxed errors
if args:
with pytest.raises(tractor.RemoteActorError) as excinfo:
# ensure boxed error is correct
assert excinfo.value.type == errtype
# the root task will also error on the `.result()` call
# so we expect an error from there AND the child.
with pytest.raises(BaseExceptionGroup) as excinfo:
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.type == errtype
def test_multierror(arb_addr):
Verify we raise a ``BaseExceptionGroup`` out of a nursery where
"""Verify we raise a ``trio.MultiError`` out of a nursery where
more then one actor errors.
async def main():
async with tractor.open_nursery(
@ -119,10 +95,10 @@ def test_multierror(arb_addr):
print("Look Maa that first actor failed hard, hehh")
# here we should get a ``BaseExceptionGroup`` containing exceptions
# here we should get a `trio.MultiError` containing exceptions
# from both subactors
with pytest.raises(BaseExceptionGroup):
with pytest.raises(trio.MultiError):
@ -131,7 +107,7 @@ def test_multierror(arb_addr):
'num_subactors', range(25, 26),
def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
"""Verify we raise a ``BaseExceptionGroup`` out of a nursery where
"""Verify we raise a ``trio.MultiError`` out of a nursery where
more then one actor errors and also with a delay before failure
to test failure during an ongoing spawning.
@ -147,11 +123,10 @@ def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
# with pytest.raises(trio.MultiError) as exc_info:
with pytest.raises(BaseExceptionGroup) as exc_info:
with pytest.raises(trio.MultiError) as exc_info:
assert exc_info.type == ExceptionGroup
assert exc_info.type == tractor.MultiError
err = exc_info.value
exceptions = err.exceptions
@ -239,8 +214,8 @@ async def test_cancel_infinite_streamer(start_method):
# daemon actors sit idle while single task actors error out
(1, tractor.RemoteActorError, AssertionError, (assert_err, {}), None),
(2, BaseExceptionGroup, AssertionError, (assert_err, {}), None),
(3, BaseExceptionGroup, AssertionError, (assert_err, {}), None),
(2, tractor.MultiError, AssertionError, (assert_err, {}), None),
(3, tractor.MultiError, AssertionError, (assert_err, {}), None),
# 1 daemon actor errors out while single task actors sleep forever
(3, tractor.RemoteActorError, AssertionError, (sleep_forever, {}),
@ -251,7 +226,7 @@ async def test_cancel_infinite_streamer(start_method):
(do_nuthin, {}), (assert_err, {'delay': 1}, True)),
# daemon complete quickly delay while single task
# actors error after brief delay
(3, BaseExceptionGroup, AssertionError,
(3, tractor.MultiError, AssertionError,
(assert_err, {'delay': 1}), (do_nuthin, {}, False)),
@ -318,7 +293,7 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
# should error here with a ``RemoteActorError`` or ``MultiError``
except first_err as err:
if isinstance(err, BaseExceptionGroup):
if isinstance(err, tractor.MultiError):
assert len(err.exceptions) == num_actors
for exc in err.exceptions:
if isinstance(exc, tractor.RemoteActorError):
@ -362,7 +337,7 @@ async def spawn_and_error(breadth, depth) -> None:
async def test_nested_multierrors(loglevel, start_method):
Test that failed actor sets are wrapped in `BaseExceptionGroup`s. This
Test that failed actor sets are wrapped in `trio.MultiError`s. This
test goes only 2 nurseries deep but we should eventually have tests
for arbitrary n-depth actor trees.
@ -390,7 +365,7 @@ async def test_nested_multierrors(loglevel, start_method):
except BaseExceptionGroup as err:
except trio.MultiError as err:
assert len(err.exceptions) == subactor_breadth
for subexc in err.exceptions:
@ -408,10 +383,10 @@ async def test_nested_multierrors(loglevel, start_method):
assert subexc.type in (
elif isinstance(subexc, BaseExceptionGroup):
elif isinstance(subexc, trio.MultiError):
for subsub in subexc.exceptions:
if subsub in (tractor.RemoteActorError,):
@ -419,7 +394,7 @@ async def test_nested_multierrors(loglevel, start_method):
assert type(subsub) in (
assert isinstance(subexc, tractor.RemoteActorError)
@ -431,13 +406,13 @@ async def test_nested_multierrors(loglevel, start_method):
if is_win():
if isinstance(subexc, tractor.RemoteActorError):
assert subexc.type in (
assert isinstance(subexc, BaseExceptionGroup)
assert isinstance(subexc, trio.MultiError)
assert subexc.type is ExceptionGroup
assert subexc.type is trio.MultiError
assert subexc.type in (

View File

@ -1,6 +1,5 @@
import itertools
import pytest
import trio
import tractor
from tractor import open_actor_cluster
@ -12,72 +11,26 @@ from conftest import tractor_test
MESSAGE = 'tractoring at full speed'
def test_empty_mngrs_input_raises() -> None:
async def main():
with trio.fail_after(1):
async with (
# NOTE: ensure we can passthrough runtime opts
# debug_mode=True,
) as portals,
# NOTE: it's the use of inline-generator syntax
# here that causes the empty input.
p.open_context(worker) for p in portals.values()
assert 0
with pytest.raises(ValueError):
async def worker(
ctx: tractor.Context,
) -> None:
async def worker(ctx: tractor.Context) -> None:
await ctx.started()
async with ctx.open_stream(
) as stream:
# TODO: this with the below assert causes a hang bug?
# with trio.move_on_after(1):
async with ctx.open_stream(backpressure=True) as stream:
async for msg in stream:
# do something with msg
assert msg == MESSAGE
# TODO: does this ever cause a hang
# assert 0
async def test_streaming_to_actor_cluster() -> None:
async with (
open_actor_cluster(modules=[__name__]) as portals,
mngrs=[p.open_context(worker) for p in portals.values()],
) as contexts,
mngrs=[ctx[0].open_stream() for ctx in contexts],
) as streams,
with trio.move_on_after(1):
for stream in itertools.cycle(streams):

View File

@ -10,11 +10,9 @@ TODO:
- wonder if any of it'll work on OS X?
import itertools
from os import path
from typing import Optional
import platform
import pathlib
import sys
import time
@ -25,10 +23,7 @@ from pexpect.exceptions import (
from conftest import (
from conftest import repodir, _ci_env
# TODO: The next great debugger audit could be done by you!
# - recurrent entry to breakpoint() from single actor *after* and an
@ -47,13 +42,19 @@ if platform.system() == 'Windows':
def mk_cmd(ex_name: str) -> str:
Generate a command suitable to pass to ``pexpect.spawn()``.
def examples_dir():
"""Return the abspath to the examples directory.
return path.join(repodir(), 'examples', 'debugging/')
script_path: pathlib.Path = examples_dir() / 'debugging' / f'{ex_name}.py'
return ' '.join(['python', str(script_path)])
def mk_cmd(ex_name: str) -> str:
"""Generate a command suitable to pass to ``pexpect.spawn()``.
return ' '.join(
path.join(examples_dir(), f'{ex_name}.py')]
# TODO: was trying to this xfail style but some weird bug i see in CI
@ -95,7 +96,7 @@ def spawn(
return _spawn
PROMPT = r"\(Pdb\+\)"
PROMPT = r"\(Pdb\+\+\)"
def expect(
@ -151,14 +152,27 @@ def ctlc(
use_ctlc = request.param
if (
sys.version_info <= (3, 10)
and use_ctlc
# on 3.9 it seems the REPL UX
# is highly unreliable and frankly annoying
# to test for. It does work from manual testing
# but i just don't think it's wroth it to try
# and get this working especially since we want to
# be 3.10+ mega-asap.
pytest.skip('Py3.9 and `pdbpp` son no bueno..')
if ci_env:
node = request.node
markers = node.own_markers
for mark in markers:
if == 'has_nested_actors':
f'Test {node} has nested actors and fails with Ctrl-C.\n'
f'The test can sometimes run fine locally but until'
' we solve' 'this issue this CI test will be xfail:\n'
f'Test for {node} uses nested actors and fails in CI\n'
f'The test seems to run fine locally but until we solve'
'this issue this CI test will be xfail:\n'
@ -181,15 +195,13 @@ def ctlc(
ids=lambda item: f'{item[0]} -> {item[1]}',
def test_root_actor_error(spawn, user_in_out):
Demonstrate crash handler entering pdb from basic error in root actor.
"""Demonstrate crash handler entering pdbpp from basic error in root actor.
user_input, expect_err_str = user_in_out
child = spawn('root_actor_error')
# scan for the prompt
# scan for the pdbpp prompt
expect(child, PROMPT)
before = str(child.before.decode())
@ -220,8 +232,8 @@ def test_root_actor_bp(spawn, user_in_out):
user_input, expect_err_str = user_in_out
child = spawn('root_actor_breakpoint')
# scan for the prompt
# scan for the pdbpp prompt
assert 'Error' not in str(child.before)
@ -262,7 +274,7 @@ def do_ctlc(
if expect_prompt:
before = str(child.before.decode())
if patt:
@ -281,7 +293,7 @@ def test_root_actor_bp_forever(
# entries
for _ in range(10):
if ctlc:
@ -291,7 +303,7 @@ def test_root_actor_bp_forever(
# do one continue which should trigger a
# new task to lock the tty
# seems that if we hit ctrl-c too fast the
# sigint guard machinery might not kick in..
@ -302,10 +314,10 @@ def test_root_actor_bp_forever(
# XXX: this previously caused a bug!
# quit out of the loop
@ -328,8 +340,8 @@ def test_subactor_error(
child = spawn('subactor_error')
# scan for the prompt
# scan for the pdbpp prompt
before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error'" in before
@ -349,7 +361,7 @@ def test_subactor_error(
# creating actor
before = str(child.before.decode())
# root actor gets debugger engaged
@ -376,8 +388,8 @@ def test_subactor_breakpoint(
child = spawn('subactor_breakpoint')
# scan for the prompt
# scan for the pdbpp prompt
before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -386,7 +398,7 @@ def test_subactor_breakpoint(
# entries
for _ in range(10):
if ctlc:
@ -394,7 +406,7 @@ def test_subactor_breakpoint(
# now run some "continues" to show re-entries
for _ in range(5):
before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -405,7 +417,7 @@ def test_subactor_breakpoint(
# child process should exit but parent will capture pdb.BdbQuit
before = str(child.before.decode())
assert "RemoteActorError: ('breakpoint_forever'" in before
@ -437,8 +449,8 @@ def test_multi_subactors(
child = spawn(r'multi_subactors')
# scan for the prompt
# scan for the pdbpp prompt
before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -450,7 +462,7 @@ def test_multi_subactors(
# entries
for _ in range(10):
if ctlc:
@ -459,7 +471,7 @@ def test_multi_subactors(
# first name_error failure
before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error'" in before
assert "NameError" in before
@ -471,21 +483,19 @@ def test_multi_subactors(
# 2nd name_error failure
# TODO: will we ever get the race where this crash will show up?
# blocklist strat now prevents this crash
# assert_before(child, [
# "Attaching to pdb in crashed actor: ('name_error_1'",
# "NameError",
# ])
assert_before(child, [
"Attaching to pdb in crashed actor: ('name_error_1'",
if ctlc:
# breakpoint loop should re-engage
before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
@ -501,7 +511,7 @@ def test_multi_subactors(
before = str(child.before.decode())
if ctlc:
@ -520,11 +530,11 @@ def test_multi_subactors(
# now run some "continues" to show re-entries
for _ in range(5):
# quit the loop and expect parent to attach
before = str(child.before.decode())
assert_before(child, [
@ -568,16 +578,16 @@ def test_multi_daemon_subactors(
child = spawn('multi_daemon_subactors')
# there can be a race for which subactor will acquire
# the root's tty lock first so anticipate either crash
# message on the first entry.
bp_forever_msg = "Attaching pdb to actor: ('bp_forever'"
name_error_msg = "NameError: name 'doggypants' is not defined"
# there is a race for which subactor will acquire
# the root's tty lock first
before = str(child.before.decode())
bp_forever_msg = "Attaching pdb to actor: ('bp_forever'"
name_error_msg = "NameError"
if bp_forever_msg in before:
next_msg = name_error_msg
@ -598,8 +608,10 @@ def test_multi_daemon_subactors(
# second entry by `bp_forever`.
assert_before(child, [next_msg])
before = str(child.before.decode())
assert next_msg in before
# XXX: hooray the root clobbering the child here was fixed!
# IMO, this demonstrates the true power of SC system design.
@ -618,50 +630,31 @@ def test_multi_daemon_subactors(
if ctlc:
# expect another breakpoint actor entry
# wait for final error in root
while True:
before = str(child.before.decode())
assert_before(child, [bp_forever_msg])
except AssertionError:
assert_before(child, [name_error_msg])
# root error should be packed as remote error
assert "_exceptions.RemoteActorError: ('name_error'" in before
except AssertionError:
assert bp_forever_msg in before
if ctlc:
# should crash with the 2nd name error (simulates
# a retry) and then the root eventually (boxed) errors
# after 1 or more further bp actor entries.
assert_before(child, [name_error_msg])
# wait for final error in root
# where it crashs with boxed error
while True:
except AssertionError:
# boxed error raised in root task
"Attaching to pdb in crashed actor: ('root'",
"_exceptions.RemoteActorError: ('name_error'",
except TIMEOUT:
# Failed to exit using continue..?
@ -677,8 +670,8 @@ def test_multi_subactors_root_errors(
child = spawn('multi_subactor_root_errors')
# scan for the prompt
# scan for the pdbpp prompt
# at most one subactor should attach before the root is cancelled
before = str(child.before.decode())
@ -690,15 +683,7 @@ def test_multi_subactors_root_errors(
# continue again to catch 2nd name error from
# actor 'name_error_1' (which is 2nd depth).
# due to block list strat from #337, this will no longer
# propagate before the root errors and cancels the spawner sub-tree.
# only if the blocking condition doesn't kick in fast enough
before = str(child.before.decode())
if "Debug lock blocked for ['name_error_1'" not in before:
assert_before(child, [
"Attaching to pdb in crashed actor: ('name_error_1'",
@ -708,15 +693,10 @@ def test_multi_subactors_root_errors(
# check if the spawner crashed or was blocked from debug
# and if this intermediary attached check the boxed error
before = str(child.before.decode())
if "Attaching to pdb in crashed actor: ('spawn_error'" in before:
assert_before(child, [
# boxed error from spawner's child
"Attaching to pdb in crashed actor: ('spawn_error'",
# boxed error from previous step
"RemoteActorError: ('name_error_1'",
@ -725,30 +705,28 @@ def test_multi_subactors_root_errors(
# expect a root actor crash
assert_before(child, [
"RemoteActorError: ('name_error'",
# error from root actor and root task that created top level nursery
"Attaching to pdb in crashed actor: ('root'",
assert_before(child, [
# "Attaching to pdb in crashed actor: ('root'",
# boxed error from previous step
"RemoteActorError: ('name_error'",
'assert 0',
# warnings assert we probably don't need
# assert "Cancelling nursery in ('spawn_error'," in before
if ctlc:
# continue again
before = str(child.before.decode())
# error from root actor and root task that created top level nursery
assert "AssertionError" in before
def test_multi_nested_subactors_error_through_nurseries(
@ -772,31 +750,24 @@ def test_multi_nested_subactors_error_through_nurseries(
timed_out_early: bool = False
for send_char in itertools.cycle(['c', 'q']):
for i in range(12):
except EOF:
# race conditions on how fast the continue is sent?
print(f"Failed early on {i}?")
timed_out_early = True
assert_before(child, [
# boxed source errors
"NameError: name 'doggypants' is not defined",
"tractor._exceptions.RemoteActorError: ('name_error'",
# first level subtrees
"tractor._exceptions.RemoteActorError: ('spawner0'",
# "tractor._exceptions.RemoteActorError: ('spawner1'",
# propagation of errors up through nested subtrees
"tractor._exceptions.RemoteActorError: ('spawn_until_0'",
"tractor._exceptions.RemoteActorError: ('spawn_until_1'",
"tractor._exceptions.RemoteActorError: ('spawn_until_2'",
if not timed_out_early:
before = str(child.before.decode())
assert "NameError" in before
@ -816,7 +787,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
child = spawn('root_cancelled_but_child_is_in_tty_lock')
before = str(child.before.decode())
assert "NameError: name 'doggypants' is not defined" in before
@ -831,7 +802,7 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
for i in range(4):
except (
@ -888,7 +859,7 @@ def test_root_cancels_child_context_during_startup(
child = spawn('fast_error_in_root_after_spawn')
before = str(child.before.decode())
assert "AssertionError" in before
@ -905,7 +876,7 @@ def test_different_debug_mode_per_actor(
ctlc: bool,
child = spawn('per_actor_debug')
# only one actor should enter the debugger
before = str(child.before.decode())

View File

@ -12,17 +12,17 @@ import shutil
import pytest
from conftest import (
from conftest import repodir
def examples_dir():
"""Return the abspath to the examples directory.
return os.path.join(repodir(), 'examples')
def run_example_in_subproc(
loglevel: str,
arb_addr: tuple[str, int],
def run_example_in_subproc(loglevel, testdir, arb_addr):
def run(script_code):
@ -32,8 +32,8 @@ def run_example_in_subproc(
# on windows we need to create a special which will
# be executed with ``python -m <modulename>`` on windows..
examples_dir() / '',
str(testdir / ''),
os.path.join(examples_dir(), ''),
os.path.join(str(testdir), '')
# drop the ``if __name__ == '__main__'`` guard onwards from
@ -88,7 +88,6 @@ def run_example_in_subproc(
and f[0] != '_'
and 'debugging' not in p[0]
and 'integration' not in p[0]
and 'advanced_faults' not in p[0]
ids=lambda t: t[1],

View File

@ -8,7 +8,6 @@ import builtins
import itertools
import importlib
from exceptiongroup import BaseExceptionGroup
import pytest
import trio
import tractor
@ -171,11 +170,11 @@ async def trio_ctx(
# message.
with trio.fail_after(2):
async with (
trio.open_nursery() as n,
) as (first, chan),
trio.open_nursery() as n,
assert first == 'start'
@ -204,7 +203,6 @@ def test_context_spawns_aio_task_that_errors(
async def main():
with trio.fail_after(2):
async with tractor.open_nursery() as n:
p = await n.start_actor(
@ -410,12 +408,11 @@ def test_trio_error_cancels_intertask_chan(arb_addr):
# should trigger remote actor error
await portal.result()
with pytest.raises(BaseExceptionGroup) as excinfo:
with pytest.raises(RemoteActorError) as excinfo:
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.type == Exception
# ensure boxed error is correct
assert excinfo.value.type == Exception
def test_trio_closes_early_and_channel_exits(arb_addr):
@ -444,12 +441,11 @@ def test_aio_errors_and_channel_propagates_and_closes(arb_addr):
# should trigger remote actor error
await portal.result()
with pytest.raises(BaseExceptionGroup) as excinfo:
with pytest.raises(RemoteActorError) as excinfo:
# ensure boxed errors
for exc in excinfo.value.exceptions:
assert exc.type == Exception
# ensure boxed error is correct
assert excinfo.value.type == Exception

View File

@ -11,7 +11,7 @@ from conftest import tractor_test
async def test_no_runtime():
async def test_no_arbitter():
"""An arbitter must be established before any nurseries
can be created.
@ -19,7 +19,7 @@ async def test_no_runtime():
some point?)
with pytest.raises(RuntimeError):
async with tractor.find_actor('doggy'):
with tractor.open_nursery():

View File

@ -62,10 +62,7 @@ async def test_lifetime_stack_wipes_tmpfile(
except (
except tractor.RemoteActorError:
# tmp file should have been wiped by

View File

@ -251,7 +251,7 @@ def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend):
results, diff = time_quad_ex
assert results
this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 3
this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 2.666
assert diff < this_fast

View File

@ -12,10 +12,7 @@ import pytest
import trio
from trio.lowlevel import current_task
import tractor
from tractor.trionics import (
from tractor.trionics import broadcast_receiver, Lagged
@ -40,7 +37,7 @@ async def echo_sequences(
async def ensure_sequence(
stream: tractor.MsgStream,
stream: tractor.ReceiveMsgStream,
sequence: list,
delay: Optional[float] = None,
@ -214,8 +211,7 @@ def test_faster_task_to_recv_is_cancelled_by_slower(
Ensure that if a faster task consuming from a stream is cancelled
'''Ensure that if a faster task consuming from a stream is cancelled
the slower task can continue to receive all expected values.
@ -464,51 +460,3 @@ def test_first_recver_is_cancelled():
assert value == 1
def test_no_raise_on_lag():
Run a simple 2-task broadcast where one task is slow but configured
so that it does not raise `Lagged` on overruns using
`raise_on_lasg=False` and verify that the task does not raise.
size = 100
tx, rx = trio.open_memory_channel(size)
brx = broadcast_receiver(rx, size)
async def slow():
async with brx.subscribe(
) as br:
async for msg in br:
print(f'slow task got: {msg}')
await trio.sleep(0.1)
async def fast():
async with brx.subscribe() as br:
async for msg in br:
print(f'fast task got: {msg}')
async def main():
async with (
# NOTE: so we see the warning msg emitted by the bcaster
# internals when the no raise flag is set.
trio.open_nursery() as n,
for i in range(1000):
await tx.send(i)
# simulate user nailing ctl-c after realizing
# there's a lag in the slow task.
await trio.sleep(1)
raise KeyboardInterrupt
with pytest.raises(KeyboardInterrupt):

View File

@ -18,12 +18,13 @@
tractor: structured concurrent "actors".
from exceptiongroup import BaseExceptionGroup
from trio import MultiError
from ._clustering import open_actor_cluster
from ._ipc import Channel
from ._streaming import (
@ -44,10 +45,7 @@ from ._exceptions import (
from ._debug import (
from ._debug import breakpoint, post_mortem
from . import msg
from ._root import (
@ -64,8 +62,9 @@ __all__ = [

View File

@ -32,12 +32,9 @@ import tractor
async def open_actor_cluster(
modules: list[str],
count: int = cpu_count(),
names: list[str] | None = None,
names: Optional[list[str]] = None,
start_method: Optional[str] = None,
hard_kill: bool = False,
# passed through verbatim to ``open_root_actor()``
) -> AsyncGenerator[
dict[str, tractor.Portal],
@ -52,9 +49,7 @@ async def open_actor_cluster(
raise ValueError(
'Number of names is {len(names)} but count it {count}')
async with tractor.open_nursery(
) as an:
async with tractor.open_nursery(start_method=start_method) as an:
async with trio.open_nursery() as n:
uid = tractor.current_actor().uid

View File

@ -20,16 +20,11 @@ Multi-core debugging for da peeps!
from __future__ import annotations
import bdb
import os
import sys
import signal
from functools import (
from functools import partial
from contextlib import asynccontextmanager as acm
from typing import (
@ -37,23 +32,27 @@ from typing import (
from types import FrameType
import pdbp
import tractor
import trio
from trio_typing import TaskStatus
from .log import get_logger
from ._discovery import get_root
from ._state import (
from ._exceptions import (
from ._state import is_root_process, debug_mode
from ._exceptions import is_multi_cancelled
from ._ipc import Channel
# wtf: only exported when installed in dev mode?
import pdbpp
except ImportError:
# pdbpp is installed in regular monkey patches stuff
import pdb
xpm = getattr(pdb, 'xpm', None)
assert xpm, "pdbpp is not installed?" # type: ignore
pdbpp = pdb
log = get_logger(__name__)
@ -67,28 +66,11 @@ class Lock:
Mostly to avoid a lot of ``global`` declarations for now XD.
repl: MultiActorPdb | None = None
# placeholder for function to set a ``trio.Event`` on debugger exit
# pdb_release_hook: Optional[Callable] = None
_trio_handler: Callable[
[int, Optional[FrameType]], Any
] | int | None = None
# actor-wide variable pointing to current task name using debugger
local_task_in_debug: str | None = None
# NOTE: set by the current task waiting on the root tty lock from
# the CALLER side of the `lock_tty_for_child()` context entry-call
# and must be cancelled if this actor is cancelled via IPC
# request-message otherwise deadlocks with the parent actor may
# ensure
_debugger_request_cs: Optional[trio.CancelScope] = None
# NOTE: set only in the root actor for the **local** root spawned task
# which has acquired the lock (i.e. this is on the callee side of
# the `lock_tty_for_child()` context entry).
_root_local_task_cs_in_debug: Optional[trio.CancelScope] = None
local_task_in_debug: Optional[str] = None
# actor tree-wide actor uid that supposedly has the tty lock
global_actor_in_debug: Optional[tuple[str, str]] = None
@ -99,22 +81,29 @@ class Lock:
# lock in root actor preventing multi-access to local tty
_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
# XXX: set by the current task waiting on the root tty lock
# and must be cancelled if this actor is cancelled via message
# otherwise deadlocks with the parent actor may ensure
_debugger_request_cs: Optional[trio.CancelScope] = None
_orig_sigint_handler: Optional[Callable] = None
_blocked: set[tuple[str, str]] = set()
def shield_sigint(cls):
cls._orig_sigint_handler = signal.signal(
def unshield_sigint(cls):
# always restore ``trio``'s sigint handler. see notes below in
# the pdb factory about the nightmare that is that code swapping
# out the handler when the repl activates...
signal.signal(signal.SIGINT, cls._trio_handler)
if cls._orig_sigint_handler is not None:
# restore original sigint handler
cls._orig_sigint_handler = None
@ -141,29 +130,24 @@ class Lock:
# restore original sigint handler
cls.repl = None
class TractorConfig(pdbp.DefaultConfig):
class TractorConfig(pdbpp.DefaultConfig):
Custom ``pdbp`` goodness :surfer:
Custom ``pdbpp`` goodness.
use_pygments: bool = True
sticky_by_default: bool = False
enable_hidden_frames: bool = False
# much thanks @mdmintz for the hot tip!
# fixes line spacing issue when resizing terminal B)
truncate_long_lines: bool = False
# use_pygments = True
# sticky_by_default = True
enable_hidden_frames = False
class MultiActorPdb(pdbp.Pdb):
class MultiActorPdb(pdbpp.Pdb):
Add teardown hooks to the regular ``pdbp.Pdb``.
Add teardown hooks to the regular ``pdbpp.Pdb``.
# override the pdbp config with our coolio one
# override the pdbpp config with our coolio one
DefaultConfig = TractorConfig
# def preloop(self):
@ -184,35 +168,6 @@ class MultiActorPdb(pdbp.Pdb):
# XXX NOTE: we only override this because apparently the stdlib pdb
# bois likes to touch the SIGINT handler as much as i like to touch
# my d$%&.
def _cmdloop(self):
def shname(self) -> str | None:
Attempt to return the login shell name with a special check for
the infamous `xonsh` since it seems to have some issues much
different from std shells when it comes to flushing the prompt?
# SUPER HACKY and only really works if `xonsh` is not used
# before spawning further sub-shells..
shpath = os.getenv('SHELL', None)
if shpath:
if (
os.getenv('XONSH_LOGIN', default=False)
or 'xonsh' in shpath
return 'xonsh'
return os.path.basename(shpath)
return None
async def _acquire_debug_lock_from_root_task(
@ -241,12 +196,6 @@ async def _acquire_debug_lock_from_root_task(
f"entering lock checkpoint, remote task: {task_name}:{uid}"
we_acquired = True
# NOTE: if the surrounding cancel scope from the
# `lock_tty_for_child()` caller is cancelled, this line should
# unblock and NOT leave us in some kind of
# a "child-locked-TTY-but-child-is-uncontactable-over-IPC"
# condition.
await Lock._debug_lock.acquire()
if Lock.no_remote_has_tty is None:
@ -307,7 +256,7 @@ async def lock_tty_for_child(
) -> str:
Lock the TTY in the root process of an actor tree in a new
inter-actor-context-task such that the ``pdbp`` debugger console
inter-actor-context-task such that the ``pdbpp`` debugger console
can be mutex-allocated to the calling sub-actor for REPL control
without interference by other processes / threads.
@ -318,15 +267,6 @@ async def lock_tty_for_child(
task_name = trio.lowlevel.current_task().name
if tuple(subactor_uid) in Lock._blocked:
f'Actor {subactor_uid} is blocked from acquiring debug lock\n'
f"remote task: {task_name}:{subactor_uid}"
ctx._enter_debugger_on_cancel = False
await ctx.cancel(f'Debug lock blocked for {subactor_uid}')
return 'pdb_lock_blocked'
# TODO: when we get to true remote debugging
# this will deliver stdin data?
@ -340,9 +280,8 @@ async def lock_tty_for_child(
with (
trio.CancelScope(shield=True) as debug_lock_cs,
Lock._root_local_task_cs_in_debug = debug_lock_cs
async with _acquire_debug_lock_from_root_task(subactor_uid):
# indicate to child that we've locked stdio
@ -358,7 +297,6 @@ async def lock_tty_for_child(
return "pdb_unlock_complete"
Lock._root_local_task_cs_in_debug = None
@ -394,7 +332,7 @@ async def wait_for_parent_stdin_hijack(
) as (ctx, val):
log.debug('locked context')
log.pdb('locked context')
assert val == 'Locked'
async with ctx.open_stream() as stream:
@ -413,21 +351,21 @@ async def wait_for_parent_stdin_hijack(
# sync with callee termination
assert await ctx.result() == "pdb_unlock_complete"
log.debug('exitting child side locking task context')
log.pdb('unlocked context')
except ContextCancelled:
except tractor.ContextCancelled:
log.warning('Root actor cancelled debug lock')
log.pdb(f"Exiting debugger for actor {actor_uid}")
Lock.local_task_in_debug = None
log.debug('Exiting debugger from child')
log.pdb(f"Child {actor_uid} released parent stdio lock")
def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
pdb = MultiActorPdb()
# signal.signal = pdbp.hideframe(signal.signal)
# signal.signal = pdbpp.hideframe(signal.signal)
@ -454,8 +392,9 @@ async def _breakpoint(
__tracebackhide__ = True
actor = tractor.current_actor()
pdb, undo_sigint = mk_mpdb()
actor = tractor.current_actor()
task_name = trio.lowlevel.current_task().name
# TODO: is it possible to debug a trio.Cancelled except block?
@ -465,10 +404,7 @@ async def _breakpoint(
# with trio.CancelScope(shield=shield):
# await trio.lowlevel.checkpoint()
if (
not Lock.local_pdb_complete
or Lock.local_pdb_complete.is_set()
if not Lock.local_pdb_complete or Lock.local_pdb_complete.is_set():
Lock.local_pdb_complete = trio.Event()
# TODO: need a more robust check for the "root" actor
@ -482,10 +418,7 @@ async def _breakpoint(
# Recurrence entry case: this task already has the lock and
# is likely recurrently entering a breakpoint
if Lock.local_task_in_debug == task_name:
# noop on recurrent entry case but we want to trigger
# a checkpoint to allow other actors error-propagate and
# potetially avoid infinite re-entries in some subactor.
await trio.lowlevel.checkpoint()
# noop on recurrent entry case
# if **this** actor is already in debug mode block here
@ -504,29 +437,18 @@ async def _breakpoint(
# root nursery so that the debugger can continue to run without
# being restricted by the scope of a new task nursery.
# TODO: if we want to debug a trio.Cancelled triggered exception
# NOTE: if we want to debug a trio.Cancelled triggered exception
# we have to figure out how to avoid having the service nursery
# cancel on this task start? I *think* this works below:
# ```python
# cancel on this task start? I *think* this works below?
# actor._service_n.cancel_scope.shield = shield
# ```
# but not entirely sure if that's a sane way to implement it?
with trio.CancelScope(shield=True):
await actor._service_n.start(
Lock.repl = pdb
except RuntimeError:
if actor._cancel_called:
# service nursery won't be usable and we
# don't want to lock up the root either way since
# we're in (the midst of) cancellation.
elif is_root_process():
@ -556,7 +478,6 @@ async def _breakpoint(
Lock.global_actor_in_debug = actor.uid
Lock.local_task_in_debug = task_name
Lock.repl = pdb
# block here one (at the appropriate frame *up*) where
@ -577,18 +498,22 @@ async def _breakpoint(
# # frame = sys._getframe()
# # last_f = frame.f_back
# # last_f.f_globals['__tracebackhide__'] = True
# # signal.signal = pdbp.hideframe(signal.signal)
# # signal.signal = pdbpp.hideframe(signal.signal)
# signal.signal(
# signal.SIGINT,
# orig_handler
# )
def shield_sigint_handler(
def shield_sigint(
signum: int,
frame: 'frame', # type: ignore # noqa
# pdb_obj: Optional[MultiActorPdb] = None,
pdb_obj: Optional[MultiActorPdb] = None,
) -> None:
Specialized, debugger-aware SIGINT handler.
Specialized debugger compatible SIGINT handler.
In childred we always ignore to avoid deadlocks since cancellation
should always be managed by the parent supervising actor. The root
@ -600,7 +525,6 @@ def shield_sigint_handler(
uid_in_debug = Lock.global_actor_in_debug
actor = tractor.current_actor()
# print(f'{actor.uid} in HANDLER with ')
def do_cancel():
# If we haven't tried to cancel the runtime then do that instead
@ -634,9 +558,6 @@ def shield_sigint_handler(
return do_cancel()
# only set in the actor actually running the REPL
pdb_obj = Lock.repl
# root actor branch that reports whether or not a child
# has locked debugger.
if (
@ -649,36 +570,16 @@ def shield_sigint_handler(
# which has already terminated to unlock.
and any_connected
# we are root and some actor is in debug mode
# if uid_in_debug is not None:
if pdb_obj:
name = uid_in_debug[0]
if name != 'root':
f"Ignoring SIGINT, child in debug mode: `{uid_in_debug}`"
f"Ignoring SIGINT while child in debug mode: `{uid_in_debug}`"
"Ignoring SIGINT while in debug mode"
elif (
if pdb_obj:
"Ignoring SIGINT since debug mode is enabled"
if (
and not Lock._root_local_task_cs_in_debug.cancel_called
# revert back to ``trio`` handler asap!
# child actor that has locked the debugger
elif not is_root_process():
@ -694,10 +595,7 @@ def shield_sigint_handler(
return do_cancel()
task = Lock.local_task_in_debug
if (
and pdb_obj
if task:
f"Ignoring SIGINT while task in debug mode: `{task}`"
@ -707,26 +605,20 @@ def shield_sigint_handler(
# elif debug_mode():
else: # XXX: shouldn't ever get here?
raise KeyboardInterrupt
"Ignoring SIGINT since debug mode is enabled"
# NOTE: currently (at least on ``fancycompleter`` 0.9.2)
# it looks to be that the last command that was run (eg. ll)
# it lookks to be that the last command that was run (eg. ll)
# will be repeated by default.
# maybe redraw/print last REPL output to console since
# we want to alert the user that more input is expect since
# nothing has been done dur to ignoring sigint.
# TODO: maybe redraw/print last REPL output to console
if (
pdb_obj # only when this actor has a REPL engaged
and sys.version_info <= (3, 10)
# XXX: yah, mega hack, but how else do we catch this madness XD
if pdb_obj.shname == 'xonsh':
# TODO: make this work like sticky mode where if there is output
# detected as written to the tty we redraw this part underneath
# and erase the past draw of this same bit above?
@ -737,13 +629,21 @@ def shield_sigint_handler(
# XXX LEGACY: lol, see ``pdbpp`` issue:
# XXX: lol, see ``pdbpp`` issue:
# TODO: pretty sure this is what we should expect to have to run
# in total but for now we're just going to wait until `pdbpp`
# figures out it's own stuff on 3.10 (and maybe we'll help).
# pdb_obj.do_longlist(None)
# XXX: we were doing this but it shouldn't be required..
print(pdb_obj.prompt, end='', flush=True)
def _set_trace(
actor: tractor.Actor | None = None,
pdb: MultiActorPdb | None = None,
actor: Optional[tractor.Actor] = None,
pdb: Optional[MultiActorPdb] = None,
__tracebackhide__ = True
actor = actor or tractor.current_actor()
@ -753,11 +653,7 @@ def _set_trace(
if frame:
frame = frame.f_back # type: ignore
if (
and pdb
and actor is not None
if frame and pdb and actor is not None:
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n")
# no f!#$&* idea, but when we're in async land
# we need 2x frames up?
@ -766,8 +662,7 @@ def _set_trace(
pdb, undo_sigint = mk_mpdb()
# we entered the global ``breakpoint()`` built-in from sync
# code?
# we entered the global ``breakpoint()`` built-in from sync code?
Lock.local_task_in_debug = 'sync'
@ -797,7 +692,7 @@ def _post_mortem(
# TODO: help with a 3.10+ major release if/when it arrives.
pdbp.xpm(Pdb=lambda: pdb)
pdbpp.xpm(Pdb=lambda: pdb)
post_mortem = partial(
@ -826,9 +721,7 @@ async def _maybe_enter_pm(err):
and not is_multi_cancelled(err)
log.debug("Actor crashed, entering debug mode")
await post_mortem()
return True
@ -868,10 +761,7 @@ async def maybe_wait_for_debugger(
) -> None:
if (
not debug_mode()
and not child_in_debug
if not debug_mode() and not child_in_debug:
if (

View File

@ -108,7 +108,7 @@ async def query_actor(
async def find_actor(
name: str,
arbiter_sockaddr: tuple[str, int] | None = None
arbiter_sockaddr: tuple[str, int] = None
) -> AsyncGenerator[Optional[Portal], None]:
@ -134,7 +134,7 @@ async def find_actor(
async def wait_for_actor(
name: str,
arbiter_sockaddr: tuple[str, int] | None = None
arbiter_sockaddr: tuple[str, int] = None
) -> AsyncGenerator[Portal, None]:
"""Wait on an actor to register with the arbiter.

View File

@ -51,7 +51,7 @@ def _mp_main(
accept_addr: tuple[str, int],
forkserver_info: tuple[Any, Any, Any, Any, Any],
start_method: SpawnMethodKey,
parent_addr: tuple[str, int] | None = None,
parent_addr: tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None:
@ -98,7 +98,7 @@ def _trio_main(
actor: Actor, # type: ignore
parent_addr: tuple[str, int] | None = None,
parent_addr: tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None:

View File

@ -27,7 +27,6 @@ import importlib
import builtins
import traceback
import exceptiongroup as eg
import trio
@ -53,6 +52,9 @@ class RemoteActorError(Exception):
self.type = suberror_type
self.msgdata = msgdata
# TODO: a trio.MultiError.catch like context manager
# for catching underlying remote errors of a particular type
class InternalActorError(RemoteActorError):
"""Remote internal ``tractor`` error indicating
@ -121,12 +123,10 @@ def unpack_error(
) -> Exception:
Unpack an 'error' message from the wire
"""Unpack an 'error' message from the wire
into a local ``RemoteActorError``.
__tracebackhide__ = True
error = msg['error']
tb_str = error.get('tb_str', '')
@ -139,12 +139,7 @@ def unpack_error(
suberror_type = trio.Cancelled
else: # try to lookup a suitable local error type
for ns in [
for ns in [builtins, _this_mod, trio]:
suberror_type = getattr(ns, type_name)
@ -163,15 +158,12 @@ def unpack_error(
def is_multi_cancelled(exc: BaseException) -> bool:
Predicate to determine if a possible ``eg.BaseExceptionGroup`` contains
only ``trio.Cancelled`` sub-exceptions (and is likely the result of
"""Predicate to determine if a ``trio.MultiError`` contains only
``trio.Cancelled`` sub-exceptions (and is likely the result of
cancelling a collection of subtasks.
if isinstance(exc, eg.BaseExceptionGroup):
return exc.subgroup(
lambda exc: isinstance(exc, trio.Cancelled)
) is not None
return False
return not trio.MultiError.filter(
lambda exc: exc if not isinstance(exc, trio.Cancelled) else None,

View File

@ -341,7 +341,7 @@ class Channel:
async def connect(
destaddr: tuple[Any, ...] | None = None,
destaddr: tuple[Any, ...] = None,
) -> MsgTransport:

View File

@ -45,27 +45,24 @@ from ._exceptions import (
from ._streaming import (
from ._streaming import Context, ReceiveMsgStream
log = get_logger(__name__)
def _unwrap_msg(
msg: dict[str, Any],
channel: Channel
) -> Any:
__tracebackhide__ = True
return msg['return']
except KeyError:
# internal error should never get here
assert msg.get('cid'), "Received internal error at portal?"
raise unpack_error(msg, channel) from None
raise unpack_error(msg, channel)
class MessagingError(Exception):
@ -104,7 +101,7 @@ class Portal:
# it is expected that ``result()`` will be awaited at some
# point.
self._expect_result: Optional[Context] = None
self._streams: set[MsgStream] = set()
self._streams: set[ReceiveMsgStream] = set() = current_actor()
async def _submit_for_result(
@ -139,7 +136,6 @@ class Portal:
Return the result(s) from the remote actor's "main" task.
# __tracebackhide__ = True
# Check for non-rpc errors slapped on the
# channel for which we always raise
exc =
@ -189,7 +185,7 @@ class Portal:
async def cancel_actor(
timeout: float | None = None,
timeout: float = None,
) -> bool:
@ -319,7 +315,7 @@ class Portal:
async_gen_func: Callable, # typing: ignore
) -> AsyncGenerator[MsgStream, None]:
) -> AsyncGenerator[ReceiveMsgStream, None]:
if not inspect.isasyncgenfunction(async_gen_func):
if not (
@ -344,7 +340,7 @@ class Portal:
# deliver receive only stream
async with MsgStream(
async with ReceiveMsgStream(
ctx, ctx._recv_chan,
) as rchan:
@ -464,6 +460,7 @@ class Portal:
# sure it's worth being pedantic:
# Exception,
# trio.Cancelled,
# trio.MultiError,
# KeyboardInterrupt,
) as err:
@ -500,10 +497,6 @@ class Portal:
f'actor: {uid}'
result = await ctx.result()
f'Context {fn_name} returned '
f'value from callee `{result}`'
# though it should be impossible for any tasks
# operating *in* this scope to have survived
@ -525,6 +518,12 @@ class Portal:
f'Context {fn_name} returned '
f'value from callee `{result}`'
# XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the
# context from the runtime msg loop otherwise inside
@ -537,10 +536,7 @@ class Portal:
await maybe_wait_for_debugger()
# remove the context from runtime tracking
(, ctx.cid),
), ctx.cid))

View File

@ -22,21 +22,16 @@ from contextlib import asynccontextmanager
from functools import partial
import importlib
import logging
import signal
import sys
import os
from typing import (
import typing
import warnings
from exceptiongroup import BaseExceptionGroup
import trio
from ._runtime import (
from ._runtime import Actor, Arbiter, async_main
from . import _debug
from . import _spawn
from . import _state
@ -56,45 +51,37 @@ logger = log.get_logger('tractor')
async def open_root_actor(
# defaults are above
arbiter_addr: tuple[str, int] | None = None,
arbiter_addr: Optional[tuple[str, int]] = (
# defaults are above
registry_addr: tuple[str, int] | None = None,
name: str | None = 'root',
name: Optional[str] = 'root',
# either the `multiprocessing` start method:
# OR `trio` (the new default).
start_method: _spawn.SpawnMethodKey | None = None,
start_method: Optional[_spawn.SpawnMethodKey] = None,
# enables the multi-process debugger support
debug_mode: bool = False,
# internal logging
loglevel: str | None = None,
loglevel: Optional[str] = None,
enable_modules: list | None = None,
rpc_module_paths: list | None = None,
enable_modules: Optional[list] = None,
rpc_module_paths: Optional[list] = None,
) -> typing.Any:
Runtime init entry point for ``tractor``.
"""Async entry point for ``tractor``.
# Override the global debugger hook to make it play nice with
# ``trio``, see much discussion in:
# ``trio``, see:
builtin_bp_handler = sys.breakpointhook
orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None)
os.environ['PYTHONBREAKPOINT'] = 'tractor._debug._set_trace'
# attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
# mark top most level process as root actor
_state._runtime_vars['_is_root'] = True
@ -113,22 +100,10 @@ async def open_root_actor(
if start_method is not None:
if arbiter_addr is not None:
'`arbiter_addr` is now deprecated and has been renamed to'
'`registry_addr`.\nUse that instead..',
registry_addr = (host, port) = (
or arbiter_addr
or (
arbiter_addr = (host, port) = arbiter_addr or (
loglevel = (loglevel or log._default_loglevel).upper()
@ -173,7 +148,7 @@ async def open_root_actor(
except OSError:
# TODO: make this a "discovery" log level?
logger.warning(f"No actor registry found @ {host}:{port}")
logger.warning(f"No actor could be found @ {host}:{port}")
# create a local actor and start up its main routine/task
if arbiter_found:
@ -183,7 +158,7 @@ async def open_root_actor(
actor = Actor(
name or 'anonymous',
@ -199,7 +174,7 @@ async def open_root_actor(
actor = Arbiter(
name or 'arbiter',
@ -230,10 +205,7 @@ async def open_root_actor(
yield actor
except (
) as err:
except (Exception, trio.MultiError) as err:
entered = await _debug._maybe_enter_pm(err)
@ -257,15 +229,6 @@ async def open_root_actor(
await actor.cancel()
_state._current_actor = None
# restore breakpoint hook state
sys.breakpointhook = builtin_bp_handler
if orig_bp_path is not None:
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
# clear env back to having no entry
logger.runtime("Root actor terminated")
@ -273,13 +236,13 @@ def run_daemon(
enable_modules: list[str],
# runtime kwargs
name: str | None = 'root',
registry_addr: tuple[str, int] = (
name: Optional[str] = 'root',
arbiter_addr: tuple[str, int] = (
start_method: str | None = None,
start_method: Optional[str] = None,
debug_mode: bool = False,
@ -301,7 +264,7 @@ def run_daemon(
async def _main():
async with open_root_actor(

View File

@ -25,23 +25,21 @@ from itertools import chain
import importlib
import importlib.util
import inspect
import signal
import sys
import uuid
from typing import (
Any, Optional,
import uuid
from types import ModuleType
import sys
import os
from contextlib import ExitStack
import warnings
from async_generator import aclosing
from exceptiongroup import BaseExceptionGroup
import trio # type: ignore
from trio_typing import TaskStatus
from async_generator import aclosing
from ._ipc import Channel
from ._streaming import Context
@ -196,7 +194,7 @@ async def _invoke(
res = await coro
await chan.send({'return': res, 'cid': cid})
except BaseExceptionGroup:
except trio.MultiError:
# if a context error was set then likely
# thei multierror was raised due to that
if ctx._error is not None:
@ -228,17 +226,14 @@ async def _invoke(
fname = func.__name__
if ctx._cancel_called:
msg = f'`{fname}()` cancelled itself'
msg = f'{fname} cancelled itself'
elif cs.cancel_called:
msg = (
f'`{fname}()` was remotely cancelled by its caller '
f'{fname} was remotely cancelled by its caller '
if ctx._cancel_msg:
msg += f' with msg:\n{ctx._cancel_msg}'
# task-contex was cancelled so relay to the cancel to caller
raise ContextCancelled(
@ -268,7 +263,7 @@ async def _invoke(
except (
) as err:
if not is_multi_cancelled(err):
@ -280,16 +275,8 @@ async def _invoke(
# if not is_multi_cancelled(err) and (
entered_debug: bool = False
if (
not isinstance(err, ContextCancelled)
or (
isinstance(err, ContextCancelled)
and ctx._cancel_called
# if the root blocks the debugger lock request from a child
# we will get a remote-cancelled condition.
and ctx._enter_debugger_on_cancel
if not isinstance(err, ContextCancelled) or (
isinstance(err, ContextCancelled) and ctx._cancel_called
# XXX: is there any case where we'll want to debug IPC
# disconnects as a default?
@ -299,6 +286,7 @@ async def _invoke(
# recovery logic - the only case is some kind of strange bug
# in our transport layer itself? Going to keep this
# open ended for now.
entered_debug = await _debug._maybe_enter_pm(err)
if not entered_debug:
@ -319,7 +307,7 @@ async def _invoke(
# if we can't propagate the error that's a big boo boo
f"Failed to ship error to caller @ {chan.uid} !?"
@ -351,7 +339,7 @@ def _get_mod_abspath(module):
async def try_ship_error_to_parent(
channel: Channel,
err: Union[Exception, BaseExceptionGroup],
err: Union[Exception, trio.MultiError],
) -> None:
with trio.CancelScope(shield=True):
@ -423,8 +411,8 @@ class Actor:
name: str,
enable_modules: list[str] = [],
uid: str | None = None,
loglevel: str | None = None,
uid: str = None,
loglevel: str = None,
arbiter_addr: Optional[tuple[str, int]] = None,
spawn_method: Optional[str] = None
) -> None:
@ -455,7 +443,7 @@ class Actor:
self._mods: dict[str, ModuleType] = {}
self.loglevel = loglevel
self._arb_addr: tuple[str, int] | None = (
self._arb_addr = (
) if arbiter_addr else None
@ -488,10 +476,7 @@ class Actor:
self._parent_chan: Optional[Channel] = None
self._forkserver_info: Optional[
tuple[Any, Any, Any, Any, Any]] = None
self._actoruid2nursery: dict[
tuple[str, str],
ActorNursery | None,
] = {} # type: ignore # noqa
self._actoruid2nursery: dict[Optional[tuple[str, str]], 'ActorNursery'] = {} # type: ignore # noqa
async def wait_for_peer(
self, uid: tuple[str, str]
@ -713,38 +698,18 @@ class Actor:
log.runtime(f"No more channels for {chan.uid}")
self._peers.pop(uid, None)
# for (uid, cid) in self._contexts.copy():
# if chan.uid == uid:
# self._contexts.pop((uid, cid))
log.runtime(f"Peers is {self._peers}")
# No more channels to other actors (at all) registered
# as connected.
if not self._peers:
log.runtime("Signalling no more peer channel connections")
log.runtime("Signalling no more peer channels")
# NOTE: block this actor from acquiring the
# debugger-TTY-lock since we have no way to know if we
# cancelled it and further there is no way to ensure the
# lock will be released if acquired due to having no
# more active IPC channels.
if _state.is_root_process():
pdb_lock = _debug.Lock
log.runtime(f"{uid} blocked from pdb locking")
# if a now stale local task has the TTY lock still
# we cancel it to allow servicing other requests for
# the lock.
db_cs = pdb_lock._root_local_task_cs_in_debug
if (
and not db_cs.cancel_called
# TODO: figure out why this breaks tests..
# XXX: is this necessary (GC should do it)?
if chan.connected():
# if the channel is still connected it may mean the far
@ -829,12 +794,7 @@ class Actor:
if ctx._backpressure:
await send_chan.send(msg)
except trio.BrokenResourceError:
# XXX: local consumer has closed their side
# so cancel the far end streaming task
log.warning(f"{chan} is already closed")
raise StreamOverrun(text) from None
@ -988,7 +948,7 @@ class Actor:
handler_nursery: trio.Nursery,
# (host, port) to bind for channel server
accept_host: tuple[str, int] | None = None,
accept_host: tuple[str, int] = None,
accept_port: int = 0,
task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
) -> None:
@ -1239,10 +1199,6 @@ async def async_main(
and when cancelled effectively cancels the actor.
# attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state.
_debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
registered_with_arbiter = False
@ -1379,12 +1335,10 @@ async def async_main(
# Unregister actor from the arbiter
if (
and not actor.is_arbiter
if registered_with_arbiter and (
actor._arb_addr is not None
failed = False
assert isinstance(actor._arb_addr, tuple)
with trio.move_on_after(0.5) as cs:
cs.shield = True
@ -1566,10 +1520,7 @@ async def process_messages(
partial(_invoke, actor, cid, chan, func, kwargs),
except (
except (RuntimeError, trio.MultiError):
# avoid reporting a benign race condition
# during actor runtime teardown.
nursery_cancelled_before_task = True
@ -1609,18 +1560,12 @@ async def process_messages(
# handshake for them (yet) and instead we simply bail out of
# the message loop and expect the teardown sequence to clean
# up.
f'channel from {chan.uid} closed abruptly:\n'
f'-> {chan.raddr}\n'
log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}')
# transport **was** disconnected
return True
except (
) as err:
except (Exception, trio.MultiError) as err:
if nursery_cancelled_before_task:
sn = actor._service_n
assert sn and sn.cancel_scope.cancel_called
@ -1661,28 +1606,17 @@ class Arbiter(Actor):
is_arbiter = True
def __init__(self, *args, **kwargs) -> None:
def __init__(self, *args, **kwargs):
self._registry: dict[
tuple[str, str],
tuple[str, int],
] = {}
self._waiters: dict[
# either an event to sync to receiving an actor uid (which
# is filled in once the actor has sucessfully registered),
# or that uid after registry is complete.
list[trio.Event | tuple[str, str]]
] = {}
self._waiters = {}
super().__init__(*args, **kwargs)
async def find_actor(
name: str,
) -> tuple[str, int] | None:
async def find_actor(self, name: str) -> Optional[tuple[str, int]]:
for uid, sockaddr in self._registry.items():
if name in uid:
return sockaddr
@ -1717,8 +1651,7 @@ class Arbiter(Actor):
sockaddrs: list[tuple[str, int]] = []
sockaddr: tuple[str, int]
sockaddrs = []
for (aname, _), sockaddr in self._registry.items():
if name == aname:
@ -1728,9 +1661,7 @@ class Arbiter(Actor):
waiter = trio.Event()
self._waiters.setdefault(name, []).append(waiter)
await waiter.wait()
for uid in self._waiters[name]:
if not isinstance(uid, trio.Event):
return sockaddrs
@ -1741,11 +1672,11 @@ class Arbiter(Actor):
sockaddr: tuple[str, int]
) -> None:
uid = name, _ = (str(uid[0]), str(uid[1]))
uid = name, uuid = (str(uid[0]), str(uid[1]))
self._registry[uid] = (str(sockaddr[0]), int(sockaddr[1]))
# pop and signal all waiter events
events = self._waiters.pop(name, [])
events = self._waiters.pop(name, ())
self._waiters.setdefault(name, []).append(uid)
for event in events:
if isinstance(event, trio.Event):

View File

@ -23,14 +23,14 @@ import sys
import platform
from typing import (
from import Awaitable
from exceptiongroup import BaseExceptionGroup
import trio
from trio_typing import TaskStatus
@ -59,7 +59,7 @@ if TYPE_CHECKING:
log = get_logger('tractor')
# placeholder for an mp start context if so using that backend
_ctx: mp.context.BaseContext | None = None
_ctx: Optional[mp.context.BaseContext] = None
SpawnMethodKey = Literal[
'trio', # supported on all platforms
@ -85,7 +85,7 @@ else:
def try_set_start_method(
key: SpawnMethodKey
) -> mp.context.BaseContext | None:
) -> Optional[mp.context.BaseContext]:
Attempt to set the method for process starting, aka the "actor
spawning backend".
@ -139,7 +139,6 @@ async def exhaust_portal(
If the main task is an async generator do our best to consume
what's left of it.
__tracebackhide__ = True
log.debug(f"Waiting on final result from {actor.uid}")
@ -147,11 +146,8 @@ async def exhaust_portal(
# always be established and shutdown using a context manager api
final = await portal.result()
except (
) as err:
# we reraise in the parent task via a ``BaseExceptionGroup``
except (Exception, trio.MultiError) as err:
# we reraise in the parent task via a ``trio.MultiError``
return err
except trio.Cancelled as err:
# lol, of course we need this too ;P
@ -179,7 +175,7 @@ async def cancel_on_completion(
# if this call errors we store the exception for later
# in ``errors`` which will be reraised inside
# an exception group and we still send out a cancel request
# a MultiError and we still send out a cancel request
result = await exhaust_portal(portal, actor)
if isinstance(result, Exception):
errors[actor.uid] = result
@ -199,37 +195,16 @@ async def cancel_on_completion(
async def do_hard_kill(
proc: trio.Process,
terminate_after: int = 3,
) -> None:
# NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as
# a hard-kill time ultimatum.
log.debug(f"Terminating {proc}")
with trio.move_on_after(terminate_after) as cs:
# NOTE: code below was copied verbatim from the now deprecated
# (in 0.20.0) ``trio._subrocess.Process.aclose()``, orig doc
# string:
# Close any pipes we have to the process (both input and output)
# and wait for it to exit. If cancelled, kills the process and
# waits for it to finish exiting before propagating the
# cancellation.
with trio.CancelScope(shield=True):
if proc.stdin is not None:
await proc.stdin.aclose()
if proc.stdout is not None:
await proc.stdout.aclose()
if proc.stderr is not None:
await proc.stderr.aclose()
await proc.wait()
if proc.returncode is None:
with trio.CancelScope(shield=True):
await proc.wait()
# NOTE: This ``__aexit__()`` shields internally.
async with proc: # calls ``trio.Process.aclose()``
log.debug(f"Terminating {proc}")
if cs.cancelled_caught:
# XXX: should pretty much never get here unless we have
@ -280,9 +255,7 @@ async def soft_wait(
if proc.poll() is None: # type: ignore
'Actor still alive after cancel request:\n'
f'Process still alive after cancel request:\n{uid}')
@ -375,11 +348,12 @@ async def trio_proc(
cancelled_during_spawn: bool = False
proc: trio.Process | None = None
proc: Optional[trio.Process] = None
# TODO: needs ``trio_typing`` patch?
proc = await trio.lowlevel.open_process(spawn_cmd)
proc = await trio.lowlevel.open_process( # type: ignore
log.runtime(f"Started {proc}")
@ -463,8 +437,8 @@ async def trio_proc(
# XXX NOTE XXX: The "hard" reap since no actor zombies are
# allowed! Do this **after** cancellation/teardown to avoid
# The "hard" reap since no actor zombies are allowed!
# XXX: do this **after** cancellation/tearfown to avoid
# killing the process too early.
if proc:
log.cancel(f'Hard reap sequence starting for {subactor.uid}')
@ -478,13 +452,6 @@ async def trio_proc(
await proc.wait()
if is_root_process():
# TODO: solve the following issue where we need
# to do a similar wait like this but in an
# "intermediary" parent actor that itself isn't
# in debug but has a child that is, and we need
# to hold off on relaying SIGINT until that child
# is complete.
await maybe_wait_for_debugger(
'_debug_mode', False),

View File

@ -22,6 +22,7 @@ from typing import (
from import Mapping
import trio
@ -45,6 +46,30 @@ def current_actor(err_on_no_runtime: bool = True) -> 'Actor': # type: ignore #
return _current_actor
_conc_name_getters = {
'task': trio.lowlevel.current_task,
'actor': current_actor
class ActorContextInfo(Mapping):
"Dyanmic lookup for local actor and task names"
_context_keys = ('task', 'actor')
def __len__(self):
return len(self._context_keys)
def __iter__(self):
return iter(self._context_keys)
def __getitem__(self, key: str) -> str:
return _conc_name_getters[key]().name # type: ignore
except RuntimeError:
# no local actor/task context initialized yet
return f'no {key} context'
def is_main_process() -> bool:
"""Bool determining if this actor is running in the top-most process.

View File

@ -50,13 +50,12 @@ log = get_logger(__name__)
# - use __slots__ on ``Context``?
class MsgStream(
class ReceiveMsgStream(
A bidirectional message stream for receiving logically sequenced
values over an inter-actor IPC ``Channel``.
This is the type returned to a local task which entered either
``Portal.open_stream_from()`` or ``Context.open_stream()``.
A IPC message stream for receiving logically sequenced values over
an inter-actor ``Channel``. This is the type returned to a local
task which entered either ``Portal.open_stream_from()`` or
Termination rules:
@ -98,9 +97,6 @@ class MsgStream(
if self._eoc:
raise trio.EndOfChannel
if self._closed:
raise trio.ClosedResourceError('This stream was closed')
msg = await self._rx_chan.receive()
return msg['yield']
@ -114,9 +110,6 @@ class MsgStream(
# - 'error'
# possibly just handle msg['stop'] here!
if self._closed:
raise trio.ClosedResourceError('This stream was closed')
if msg.get('stop') or self._eoc:
log.debug(f"{self} was stopped at remote end")
@ -196,6 +189,7 @@ class MsgStream(
self._eoc = True
self._closed = True
# NOTE: this is super subtle IPC messaging stuff:
# Relay stop iteration to far end **iff** we're
@ -212,8 +206,12 @@ class MsgStream(
# In the bidirectional case, `Context.open_stream()` will create
# the `Actor._cids2qs` entry from a call to
# `Actor.get_context()` and will call us here to send the stop
# msg in ``__aexit__()`` on teardown.
# `Actor.get_context()` and will send the stop message in
# ``__aexit__()`` on teardown so it **does not** need to be
# called here.
if not self._ctx._portal:
# Only for 2 way streams can we can send stop from the
# caller side.
# NOTE: if this call is cancelled we expect this end to
# handle as though the stop was never sent (though if it
@ -230,14 +228,7 @@ class MsgStream(
# the underlying channel may already have been pulled
# in which case our stop message is meaningless since
# it can't traverse the transport.
ctx = self._ctx
f'Stream was already destroyed?\n'
f'actor: {ctx.chan.uid}\n'
f'ctx id: {ctx.cid}'
self._closed = True
log.debug(f'Channel for {self} was already closed')
# Do we close the local mem chan ``self._rx_chan`` ??!?
@ -280,8 +271,7 @@ class MsgStream(
) -> AsyncIterator[BroadcastReceiver]:
Allocate and return a ``BroadcastReceiver`` which delegates
'''Allocate and return a ``BroadcastReceiver`` which delegates
to this message stream.
This allows multiple local tasks to receive each their own copy
@ -318,15 +308,15 @@ class MsgStream(
async with self._broadcaster.subscribe() as bstream:
assert bstream.key != self._broadcaster.key
assert bstream._recv == self._broadcaster._recv
# NOTE: we patch on a `.send()` to the bcaster so that the
# caller can still conduct 2-way streaming using this
# ``bstream`` handle transparently as though it was the msg
# stream instance.
bstream.send = self.send # type: ignore
yield bstream
class MsgStream(ReceiveMsgStream,
Bidirectional message stream for use within an inter-actor actor
async def send(
data: Any
@ -381,8 +371,6 @@ class Context:
# status flags
_cancel_called: bool = False
_cancel_msg: Optional[str] = None
_enter_debugger_on_cancel: bool = True
_started_called: bool = False
_started_received: bool = False
_stream_opened: bool = False
@ -464,11 +452,7 @@ class Context:
if not self._scope_nursery._closed: # type: ignore
async def cancel(
msg: Optional[str] = None,
) -> None:
async def cancel(self) -> None:
Cancel this inter-actor-task context.
@ -477,8 +461,6 @@ class Context:
side = 'caller' if self._portal else 'callee'
if msg:
assert side == 'callee', 'Only callee side can provide cancel msg'
log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')
@ -515,10 +497,8 @@ class Context:
"Timed out on cancelling remote task "
f"{cid} for {}")
# callee side remote task
self._cancel_msg = msg
# callee side remote task
# TODO: should we have an explicit cancel message
# or is relaying the local `trio.Cancelled` as an
@ -603,23 +583,23 @@ class Context:
async with MsgStream(
) as stream:
) as rchan:
if self._portal:
self._stream_opened = True
# XXX: do we need this?
# ensure we aren't cancelled before yielding the stream
# ensure we aren't cancelled before delivering
# the stream
# await trio.lowlevel.checkpoint()
yield stream
yield rchan
# NOTE: Make the stream "one-shot use". On exit, signal
# XXX: Make the stream "one-shot use". On exit, signal
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the
# far end.
await stream.aclose()
await self.send_stop()
if self._portal:

View File

@ -18,7 +18,6 @@
``trio`` inspired apis and helpers
from contextlib import asynccontextmanager as acm
from functools import partial
import inspect
from typing import (
@ -28,8 +27,8 @@ from typing import (
import typing
import warnings
from exceptiongroup import BaseExceptionGroup
import trio
from async_generator import asynccontextmanager
from ._debug import maybe_wait_for_debugger
from ._state import current_actor, is_main_process
@ -83,7 +82,7 @@ class ActorNursery:
actor: Actor,
ria_nursery: trio.Nursery,
da_nursery: trio.Nursery,
errors: dict[tuple[str, str], BaseException],
errors: dict[tuple[str, str], Exception],
) -> None:
# self.supervisor = supervisor # TODO
self._actor: Actor = actor
@ -111,11 +110,11 @@ class ActorNursery:
name: str,
bind_addr: tuple[str, int] = _default_bind_addr,
rpc_module_paths: list[str] | None = None,
enable_modules: list[str] | None = None,
loglevel: str | None = None, # set log level per subactor
nursery: trio.Nursery | None = None,
debug_mode: Optional[bool] | None = None,
rpc_module_paths: list[str] = None,
enable_modules: list[str] = None,
loglevel: str = None, # set log level per subactor
nursery: trio.Nursery = None,
debug_mode: Optional[bool] = None,
infect_asyncio: bool = False,
) -> Portal:
@ -182,9 +181,9 @@ class ActorNursery:
name: Optional[str] = None,
bind_addr: tuple[str, int] = _default_bind_addr,
rpc_module_paths: list[str] | None = None,
enable_modules: list[str] | None = None,
loglevel: str | None = None, # set log level per subactor
rpc_module_paths: Optional[list[str]] = None,
enable_modules: list[str] = None,
loglevel: str = None, # set log level per subactor
infect_asyncio: bool = False,
**kwargs, # explicit args to ``fn``
@ -295,17 +294,13 @@ class ActorNursery:
async def _open_and_supervise_one_cancels_all_nursery(
actor: Actor,
) -> typing.AsyncGenerator[ActorNursery, None]:
# TODO: yay or nay?
__tracebackhide__ = True
# the collection of errors retreived from spawned sub-actors
errors: dict[tuple[str, str], BaseException] = {}
errors: dict[tuple[str, str], Exception] = {}
# This is the outermost level "deamon actor" nursery. It is awaited
# **after** the below inner "run in actor nursery". This allows for
@ -338,17 +333,19 @@ async def _open_and_supervise_one_cancels_all_nursery(
# after we yield upwards
yield anursery
# When we didn't error in the caller's scope,
# signal all process-monitor-tasks to conduct
# the "hard join phase".
f"Waiting on subactors {anursery._children} "
"to complete"
# Last bit before first nursery block ends in the case
# where we didn't error in the caller's scope
# signal all process monitor tasks to conduct
# hard join phase.
except BaseException as inner_err:
errors[actor.uid] = inner_err
except BaseException as err:
# If we error in the root but the debugger is
# engaged we don't want to prematurely kill (and
@ -365,18 +362,19 @@ async def _open_and_supervise_one_cancels_all_nursery(
# worry more are coming).
# XXX: hypothetically an error could be
# raised and then a cancel signal shows up
# slightly after in which case the `else:`
# block here might not complete? For now,
# shield both.
with trio.CancelScope(shield=True):
etype = type(inner_err)
etype = type(err)
if etype in (
) or (
f"Nursery for {current_actor().uid} "
@ -384,23 +382,29 @@ async def _open_and_supervise_one_cancels_all_nursery(
f"Nursery for {current_actor().uid} "
f"errored with")
f"errored with {err}, ")
# cancel all subactors
await anursery.cancel()
except trio.MultiError as merr:
# If we receive additional errors while waiting on
# remaining subactors that were cancelled,
# aggregate those errors with the original error
# that triggered this teardown.
if err not in merr.exceptions:
raise trio.MultiError(merr.exceptions + [err])
# ria_nursery scope end
# TODO: this is the handler around the ``.run_in_actor()``
# nursery. Ideally we can drop this entirely in the future as
# the whole ``.run_in_actor()`` API should be built "on top of"
# this lower level spawn-request-cancel "daemon actor" API where
# a local in-actor task nursery is used with one-to-one task
# + `await` calls and the results/errors are
# handled directly (inline) and errors by the local nursery.
# XXX: do we need a `trio.Cancelled` catch here as well?
# this is the catch around the ``.run_in_actor()`` nursery
except (
) as err:
@ -432,20 +436,18 @@ async def _open_and_supervise_one_cancels_all_nursery(
with trio.CancelScope(shield=True):
await anursery.cancel()
# use `BaseExceptionGroup` as needed
# use `MultiError` as needed
if len(errors) > 1:
raise BaseExceptionGroup(
'tractor.ActorNursery errored with',
raise trio.MultiError(tuple(errors.values()))
raise list(errors.values())[0]
# da_nursery scope end - nursery checkpoint
# final exit
# ria_nursery scope end - nursery checkpoint
# after nursery exit
async def open_nursery(

View File

@ -48,7 +48,7 @@ log = get_logger('messaging')
async def fan_out_to_ctxs(
pub_async_gen_func: typing.Callable, # it's an async gen ... gd mypy
topics2ctxs: dict[str, list],
packetizer: typing.Callable | None = None,
packetizer: typing.Callable = None,
) -> None:
Request and fan out quotes to each subscribed actor channel.
@ -144,7 +144,7 @@ _pubtask2lock: dict[str, trio.StrictFIFOLock] = {}
def pub(
wrapped: typing.Callable | None = None,
wrapped: typing.Callable = None,
tasks: set[str] = set(),
@ -249,8 +249,8 @@ def pub(
topics: set[str],
# *,
task_name: str | None = None, # default: only one task allocated
packetizer: Callable | None = None,
task_name: str = None, # default: only one task allocated
packetizer: Callable = None,
if task_name is None:

View File

@ -18,14 +18,12 @@
Log like a forester!
from import Mapping
import sys
import logging
import colorlog # type: ignore
from typing import Optional
import trio
from ._state import current_actor
from ._state import ActorContextInfo
_proj_name: str = 'tractor'
@ -38,8 +36,7 @@ LOG_FORMAT = (
# "{bold_white}{log_color}{asctime}{reset}"
" {bold_white}{thin_white}({reset}"
"{thin_white}{actor_name}[{actor_uid}], "
"{process}, {task}){reset}{bold_white}{thin_white})"
"{thin_white}{actor}, {process}, {task}){reset}{bold_white}{thin_white})"
" {reset}{log_color}[{reset}{bold_log_color}{levelname}{reset}{log_color}]"
" {log_color}{name}"
" {thin_white}{filename}{log_color}:{reset}{thin_white}{lineno}{log_color}"
@ -139,40 +136,9 @@ class StackLevelAdapter(logging.LoggerAdapter):
_conc_name_getters = {
'task': lambda: trio.lowlevel.current_task().name,
'actor': lambda: current_actor(),
'actor_name': lambda: current_actor().name,
'actor_uid': lambda: current_actor().uid[1][:6],
class ActorContextInfo(Mapping):
"Dyanmic lookup for local actor and task names"
_context_keys = (
def __len__(self):
return len(self._context_keys)
def __iter__(self):
return iter(self._context_keys)
def __getitem__(self, key: str) -> str:
return _conc_name_getters[key]()
except RuntimeError:
# no local actor/task context initialized yet
return f'no {key} context'
def get_logger(
name: str | None = None,
name: str = None,
_root_name: str = _proj_name,
) -> StackLevelAdapter:
@ -207,7 +173,7 @@ def get_logger(
def get_console_log(
level: str | None = None,
level: str = None,
) -> logging.LoggerAdapter:
'''Get the package logger and enable a handler which writes to stderr.

View File

@ -466,11 +466,11 @@ async def open_channel_from(
# sync to a "started()"-like first delivered value from the
# ``asyncio`` task.
with chan._trio_cs:
first = await chan.receive()
# deliver stream handle upward
with chan._trio_cs:
yield first, chan
chan._trio_exited = True
@ -491,18 +491,16 @@ def run_as_asyncio_guest(
SC semantics.
# Uh, oh.
# :o
# Uh, oh. :o
# It looks like your event loop has caught a case of the ``trio``s.
# :()
# Don't worry, we've heard you'll barely notice. You might
# hallucinate a few more propagating errors and feel like your
# digestion has slowed but if anything get's too bad your parents
# will know about it.
# Don't worry, we've heard you'll barely notice. You might hallucinate
# a few more propagating errors and feel like your digestion has
# slowed but if anything get's too bad your parents will know about
# it.
# :)

View File

@ -23,6 +23,7 @@ from __future__ import annotations
from abc import abstractmethod
from collections import deque
from contextlib import asynccontextmanager
from dataclasses import dataclass
from functools import partial
from operator import ne
from typing import Optional, Callable, Awaitable, Any, AsyncIterator, Protocol
@ -32,10 +33,7 @@ import trio
from trio._core._run import Task
from import ReceiveChannel
from trio.lowlevel import current_task
from msgspec import Struct
from tractor.log import get_logger
log = get_logger(__name__)
# A regular invariant generic type
T = TypeVar("T")
@ -88,7 +86,8 @@ class Lagged(trio.TooSlowError):
class BroadcastState(Struct):
class BroadcastState:
Common state to all receivers of a broadcast.
@ -111,35 +110,7 @@ class BroadcastState(Struct):
eoc: bool = False
# If the broadcaster was cancelled, we might as well track it
cancelled: dict[int, Task] = {}
def statistics(self) -> dict[str, Any]:
Return broadcast receiver group "statistics" like many of
``trio``'s internal task-sync primitives.
key: int | None
ev: trio.Event | None
subs = self.subs
if self.recv_ready is not None:
key, ev = self.recv_ready
key = ev = None
qlens: dict[int, int] = {}
for tid, sz in subs.items():
qlens[tid] = sz if sz != -1 else 0
return {
'open_consumers': len(subs),
'queued_len_by_task': qlens,
'max_buffer_size': self.maxlen,
'tasks_waiting': ev.statistics().tasks_waiting if ev else 0,
'tasks_cancelled': self.cancelled,
'next_value_receiver_id': key,
cancelled: bool = False
class BroadcastReceiver(ReceiveChannel):
@ -157,40 +128,23 @@ class BroadcastReceiver(ReceiveChannel):
rx_chan: AsyncReceiver,
state: BroadcastState,
receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None,
raise_on_lag: bool = True,
) -> None:
# register the original underlying (clone)
self.key = id(self)
self._state = state
# each consumer has an int count which indicates
# which index contains the next value that the task has not yet
# consumed and thus should read. In the "up-to-date" case the
# consumer task must wait for a new value from the underlying
# receiver and we use ``-1`` as the sentinel for this state.
state.subs[self.key] = -1
# underlying for this receiver
self._rx = rx_chan
self._recv = receive_afunc or rx_chan.receive
self._closed: bool = False
self._raise_on_lag = raise_on_lag
def receive_nowait(
_key: int | None = None,
_state: BroadcastState | None = None,
async def receive(self) -> ReceiveType:
) -> Any:
Sync version of `.receive()` which does all the low level work
of receiving from the underlying/wrapped receive channel.
key = _key or self.key
state = _state or self._state
key = self.key
state = self._state
# TODO: ideally we can make some way to "lock out" the
# underlying receive channel in some way such that if some task
@ -223,47 +177,32 @@ class BroadcastReceiver(ReceiveChannel):
# return this value."
mxln = state.maxlen
lost = seq - mxln
# decrement to the last value and expect
# consumer to either handle the ``Lagged`` and come back
# or bail out on its own (thus un-subscribing)
state.subs[key] = mxln - 1
state.subs[key] = state.maxlen - 1
# this task was overrun by the producer side
task: Task = current_task()
msg = f'Task `{}` overrun and dropped `{lost}` values'
if self._raise_on_lag:
raise Lagged(msg)
return self.receive_nowait(_key, _state)
raise Lagged(f'Task {} was overrun')
state.subs[key] -= 1
return value
raise trio.WouldBlock
async def _receive_from_underlying(
key: int,
state: BroadcastState,
) -> ReceiveType:
# current task already has the latest value **and** is the
# first task to begin waiting for a new one
if state.recv_ready is None:
if self._closed:
raise trio.ClosedResourceError
event = trio.Event()
assert state.recv_ready is None
state.recv_ready = key, event
# if we're cancelled here it should be
# fine to bail without affecting any other consumers
# right?
value = await self._recv()
# items with lower indices are "newer"
@ -281,6 +220,7 @@ class BroadcastReceiver(ReceiveChannel):
# already retreived the last value
# XXX: which of these impls is fastest?
# subs = state.subs.copy()
# subs.pop(key)
@ -311,85 +251,54 @@ class BroadcastReceiver(ReceiveChannel):
# consumers will be awoken with a sequence of -1
# and will potentially try to rewait the underlying
# receiver instead of just cancelling immediately.
self._state.cancelled[key] = current_task()
self._state.cancelled = True
if event.statistics().tasks_waiting:
# Reset receiver waiter task event for next blocking condition.
# this MUST be reset even if the above ``.recv()`` call
# was cancelled to avoid the next consumer from blocking on
# an event that won't be set!
state.recv_ready = None
async def receive(self) -> ReceiveType:
key = self.key
state = self._state
return self.receive_nowait(
except trio.WouldBlock:
# current task already has the latest value **and** is the
# first task to begin waiting for a new one so we begin blocking
# until rescheduled with the a new value from the underlying.
if state.recv_ready is None:
return await self._receive_from_underlying(key, state)
# This task is all caught up and ready to receive the latest
# value, so queue/schedule it to be woken on the next internal
# event.
# value, so queue sched it on the internal event.
while state.recv_ready is not None:
# seq = state.subs[key]
# assert seq == -1 # sanity
seq = state.subs[key]
assert seq == -1 # sanity
_, ev = state.recv_ready
await ev.wait()
return self.receive_nowait(
except trio.WouldBlock:
if self._closed:
raise trio.ClosedResourceError
subs = state.subs
if (
len(subs) == 1
and key in subs
# or cancelled
# XXX: we are the last and only user of this BR so
# likely it makes sense to unwind back to the
# underlying?
# import tractor
# await tractor.breakpoint()
f'Only one sub left for {self}?\n'
'We can probably unwind from breceiver?'
# NOTE: if we ever would like the behaviour where if the
# first task to recv on the underlying is cancelled but it
# still DOES trigger the ``.recv_ready``, event we'll likely need
# this logic:
if seq > -1:
# stuff from above..
seq = state.subs[key]
value = state.queue[seq]
state.subs[key] -= 1
return value
elif seq == -1:
# XXX: In the case where the first task to allocate the
# ``.recv_ready`` event is cancelled we will be woken
# with a non-incremented sequence number (the ``-1``
# sentinel) and thus will read the oldest value if we
# use that. Instead we need to detect if we have not
# been incremented and then receive again.
# return await self.receive()
# ``.recv_ready`` event is cancelled we will be woken with
# a non-incremented sequence number and thus will read the
# oldest value if we use that. Instead we need to detect if
# we have not been incremented and then receive again.
return await self.receive()
return await self._receive_from_underlying(key, state)
raise ValueError(f'Invalid sequence {seq}!?')
async def subscribe(
raise_on_lag: bool = True,
) -> AsyncIterator[BroadcastReceiver]:
Subscribe for values from this broadcast receiver.
@ -407,7 +316,6 @@ class BroadcastReceiver(ReceiveChannel):
# assert clone in state.subs
assert br.key in state.subs
@ -444,8 +352,7 @@ def broadcast_receiver(
recv_chan: AsyncReceiver,
max_buffer_size: int,
receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None,
raise_on_lag: bool = True,
) -> BroadcastReceiver:
@ -456,6 +363,5 @@ def broadcast_receiver(

View File

@ -47,7 +47,7 @@ T = TypeVar("T")
async def maybe_open_nursery(
nursery: trio.Nursery | None = None,
nursery: trio.Nursery = None,
shield: bool = False,
) -> AsyncGenerator[trio.Nursery, Any]:
@ -109,17 +109,6 @@ async def gather_contexts(
all_entered = trio.Event()
parent_exit = trio.Event()
# XXX: ensure greedy sequence of manager instances
# since a lazy inline generator doesn't seem to work
# with `async with` syntax.
mngrs = list(mngrs)
if not mngrs:
raise ValueError(
'input mngrs is empty?\n'
'Did try to use inline generator syntax?'
async with trio.open_nursery() as n:
for mngr in mngrs:
@ -133,12 +122,12 @@ async def gather_contexts(
# deliver control once all managers have started up
await all_entered.wait()
# NOTE: order *should* be preserved in the output values
# since ``dict``s are now implicitly ordered.
yield tuple(unwrapped.values())
# NOTE: this is ABSOLUTELY REQUIRED to avoid
# the following wacky bug:
# <tractorbugurlhere>
# we don't need a try/finally since cancellation will be triggered
# by the surrounding nursery on error.