Compare commits

..

23 Commits

Author SHA1 Message Date
Tyler Goodlet 4bdf7f79f2 `_root`: drop unused `typing` import 2025-03-17 17:28:22 -04:00
Tyler Goodlet a123809558 Use `import <name> as <name>,` style over `__all__` in pkg mod 2025-03-17 17:24:49 -04:00
Tyler Goodlet 2901274189 Log chan-server-startup failures via `.exception()` 2025-03-17 17:22:27 -04:00
Tyler Goodlet 77d79a28b6 `.discovery.get_arbiter()`: add warning around this now deprecated usage 2025-03-17 17:17:28 -04:00
Tyler Goodlet fd3a777cd6 Add `open_root_actor(ensure_registry: bool)`
Allows forcing the opened actor to either obtain the passed registry
addrs or raise a runtime error.
2025-03-17 17:17:18 -04:00
Tyler Goodlet 36e0c3473c Fix doc string "its" typo.. 2025-03-17 17:17:18 -04:00
Tyler Goodlet 4feba68409 Test with `any(portals)` since `gather_contexts()` will return `list[None | tuple]` 2025-03-17 17:17:18 -04:00
Tyler Goodlet 46d7737522 Ignore `greenback` import error if not installed 2025-03-17 17:03:37 -04:00
Tyler Goodlet 0db3dda269 Change remaining internals to use `Actor.reg_addrs` 2025-03-17 17:03:37 -04:00
Tyler Goodlet 92c70f7986 Expose per-actor registry addrs via `.reg_addrs`
Since it's handy to be able to debug the *writing* of this instance var
(particularly when checking state passed down to a child in
`Actor._from_parent()`), rename and wrap the underlying
`Actor._reg_addrs` as a settable `@property` and add validation to
the `.setter` for sanity - actor discovery is a critical functionality.

Other tweaks:
- fix `.cancel_soon()` to pass expected argument..
- update internal runtime error message to be simpler and link to GH issues.
- use new `Actor.reg_addrs` throughout core.
2025-03-17 17:03:35 -04:00
Tyler Goodlet 5c571ec522 Get remaining suites passing..
..by ensuring `reg_addr` fixture value passthrough to subactor eps
2025-03-17 17:00:48 -04:00
Tyler Goodlet 6ee07b21e4 Always dynamically re-read the `._root._default_lo_addrs` value in `find_actor()` 2025-03-17 17:00:48 -04:00
Tyler Goodlet 62ea085f01 Ensure `registry_addrs` is always set to something 2025-03-17 16:59:22 -04:00
Tyler Goodlet 6888984e3f Rename fixture `arb_addr` -> `reg_addr` and set the session value globally as `._root._default_lo_addrs` 2025-03-17 16:58:43 -04:00
Tyler Goodlet ea2f5a5da3 Facepalm, `wait_for_actor()` dun take an addr `list`.. 2025-03-17 16:46:45 -04:00
Tyler Goodlet 0de779012d Change old `._debug._pause()` name, cherry to #362 re `greenback` 2025-03-17 16:46:45 -04:00
Tyler Goodlet 352b8b866a ._root: set a `_default_lo_addrs` and apply it when not provided by caller 2025-03-17 16:46:44 -04:00
Tyler Goodlet 4b381ff656 Always set default reg addr in `find_actor()` if not defined 2025-03-17 16:46:42 -04:00
Tyler Goodlet 751ba476f9 Oof, default reg addrs needs to be in `list[tuple]` form.. 2025-03-17 16:45:50 -04:00
Tyler Goodlet 78305e8808 Add post-mortem catch around failed transport addr binds to aid with runtime debugging 2025-03-17 16:45:41 -04:00
Tyler Goodlet 7d041e056b Rename to `parse_maddr()` and fill out doc strings 2025-03-17 16:45:39 -04:00
Tyler Goodlet 8d2cf6c245 Add libp2p style "multi-address" parser from `piker`
Details are in the module docs; this is a first draft with lotsa room
for refinement and extension.
2025-03-17 16:45:39 -04:00
Tyler Goodlet 48d67f5902 Init-support for "multi homed" transports
Since we'd like to eventually allow a diverse set of transport
(protocol) methods and stacks, and a multi-peer discovery system for
distributed actor-tree applications, this reworks all runtime internals
to support multi-homing for any given tree on a logical host. In other
words any actor can now bind its transport server (currently only
unsecured TCP + `msgspec`) to more then one address available in its
(linux) network namespace. Further, registry actors (now dubbed
"registars" instead of "arbiters") can also similarly bind to multiple
network addresses and provide discovery services to remote actors via
multiple addresses which can now be provided at runtime startup.

Deats:
- adjust `._runtime` internals to use a `list[tuple[str, int]]` (and
  thus pluralized) socket address sequence where applicable for transport
  server socket binds, now exposed via `Actor.accept_addrs`:
  - `Actor.__init__()` now takes a `registry_addrs: list`.
  - `Actor.is_arbiter` -> `.is_registrar`.
  - `._arb_addr` -> `._reg_addrs: list[tuple]`.
  - always reg and de-reg from all registrars in `async_main()`.
  - only set the global runtime var `'_root_mailbox'` to the loopback
    address since normally all in-tree processes should have access to
    it, right?
  - `._serve_forever()` task now takes `listen_sockaddrs: list[tuple]`
- make `open_root_actor()` take a `registry_addrs: list[tuple[str, int]]`
  and defaults when not passed.
- change `ActorNursery.start_..()` methods take `bind_addrs: list` and
  pass down through the spawning layer(s) via the parent-seed-msg.
- generalize all `._discovery()` APIs to accept `registry_addrs`-like
  inputs and move all relevant subsystems to adopt the "registry" style
  naming instead of "arbiter":
  - make `find_actor()` support batched concurrent portal queries over
    all provided input addresses using `.trionics.gather_contexts()` Bo
  - syntax: move to using `async with <tuples>` 3.9+ style chained
    @acms.
  - a general modernization of the code to a python 3.9+ style.
  - start deprecation and change to "registry" naming / semantics:
    - `._discovery.get_arbiter()` -> `.get_registry()`
2025-03-17 16:44:00 -04:00
10 changed files with 332 additions and 950 deletions

View File

@ -10,7 +10,6 @@ TODO:
- wonder if any of it'll work on OS X?
"""
from functools import partial
import itertools
from typing import Optional
import platform
@ -27,10 +26,6 @@ from pexpect.exceptions import (
from tractor._testing import (
examples_dir,
)
from tractor.devx._debug import (
_pause_msg,
_crash_msg,
)
from conftest import (
_ci_env,
)
@ -128,52 +123,20 @@ def expect(
raise
def in_prompt_msg(
prompt: str,
parts: list[str],
pause_on_false: bool = False,
print_prompt_on_false: bool = True,
) -> bool:
'''
Predicate check if (the prompt's) std-streams output has all
`str`-parts in it.
Can be used in test asserts for bulk matching expected
log/REPL output for a given `pdb` interact point.
'''
for part in parts:
if part not in prompt:
if pause_on_false:
import pdbp
pdbp.set_trace()
if print_prompt_on_false:
print(prompt)
return False
return True
def assert_before(
child,
patts: list[str],
**kwargs,
) -> None:
# as in before the prompt end
before: str = str(child.before.decode())
assert in_prompt_msg(
prompt=before,
parts=patts,
before = str(child.before.decode())
**kwargs
)
for patt in patts:
try:
assert patt in before
except AssertionError:
print(before)
raise
@pytest.fixture(
@ -232,10 +195,7 @@ def test_root_actor_error(spawn, user_in_out):
before = str(child.before.decode())
# make sure expected logging and error arrives
assert in_prompt_msg(
before,
[_crash_msg, "('root'"]
)
assert "Attaching to pdb in crashed actor: ('root'" in before
assert 'AssertionError' in before
# send user command
@ -372,10 +332,7 @@ def test_subactor_error(
child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg(
before,
[_crash_msg, "('name_error'"]
)
assert "Attaching to pdb in crashed actor: ('name_error'" in before
if do_next:
child.sendline('n')
@ -396,15 +353,9 @@ def test_subactor_error(
before = str(child.before.decode())
# root actor gets debugger engaged
assert in_prompt_msg(
before,
[_crash_msg, "('root'"]
)
assert "Attaching to pdb in crashed actor: ('root'" in before
# error is a remote error propagated from the subactor
assert in_prompt_msg(
before,
[_crash_msg, "('name_error'"]
)
assert "RemoteActorError: ('name_error'" in before
# another round
if ctlc:
@ -429,10 +380,7 @@ def test_subactor_breakpoint(
child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg(
before,
[_pause_msg, "('breakpoint_forever'"]
)
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
# do some "next" commands to demonstrate recurrent breakpoint
# entries
@ -448,10 +396,7 @@ def test_subactor_breakpoint(
child.sendline('continue')
child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg(
before,
[_pause_msg, "('breakpoint_forever'"]
)
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
if ctlc:
do_ctlc(child)
@ -496,10 +441,7 @@ def test_multi_subactors(
child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg(
before,
[_pause_msg, "('breakpoint_forever'"]
)
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
if ctlc:
do_ctlc(child)
@ -519,10 +461,7 @@ def test_multi_subactors(
# first name_error failure
child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg(
before,
[_crash_msg, "('name_error'"]
)
assert "Attaching to pdb in crashed actor: ('name_error'" in before
assert "NameError" in before
if ctlc:
@ -548,10 +487,7 @@ def test_multi_subactors(
child.sendline('c')
child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg(
before,
[_pause_msg, "('breakpoint_forever'"]
)
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
if ctlc:
do_ctlc(child)
@ -591,21 +527,17 @@ def test_multi_subactors(
child.expect(PROMPT)
before = str(child.before.decode())
assert_before(
child, [
# debugger attaches to root
# "Attaching to pdb in crashed actor: ('root'",
_crash_msg,
"('root'",
assert_before(child, [
# debugger attaches to root
"Attaching to pdb in crashed actor: ('root'",
# expect a multierror with exceptions for each sub-actor
"RemoteActorError: ('breakpoint_forever'",
"RemoteActorError: ('name_error'",
"RemoteActorError: ('spawn_error'",
"RemoteActorError: ('name_error_1'",
'bdb.BdbQuit',
]
)
# expect a multierror with exceptions for each sub-actor
"RemoteActorError: ('breakpoint_forever'",
"RemoteActorError: ('name_error'",
"RemoteActorError: ('spawn_error'",
"RemoteActorError: ('name_error_1'",
'bdb.BdbQuit',
])
if ctlc:
do_ctlc(child)
@ -642,22 +574,15 @@ def test_multi_daemon_subactors(
# the root's tty lock first so anticipate either crash
# message on the first entry.
bp_forev_parts = [_pause_msg, "('bp_forever'"]
bp_forev_in_msg = partial(
in_prompt_msg,
parts=bp_forev_parts,
)
bp_forever_msg = "Attaching pdb to actor: ('bp_forever'"
name_error_msg = "NameError: name 'doggypants' is not defined"
name_error_parts = [name_error_msg]
before = str(child.before.decode())
if bp_forev_in_msg(prompt=before):
next_parts = name_error_parts
if bp_forever_msg in before:
next_msg = name_error_msg
elif name_error_msg in before:
next_parts = bp_forev_parts
next_msg = bp_forever_msg
else:
raise ValueError("Neither log msg was found !?")
@ -674,10 +599,7 @@ def test_multi_daemon_subactors(
child.sendline('c')
child.expect(PROMPT)
assert_before(
child,
next_parts,
)
assert_before(child, [next_msg])
# XXX: hooray the root clobbering the child here was fixed!
# IMO, this demonstrates the true power of SC system design.
@ -701,15 +623,9 @@ def test_multi_daemon_subactors(
child.expect(PROMPT)
try:
assert_before(
child,
bp_forev_parts,
)
assert_before(child, [bp_forever_msg])
except AssertionError:
assert_before(
child,
name_error_parts,
)
assert_before(child, [name_error_msg])
else:
if ctlc:
@ -721,10 +637,7 @@ def test_multi_daemon_subactors(
child.sendline('c')
child.expect(PROMPT)
assert_before(
child,
name_error_parts,
)
assert_before(child, [name_error_msg])
# wait for final error in root
# where it crashs with boxed error
@ -734,7 +647,7 @@ def test_multi_daemon_subactors(
child.expect(PROMPT)
assert_before(
child,
bp_forev_parts
[bp_forever_msg]
)
except AssertionError:
break
@ -743,9 +656,7 @@ def test_multi_daemon_subactors(
child,
[
# boxed error raised in root task
# "Attaching to pdb in crashed actor: ('root'",
_crash_msg,
"('root'",
"Attaching to pdb in crashed actor: ('root'",
"_exceptions.RemoteActorError: ('name_error'",
]
)
@ -859,7 +770,7 @@ def test_multi_nested_subactors_error_through_nurseries(
child = spawn('multi_nested_subactors_error_up_through_nurseries')
# timed_out_early: bool = False
timed_out_early: bool = False
for send_char in itertools.cycle(['c', 'q']):
try:
@ -960,14 +871,11 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
if not timed_out_early:
before = str(child.before.decode())
assert_before(
child,
[
"tractor._exceptions.RemoteActorError: ('spawner0'",
"tractor._exceptions.RemoteActorError: ('name_error'",
"NameError: name 'doggypants' is not defined",
],
)
assert_before(child, [
"tractor._exceptions.RemoteActorError: ('spawner0'",
"tractor._exceptions.RemoteActorError: ('name_error'",
"NameError: name 'doggypants' is not defined",
])
def test_root_cancels_child_context_during_startup(
@ -1001,10 +909,8 @@ def test_different_debug_mode_per_actor(
# only one actor should enter the debugger
before = str(child.before.decode())
assert in_prompt_msg(
before,
[_crash_msg, "('debugged_boi'", "RuntimeError"],
)
assert "Attaching to pdb in crashed actor: ('debugged_boi'" in before
assert "RuntimeError" in before
if ctlc:
do_ctlc(child)

View File

@ -868,9 +868,6 @@ class Context:
# TODO: maybe we should also call `._res_scope.cancel()` if it
# exists to support cancelling any drain loop hangs?
# NOTE: this usage actually works here B)
# from .devx._debug import breakpoint
# await breakpoint()
# TODO: add to `Channel`?
@property

File diff suppressed because it is too large Load Diff

View File

@ -37,7 +37,7 @@ from ._runtime import (
# Arbiter as Registry,
async_main,
)
from .devx import _debug
from . import _debug
from . import _spawn
from . import _state
from . import log
@ -99,7 +99,7 @@ async def open_root_actor(
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
builtin_bp_handler = sys.breakpointhook
orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None)
os.environ['PYTHONBREAKPOINT'] = 'tractor.devx._debug.pause_from_sync'
os.environ['PYTHONBREAKPOINT'] = 'tractor._debug.pause_from_sync'
# attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state.
@ -146,7 +146,7 @@ async def open_root_actor(
# expose internal debug module to every actor allowing
# for use of ``await tractor.breakpoint()``
enable_modules.append('tractor.devx._debug')
enable_modules.append('tractor._debug')
# if debug mode get's enabled *at least* use that level of
# logging for some informative console prompts.

View File

@ -78,7 +78,7 @@ from ._exceptions import (
ContextCancelled,
TransportClosed,
)
from .devx import _debug
from . import _debug
from ._discovery import get_registry
from ._portal import Portal
from . import _state
@ -197,7 +197,7 @@ class Actor:
self._parent_main_data = _mp_fixup_main._mp_figure_out_main()
# always include debugging tools module
enable_modules.append('tractor.devx._debug')
enable_modules.append('tractor._debug')
self.enable_modules: dict[str, str] = {}
for name in enable_modules:

View File

@ -34,7 +34,7 @@ from typing import (
import trio
from trio import TaskStatus
from .devx._debug import (
from ._debug import (
maybe_wait_for_debugger,
acquire_debug_lock,
)
@ -554,14 +554,13 @@ async def trio_proc(
with trio.move_on_after(0.5):
await proc.wait()
log.pdb(
'Delaying subproc reaper while debugger locked..'
)
await maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get(
'_debug_mode', False
),
header_msg=(
'Delaying subproc reaper while debugger locked..\n'
),
# TODO: need a diff value then default?
# poll_steps=9999999,
)

View File

@ -28,7 +28,7 @@ import warnings
import trio
from .devx._debug import maybe_wait_for_debugger
from ._debug import maybe_wait_for_debugger
from ._state import current_actor, is_main_process
from .log import get_logger, get_loglevel
from ._runtime import Actor

View File

@ -1,37 +0,0 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Runtime "developer experience" utils and addons to aid our
(advanced) users and core devs in building distributed applications
and working with/on the actor runtime.
"""
from ._debug import (
maybe_wait_for_debugger as maybe_wait_for_debugger,
acquire_debug_lock as acquire_debug_lock,
breakpoint as breakpoint,
pause as pause,
pause_from_sync as pause_from_sync,
shield_sigint_handler as shield_sigint_handler,
MultiActorPdb as MultiActorPdb,
open_crash_handler as open_crash_handler,
maybe_open_crash_handler as maybe_open_crash_handler,
post_mortem as post_mortem,
)
from ._stackscope import (
enable_stack_on_sig as enable_stack_on_sig,
)

View File

@ -1,84 +0,0 @@
# tractor: structured concurrent "actors".
# Copyright eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
The fundamental cross process SC abstraction: an inter-actor,
cancel-scope linked task "context".
A ``Context`` is very similar to the ``trio.Nursery.cancel_scope`` built
into each ``trio.Nursery`` except it links the lifetimes of memory space
disjoint, parallel executing tasks in separate actors.
'''
from signal import (
signal,
SIGUSR1,
)
import trio
@trio.lowlevel.disable_ki_protection
def dump_task_tree() -> None:
import stackscope
from tractor.log import get_console_log
tree_str: str = str(
stackscope.extract(
trio.lowlevel.current_root_task(),
recurse_child_tasks=True
)
)
log = get_console_log('cancel')
log.pdb(
f'Dumping `stackscope` tree:\n\n'
f'{tree_str}\n'
)
# import logging
# try:
# with open("/dev/tty", "w") as tty:
# tty.write(tree_str)
# except BaseException:
# logging.getLogger(
# "task_tree"
# ).exception("Error printing task tree")
def signal_handler(sig: int, frame: object) -> None:
import traceback
try:
trio.lowlevel.current_trio_token(
).run_sync_soon(dump_task_tree)
except RuntimeError:
# not in async context -- print a normal traceback
traceback.print_stack()
def enable_stack_on_sig(
sig: int = SIGUSR1
) -> None:
'''
Enable `stackscope` tracing on reception of a signal; by
default this is SIGUSR1.
'''
signal(
sig,
signal_handler,
)
# NOTE: not the above can be triggered from
# a (xonsh) shell using:
# kill -SIGUSR1 @$(pgrep -f '<cmd>')

View File

@ -1,129 +0,0 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
CLI framework extensions for hacking on the actor runtime.
Currently popular frameworks supported are:
- `typer` via the `@callback` API
"""
from __future__ import annotations
from typing import (
Any,
Callable,
)
from typing_extensions import Annotated
import typer
_runtime_vars: dict[str, Any] = {}
def load_runtime_vars(
ctx: typer.Context,
callback: Callable,
pdb: bool = False, # --pdb
ll: Annotated[
str,
typer.Option(
'--loglevel',
'-l',
help='BigD logging level',
),
] = 'cancel', # -l info
):
'''
Maybe engage crash handling with `pdbp` when code inside
a `typer` CLI endpoint cmd raises.
To use this callback simply take your `app = typer.Typer()` instance
and decorate this function with it like so:
.. code:: python
from tractor.devx import cli
app = typer.Typer()
# manual decoration to hook into `click`'s context system!
cli.load_runtime_vars = app.callback(
invoke_without_command=True,
)
And then you can use the now augmented `click` CLI context as so,
.. code:: python
@app.command(
context_settings={
"allow_extra_args": True,
"ignore_unknown_options": True,
}
)
def my_cli_cmd(
ctx: typer.Context,
):
rtvars: dict = ctx.runtime_vars
pdb: bool = rtvars['pdb']
with tractor.devx.cli.maybe_open_crash_handler(pdb=pdb):
trio.run(
partial(
my_tractor_main_task_func,
debug_mode=pdb,
loglevel=rtvars['ll'],
)
)
which will enable log level and debug mode globally for the entire
`tractor` + `trio` runtime thereafter!
Bo
'''
global _runtime_vars
_runtime_vars |= {
'pdb': pdb,
'll': ll,
}
ctx.runtime_vars: dict[str, Any] = _runtime_vars
print(
f'`typer` sub-cmd: {ctx.invoked_subcommand}\n'
f'`tractor` runtime vars: {_runtime_vars}'
)
# XXX NOTE XXX: hackzone.. if no sub-cmd is specified (the
# default if the user just invokes `bigd`) then we simply
# invoke the sole `_bigd()` cmd passing in the "parent"
# typer.Context directly to that call since we're treating it
# as a "non sub-command" or wtv..
# TODO: ideally typer would have some kinda built-in way to get
# this behaviour without having to construct and manually
# invoke our own cmd..
if (
ctx.invoked_subcommand is None
or ctx.invoked_subcommand == callback.__name__
):
cmd: typer.core.TyperCommand = typer.core.TyperCommand(
name='bigd',
callback=callback,
)
ctx.params = {'ctx': ctx}
cmd.invoke(ctx)