Compare commits

..

4 Commits

Author SHA1 Message Date
Tyler Goodlet 7ffdf3483a Use shorthand nursery var-names per convention in codebase 2025-09-12 12:28:04 -04:00
Tyler Goodlet 76ed0f2ef6 Better separate service tasks vs. ctxs via methods
Namely splitting the handles for each into 2 separate tables and adding
a `.cancel_service_task()`.

Also,
- move `_open_and_supervise_service_ctx()` to mod level.
- rename `target` -> `ctx_fn` params throughout.
- fill out method doc strings.
2025-09-12 12:28:04 -04:00
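A rough usage sketch of the split task-side API described above. The kwarg names (`name`, `fn`), the task-name arg to `.cancel_service_task()`, the import path and the async-cm form of `open_service_mngr()` are all assumptions, not taken from this diff:

import trio
from tractor.hilevel import open_service_mngr  # import path assumed

async def heartbeat(period: float = 1.0) -> None:
    # a bg "service task" run under the mngr's `.service_n` nursery
    while True:
        await trio.sleep(period)

async def main() -> None:
    async with open_service_mngr() as mngr:
        # start a task wrapped with a cancel-scope + completion event
        await mngr.start_service_task(name='heartbeat', fn=heartbeat)
        await trio.sleep(3)
        # cancel ONLY that task (one-cancels-one style)
        await mngr.cancel_service_task('heartbeat')

trio.run(main)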
Tyler Goodlet 2afb624c48 Mv over `ServiceMngr` from `piker` with mods
Namely distinguishing service "IPC contexts" (opened in a
subactor via a `Portal`) from just local `trio.Task`s started
and managed under the `.service_n` (more or less wrapped in the
interface of a "task-manager" style nursery - aka a one-cancels-one
supervision strategy).

API changes from original (`piker`) impl,
- mk `.start_service_task()` do ONLY that, start a task with a wrapping
  cancel-scope and completion event.
  |_ ideally this gets factored-out/re-implemented using the
    task-manager/OCO-style-nursery from GH #363.
- change what was the impl of `.start_service_task()` to `.start_service_ctx()`
  since it more explicitly defines the functionality of entering
  `Portal.open_context()` with a wrapping cs and completion event inside
  a bg task (which syncs the ctx's lifetime with termination of the
  remote actor runtime).
- factor out what was a `.start_service_ctx()` closure to a new
  `_open_and_supervise_service_ctx()` mod-func holding the meat of
  the supervision logic.

`ServiceMngr` API brief,
- use `open_service_mngr()` and `get_service_mngr()` to acquire the
  actor-global singleton.
- `ServiceMngr.start_service()` and `.cancel_service()` which allow for
  straightforward mgmt of "service subactor daemons".
2025-09-12 12:28:04 -04:00
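A companion sketch of the subactor-daemon side of the brief above; the `ctx_fn` kwarg follows the `target` -> `ctx_fn` rename mentioned earlier, but the full signatures (and import path) are assumptions:

import trio
import tractor
from tractor.hilevel import (  # import path assumed
    open_service_mngr,
    get_service_mngr,
)

@tractor.context
async def serve(ctx: tractor.Context) -> None:
    # a long-lived service ctx-endpoint; stays up until cancelled
    await ctx.started()
    await trio.sleep_forever()

async def main() -> None:
    async with open_service_mngr() as mngr:
        # spawn a subactor and enter `Portal.open_context()` with
        # a wrapping cs + completion event inside a bg task
        await mngr.start_service(name='svc', ctx_fn=serve)
        assert get_service_mngr() is mngr  # actor-global singleton
        await mngr.cancel_service('svc')

trio.run(main)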
Tyler Goodlet 885137ac19 Initial idea-notes dump and @singleton factory idea from `trio`-gitter 2025-09-12 12:28:04 -04:00
46 changed files with 231 additions and 1273 deletions

View File

@ -17,7 +17,6 @@ from tractor import (
MsgStream, MsgStream,
_testing, _testing,
trionics, trionics,
TransportClosed,
) )
import trio import trio
import pytest import pytest
@ -209,16 +208,12 @@ async def main(
# TODO: is this needed or no? # TODO: is this needed or no?
raise raise
except ( except trio.ClosedResourceError:
trio.ClosedResourceError,
TransportClosed,
) as _tpt_err:
# NOTE: don't send if we already broke the # NOTE: don't send if we already broke the
# connection to avoid raising a closed-error # connection to avoid raising a closed-error
# such that we drop through to the ctl-c # such that we drop through to the ctl-c
# mashing by user. # mashing by user.
with trio.CancelScope(shield=True): await trio.sleep(0.01)
await trio.sleep(0.01)
# timeout: int = 1 # timeout: int = 1
# with trio.move_on_after(timeout) as cs: # with trio.move_on_after(timeout) as cs:
@ -252,7 +247,6 @@ async def main(
await stream.send(i) await stream.send(i)
pytest.fail('stream not closed?') pytest.fail('stream not closed?')
except ( except (
TransportClosed,
trio.ClosedResourceError, trio.ClosedResourceError,
trio.EndOfChannel, trio.EndOfChannel,
) as send_err: ) as send_err:

View File

@ -1,27 +0,0 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1769018530,
"narHash": "sha256-MJ27Cy2NtBEV5tsK+YraYr2g851f3Fl1LpNHDzDX15c=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "88d3861acdd3d2f0e361767018218e51810df8a1",
"type": "github"
},
"original": {
"owner": "nixos",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs"
}
}
},
"root": "root",
"version": 7
}

View File

@ -1,70 +0,0 @@
# An "impure" template thx to `pyproject.nix`,
# https://pyproject-nix.github.io/pyproject.nix/templates.html#impure
# https://github.com/pyproject-nix/pyproject.nix/blob/master/templates/impure/flake.nix
{
description = "An impure overlay (w dev-shell) using `uv`";
inputs = {
nixpkgs.url = "github:nixos/nixpkgs/nixos-unstable";
};
outputs =
{ nixpkgs, ... }:
let
inherit (nixpkgs) lib;
forAllSystems = lib.genAttrs lib.systems.flakeExposed;
in
{
devShells = forAllSystems (
system:
let
pkgs = nixpkgs.legacyPackages.${system};
# XXX NOTE XXX, for now we overlay specific pkgs via
# a major-version-pinned-`cpython`
cpython = "python313";
venv_dir = "py313";
pypkgs = pkgs."${cpython}Packages";
in
{
default = pkgs.mkShell {
packages = [
# XXX, ensure sh completions activate!
pkgs.bashInteractive
pkgs.bash-completion
# XXX, on nix(os), use pkgs version to avoid
# build/sys-sh-integration issues
pkgs.ruff
pkgs.uv
pkgs.${cpython}# ?TODO^ how to set from `cpython` above?
];
shellHook = ''
# unmask to debug **this** dev-shell-hook
# set -e
# link-in c++ stdlib for various AOT-ext-pkgs (numpy, etc.)
LD_LIBRARY_PATH="${pkgs.stdenv.cc.cc.lib}/lib:$LD_LIBRARY_PATH"
export LD_LIBRARY_PATH
# RUNTIME-SETTINGS
# ------ uv ------
# - always use the ./py313/ venv-subdir
# - sync env with all extras
export UV_PROJECT_ENVIRONMENT=${venv_dir}
uv sync --dev --all-extras
# ------ TIPS ------
# NOTE, to launch the py-venv installed `xonsh` (like @goodboy)
# run the `nix develop` cmd with,
# >> nix develop -c uv run xonsh
'';
};
}
);
};
}

View File

@ -53,33 +53,22 @@ dependencies = [
[dependency-groups] [dependency-groups]
dev = [ dev = [
{include-group = 'devx'},
{include-group = 'testing'},
{include-group = 'repl'},
]
devx = [
# `tractor.devx` tooling
"greenback>=1.2.1,<2",
"stackscope>=0.2.2,<0.3",
# ^ requires this?
"typing-extensions>=4.14.1",
]
testing = [
# test suite # test suite
# TODO: maybe some of these layout choices? # TODO: maybe some of these layout choices?
# https://docs.pytest.org/en/8.0.x/explanation/goodpractices.html#choosing-a-test-layout-import-rules # https://docs.pytest.org/en/8.0.x/explanation/goodpractices.html#choosing-a-test-layout-import-rules
"pytest>=8.3.5", "pytest>=8.3.5",
"pexpect>=4.9.0,<5", "pexpect>=4.9.0,<5",
] # `tractor.devx` tooling
repl = [ "greenback>=1.2.1,<2",
"stackscope>=0.2.2,<0.3",
# ^ requires this?
"typing-extensions>=4.14.1",
"pyperclip>=1.9.0", "pyperclip>=1.9.0",
"prompt-toolkit>=3.0.50", "prompt-toolkit>=3.0.50",
"xonsh>=0.19.2", "xonsh>=0.19.2",
"psutil>=7.0.0", "psutil>=7.0.0",
] ]
lint = [
"ruff>=0.9.6"
]
# TODO, add these with sane versions; were originally in # TODO, add these with sane versions; were originally in
# `requirements-docs.txt`.. # `requirements-docs.txt`..
# docs = [ # docs = [

View File

@ -65,11 +65,7 @@ def loglevel(request):
import tractor import tractor
orig = tractor.log._default_loglevel orig = tractor.log._default_loglevel
level = tractor.log._default_loglevel = request.config.option.loglevel level = tractor.log._default_loglevel = request.config.option.loglevel
log = tractor.log.get_console_log( tractor.log.get_console_log(level)
level=level,
name='tractor', # <- enable root logger
)
log.info(f'Test-harness logging level: {level}\n')
yield level yield level
tractor.log._default_loglevel = orig tractor.log._default_loglevel = orig

View File

@ -4,7 +4,6 @@
''' '''
from __future__ import annotations from __future__ import annotations
import time import time
import signal
from typing import ( from typing import (
Callable, Callable,
TYPE_CHECKING, TYPE_CHECKING,
@ -35,10 +34,7 @@ if TYPE_CHECKING:
# a fn that sub-instantiates a `pexpect.spawn()` # a fn that sub-instantiates a `pexpect.spawn()`
# and returns it. # and returns it.
type PexpectSpawner = Callable[ type PexpectSpawner = Callable[[str], pty_spawn.spawn]
[str],
pty_spawn.spawn,
]
@pytest.fixture @pytest.fixture
@ -70,15 +66,12 @@ def spawn(
import os import os
os.environ['PYTHON_COLORS'] = '0' os.environ['PYTHON_COLORS'] = '0'
spawned: PexpectSpawner|None = None
def _spawn( def _spawn(
cmd: str, cmd: str,
**mkcmd_kwargs, **mkcmd_kwargs,
) -> pty_spawn.spawn: ) -> pty_spawn.spawn:
nonlocal spawned
unset_colors() unset_colors()
spawned = testdir.spawn( return testdir.spawn(
cmd=mk_cmd( cmd=mk_cmd(
cmd, cmd,
**mkcmd_kwargs, **mkcmd_kwargs,
@ -88,35 +81,9 @@ def spawn(
# ^TODO? get `pytest` core to expose underlying # ^TODO? get `pytest` core to expose underlying
# `pexpect.spawn()` stuff? # `pexpect.spawn()` stuff?
) )
return spawned
# such that test-dep can pass input script name. # such that test-dep can pass input script name.
yield _spawn # the `PexpectSpawner`, type alias. return _spawn # the `PexpectSpawner`, type alias.
if (
spawned
and
(ptyproc := spawned.ptyproc)
):
start: float = time.time()
timeout: float = 5
while (
ptyproc.isalive()
and
(
(_time_took := (time.time() - start))
<
timeout
)
):
ptyproc.kill(signal.SIGINT)
time.sleep(0.01)
if ptyproc.isalive():
ptyproc.kill(signal.SIGKILL)
# TODO? ensure we've cleaned up any UDS-paths?
# breakpoint()
@pytest.fixture( @pytest.fixture(
@ -142,11 +109,7 @@ def ctlc(
'https://github.com/goodboy/tractor/issues/320' 'https://github.com/goodboy/tractor/issues/320'
) )
if ( if mark.name == 'ctlcs_bish':
mark.name == 'ctlcs_bish'
and
use_ctlc
):
pytest.skip( pytest.skip(
f'Test {node} prolly uses something from the stdlib (namely `asyncio`..)\n' f'Test {node} prolly uses something from the stdlib (namely `asyncio`..)\n'
f'The test and/or underlying example script can *sometimes* run fine ' f'The test and/or underlying example script can *sometimes* run fine '

View File

@ -1138,10 +1138,7 @@ def test_ctxep_pauses_n_maybe_ipc_breaks(
['peer IPC channel closed abruptly?', ['peer IPC channel closed abruptly?',
'another task closed this fd', 'another task closed this fd',
'Debug lock request was CANCELLED?', 'Debug lock request was CANCELLED?',
"'MsgpackUDSStream' was already closed locally?", "TransportClosed: 'MsgpackUDSStream' was already closed locally ?",]
"TransportClosed: 'MsgpackUDSStream' was already closed 'by peer'?",
# ?TODO^? match depending on `tpt_proto(s)`?
]
# XXX races on whether these show/hit? # XXX races on whether these show/hit?
# 'Failed to REPl via `_pause()` You called `tractor.pause()` from an already cancelled scope!', # 'Failed to REPl via `_pause()` You called `tractor.pause()` from an already cancelled scope!',

View File

@ -98,8 +98,7 @@ def test_ipc_channel_break_during_stream(
expect_final_exc = TransportClosed expect_final_exc = TransportClosed
mod: ModuleType = import_path( mod: ModuleType = import_path(
examples_dir() examples_dir() / 'advanced_faults'
/ 'advanced_faults'
/ 'ipc_failure_during_stream.py', / 'ipc_failure_during_stream.py',
root=examples_dir(), root=examples_dir(),
consider_namespace_packages=False, consider_namespace_packages=False,
@ -114,9 +113,8 @@ def test_ipc_channel_break_during_stream(
if ( if (
# only expect EoC if trans is broken on the child side, # only expect EoC if trans is broken on the child side,
ipc_break['break_child_ipc_after'] is not False ipc_break['break_child_ipc_after'] is not False
and
# AND we tell the child to call `MsgStream.aclose()`. # AND we tell the child to call `MsgStream.aclose()`.
pre_aclose_msgstream and pre_aclose_msgstream
): ):
# expect_final_exc = trio.EndOfChannel # expect_final_exc = trio.EndOfChannel
# ^XXX NOPE! XXX^ since now `.open_stream()` absorbs this # ^XXX NOPE! XXX^ since now `.open_stream()` absorbs this
@ -162,8 +160,7 @@ def test_ipc_channel_break_during_stream(
ipc_break['break_child_ipc_after'] is not False ipc_break['break_child_ipc_after'] is not False
and ( and (
ipc_break['break_parent_ipc_after'] ipc_break['break_parent_ipc_after']
> > ipc_break['break_child_ipc_after']
ipc_break['break_child_ipc_after']
) )
): ):
if pre_aclose_msgstream: if pre_aclose_msgstream:
@ -251,15 +248,8 @@ def test_ipc_channel_break_during_stream(
# get raw instance from pytest wrapper # get raw instance from pytest wrapper
value = excinfo.value value = excinfo.value
if isinstance(value, ExceptionGroup): if isinstance(value, ExceptionGroup):
excs: tuple[Exception] = value.exceptions excs = value.exceptions
assert ( assert len(excs) == 1
len(excs) <= 2
and
all(
isinstance(exc, TransportClosed)
for exc in excs
)
)
final_exc = excs[0] final_exc = excs[0]
assert isinstance(final_exc, expect_final_exc) assert isinstance(final_exc, expect_final_exc)

View File

@ -11,13 +11,12 @@ import trio
import tractor import tractor
from tractor import ( # typing from tractor import ( # typing
Actor, Actor,
Context,
ContextCancelled,
MsgStream,
Portal,
RemoteActorError,
current_actor, current_actor,
open_nursery, open_nursery,
Portal,
Context,
ContextCancelled,
RemoteActorError,
) )
from tractor._testing import ( from tractor._testing import (
# tractor_test, # tractor_test,
@ -797,8 +796,8 @@ async def basic_echo_server(
) -> None: ) -> None:
''' '''
Just the simplest `MsgStream` echo server which resays what you Just the simplest `MsgStream` echo server which resays what
told it but with its uid in front ;) you told it but with its uid in front ;)
''' '''
actor: Actor = tractor.current_actor() actor: Actor = tractor.current_actor()
@ -967,14 +966,9 @@ async def tell_little_bro(
caller: str = '', caller: str = '',
err_after: float|None = None, err_after: float|None = None,
rng_seed: int = 100, rng_seed: int = 50,
# NOTE, ensure ^ is large enough (on fast hw anyway)
# to ensure the peer cancel req arrives before the
# echoing dialog does itself Bp
): ):
# contact target actor, do a stream dialog. # contact target actor, do a stream dialog.
lb: Portal
echo_ipc: MsgStream
async with ( async with (
tractor.wait_for_actor( tractor.wait_for_actor(
name=actor_name name=actor_name
@ -989,6 +983,7 @@ async def tell_little_bro(
else None else None
), ),
) as (sub_ctx, first), ) as (sub_ctx, first),
sub_ctx.open_stream() as echo_ipc, sub_ctx.open_stream() as echo_ipc,
): ):
actor: Actor = current_actor() actor: Actor = current_actor()
@ -999,7 +994,6 @@ async def tell_little_bro(
i, i,
) )
await echo_ipc.send(msg) await echo_ipc.send(msg)
await trio.sleep(0.001)
resp = await echo_ipc.receive() resp = await echo_ipc.receive()
print( print(
f'{caller} => {actor_name}: {msg}\n' f'{caller} => {actor_name}: {msg}\n'
@ -1012,9 +1006,6 @@ async def tell_little_bro(
assert sub_uid != uid assert sub_uid != uid
assert _i == i assert _i == i
# XXX, usually should never get here!
# await tractor.pause()
@pytest.mark.parametrize( @pytest.mark.parametrize(
'raise_client_error', 'raise_client_error',
@ -1029,9 +1020,6 @@ def test_peer_spawns_and_cancels_service_subactor(
raise_client_error: str, raise_client_error: str,
reg_addr: tuple[str, int], reg_addr: tuple[str, int],
raise_sub_spawn_error_after: float|None, raise_sub_spawn_error_after: float|None,
loglevel: str,
# ^XXX, set to 'warning' to see masked-exc warnings
# that may transpire during actor-nursery teardown.
): ):
# NOTE: this tests for the modden `mod wks open piker` bug # NOTE: this tests for the modden `mod wks open piker` bug
# discovered as part of implementing workspace ctx # discovered as part of implementing workspace ctx
@ -1061,7 +1049,6 @@ def test_peer_spawns_and_cancels_service_subactor(
# NOTE: to halt the peer tasks on ctxc, uncomment this. # NOTE: to halt the peer tasks on ctxc, uncomment this.
debug_mode=debug_mode, debug_mode=debug_mode,
registry_addrs=[reg_addr], registry_addrs=[reg_addr],
loglevel=loglevel,
) as an: ) as an:
server: Portal = await an.start_actor( server: Portal = await an.start_actor(
(server_name := 'spawn_server'), (server_name := 'spawn_server'),

View File

@ -1,185 +0,0 @@
'''
`tractor.log`-wrapping unit tests.
'''
from pathlib import Path
import shutil
from types import ModuleType
import pytest
import tractor
from tractor import (
_code_load,
log,
)
def test_root_pkg_not_duplicated_in_logger_name():
'''
When both `pkg_name` and `name` are passed and they have
a common `<root_name>.< >` prefix, ensure that it is not
duplicated in the child's `StackLevelAdapter.name: str`.
'''
project_name: str = 'pylib'
pkg_path: str = 'pylib.subpkg.mod'
assert not tractor.current_actor(
err_on_no_runtime=False,
)
proj_log = log.get_logger(
pkg_name=project_name,
mk_sublog=False,
)
sublog = log.get_logger(
pkg_name=project_name,
name=pkg_path,
)
assert proj_log is not sublog
assert sublog.name.count(proj_log.name) == 1
assert 'mod' not in sublog.name
def test_implicit_mod_name_applied_for_child(
testdir: pytest.Pytester,
loglevel: str,
):
'''
Verify that when `.log.get_logger(pkg_name='pylib')` is called
from a given sub-mod from within the `pylib` pkg-path, we
implicitly set the equiv of `name=__name__` from the caller's
module.
'''
# tractor.log.get_console_log(level=loglevel)
proj_name: str = 'snakelib'
mod_code: str = (
f'import tractor\n'
f'\n'
# if you need to trace `testdir` stuff @ import-time..
# f'breakpoint()\n'
f'log = tractor.log.get_logger(pkg_name="{proj_name}")\n'
)
# create a sub-module for each pkg layer
_lib = testdir.mkpydir(proj_name)
pkg: Path = Path(_lib)
pkg_init_mod: Path = pkg / "__init__.py"
pkg_init_mod.write_text(mod_code)
subpkg: Path = pkg / 'subpkg'
subpkg.mkdir()
subpkgmod: Path = subpkg / "__init__.py"
subpkgmod.touch()
subpkgmod.write_text(mod_code)
_submod: Path = testdir.makepyfile(
_mod=mod_code,
)
pkg_submod = pkg / 'mod.py'
pkg_subpkg_submod = subpkg / 'submod.py'
shutil.copyfile(
_submod,
pkg_submod,
)
shutil.copyfile(
_submod,
pkg_subpkg_submod,
)
testdir.chdir()
# NOTE, to introspect the py-file-module-layout use (in .xsh
# syntax): `ranger @str(testdir)`
# XXX NOTE, once the "top level" pkg mod has been
# imported, we can then use `import` syntax to
# import its sub-pkgs and modules.
subpkgmod: ModuleType = _code_load.load_module_from_path(
Path(pkg / '__init__.py'),
module_name=proj_name,
)
pkg_root_log = log.get_logger(
pkg_name=proj_name,
mk_sublog=False,
)
# the top level pkg-mod, created just now,
# by above API call.
assert pkg_root_log.name == proj_name
assert not pkg_root_log.logger.getChildren()
#
# ^TODO! test this same output but created via a `get_logger()`
# call in the `snakelib.__init__py`!!
# NOTE, the pkg-level "init mod" should of course
# have the same name as the package ns-path.
import snakelib as init_mod
assert init_mod.log.name == proj_name
# NOTE, a first-pkg-level sub-module should only
# use the package-name since the leaf-node-module
# will be included in log headers by default.
from snakelib import mod
assert mod.log.name == proj_name
from snakelib import subpkg
assert (
subpkg.log.name
==
subpkg.__package__
==
f'{proj_name}.subpkg'
)
from snakelib.subpkg import submod
assert (
submod.log.name
==
submod.__package__
==
f'{proj_name}.subpkg'
)
sub_logs = pkg_root_log.logger.getChildren()
assert len(sub_logs) == 1 # only one nested sub-pkg module
assert submod.log.logger in sub_logs
# TODO, moar tests against existing feats:
# ------ - ------
# - [ ] color settings?
# - [ ] header contents like,
# - actor + thread + task names from various conc-primitives,
# - [ ] `StackLevelAdapter` extensions,
# - our custom levels/methods: `transport|runtime|cancel|pdb|devx`
# - [ ] custom-headers support?
#
# TODO, test driven dev of new-ideas/long-wanted feats,
# ------ - ------
# - [ ] https://github.com/goodboy/tractor/issues/244
# - [ ] @catern mentioned using a sync / deterministic sys
# and in particular `svlogd`?
# |_ https://smarden.org/runit/svlogd.8
# - [ ] using adapter vs. filters?
# - https://stackoverflow.com/questions/60691759/add-information-to-every-log-message-in-python-logging/61830838#61830838
# - [ ] `.at_least_level()` optimization which short circuits wtv
# `logging` is doing behind the scenes when the level filters
# the emission..?
# - [ ] use of `.log.get_console_log()` in subactors and the
# subtleties of ensuring it actually emits from a subproc.
# - [ ] this idea of activating per-subsys emissions with some
# kind of `.name` filter passed to the runtime or maybe configured
# via the root `StackLevelAdapter`?
# - [ ] use of `logging.dict.dictConfig()` to simplify the impl
# of any of ^^ ??
# - https://stackoverflow.com/questions/7507825/where-is-a-complete-example-of-logging-config-dictconfig
# - https://docs.python.org/3/library/logging.config.html#configuration-dictionary-schema
# - https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig
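Condensed, the naming behavior those (now deleted) tests assert for the also-removed `pkg_name` param looks like:

from tractor import log

proj_log = log.get_logger(pkg_name='pylib', mk_sublog=False)
sublog = log.get_logger(pkg_name='pylib', name='pylib.subpkg.mod')

assert proj_log.name == 'pylib'               # root pkg logger
assert sublog.name.count(proj_log.name) == 1  # prefix never duplicated
assert 'mod' not in sublog.name               # leaf-mod left to log headers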

View File

@ -1,13 +1,8 @@
""" """
Multiple python programs invoking the runtime. Multiple python programs invoking the runtime.
""" """
from __future__ import annotations
import platform import platform
import subprocess
import time import time
from typing import (
TYPE_CHECKING,
)
import pytest import pytest
import trio import trio
@ -15,29 +10,14 @@ import tractor
from tractor._testing import ( from tractor._testing import (
tractor_test, tractor_test,
) )
from tractor import (
current_actor,
_state,
Actor,
Context,
Portal,
)
from .conftest import ( from .conftest import (
sig_prog, sig_prog,
_INT_SIGNAL, _INT_SIGNAL,
_INT_RETURN_CODE, _INT_RETURN_CODE,
) )
if TYPE_CHECKING:
from tractor.msg import Aid
from tractor._addr import (
UnwrappedAddress,
)
def test_abort_on_sigint(daemon):
def test_abort_on_sigint(
daemon: subprocess.Popen,
):
assert daemon.returncode is None assert daemon.returncode is None
time.sleep(0.1) time.sleep(0.1)
sig_prog(daemon, _INT_SIGNAL) sig_prog(daemon, _INT_SIGNAL)
@ -50,11 +30,8 @@ def test_abort_on_sigint(
@tractor_test @tractor_test
async def test_cancel_remote_arbiter( async def test_cancel_remote_arbiter(daemon, reg_addr):
daemon: subprocess.Popen, assert not tractor.current_actor().is_arbiter
reg_addr: UnwrappedAddress,
):
assert not current_actor().is_arbiter
async with tractor.get_registry(reg_addr) as portal: async with tractor.get_registry(reg_addr) as portal:
await portal.cancel_actor() await portal.cancel_actor()
@ -68,106 +45,24 @@ async def test_cancel_remote_arbiter(
pass pass
def test_register_duplicate_name( def test_register_duplicate_name(daemon, reg_addr):
daemon: subprocess.Popen,
reg_addr: UnwrappedAddress,
):
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
registry_addrs=[reg_addr], registry_addrs=[reg_addr],
) as an: ) as n:
assert not current_actor().is_arbiter assert not tractor.current_actor().is_arbiter
p1 = await an.start_actor('doggy') p1 = await n.start_actor('doggy')
p2 = await an.start_actor('doggy') p2 = await n.start_actor('doggy')
async with tractor.wait_for_actor('doggy') as portal: async with tractor.wait_for_actor('doggy') as portal:
assert portal.channel.uid in (p2.channel.uid, p1.channel.uid) assert portal.channel.uid in (p2.channel.uid, p1.channel.uid)
await an.cancel() await n.cancel()
# XXX, run manually since we want to start this root **after** # run it manually since we want to start **after**
# the other "daemon" program with it's own root. # the other "daemon" program
trio.run(main)
@tractor.context
async def get_root_portal(
ctx: Context,
):
'''
Connect back to the root actor manually (using `._discovery` API)
and ensure its contact info is the same as our immediate parent.
'''
sub: Actor = current_actor()
rtvs: dict = _state._runtime_vars
raddrs: list[UnwrappedAddress] = rtvs['_root_addrs']
# await tractor.pause()
# XXX, in case the sub->root discovery breaks you might need
# this (i know i did Xp)!!
# from tractor.devx import mk_pdb
# mk_pdb().set_trace()
assert (
len(raddrs) == 1
and
list(sub._parent_chan.raddr.unwrap()) in raddrs
)
# connect back to our immediate parent which should also
# be the actor-tree's root.
from tractor._discovery import get_root
ptl: Portal
async with get_root() as ptl:
root_aid: Aid = ptl.chan.aid
parent_ptl: Portal = current_actor().get_parent()
assert (
root_aid.name == 'root'
and
parent_ptl.chan.aid == root_aid
)
await ctx.started()
def test_non_registrar_spawns_child(
daemon: subprocess.Popen,
reg_addr: UnwrappedAddress,
loglevel: str,
debug_mode: bool,
):
'''
Ensure a non-registrar (serving) root actor can spawn a sub and
that sub can connect back (manually) to its rent that is the
root without issue.
More or less this audits the global contact info in
`._state._runtime_vars`.
'''
async def main():
async with tractor.open_nursery(
registry_addrs=[reg_addr],
loglevel=loglevel,
debug_mode=debug_mode,
) as an:
actor: Actor = tractor.current_actor()
assert not actor.is_registrar
sub_ptl: Portal = await an.start_actor(
name='sub',
enable_modules=[__name__],
)
async with sub_ptl.open_context(
get_root_portal,
) as (ctx, _):
print('Waiting for `sub` to connect back to us..')
await an.cancel()
# XXX, run manually since we want to start this root **after**
# the other "daemon" program with it's own root.
trio.run(main) trio.run(main)

View File

@ -17,8 +17,9 @@ from tractor.log import (
get_console_log, get_console_log,
get_logger, get_logger,
) )
log = get_logger(__name__)
log = get_logger()
_resource: int = 0 _resource: int = 0

View File

@ -37,7 +37,7 @@ from .ipc._uds import UDSAddress
if TYPE_CHECKING: if TYPE_CHECKING:
from ._runtime import Actor from ._runtime import Actor
log = get_logger() log = get_logger(__name__)
# TODO, maybe breakout the netns key to a struct? # TODO, maybe breakout the netns key to a struct?
@ -259,8 +259,6 @@ def wrap_address(
case _: case _:
# import pdbp; pdbp.set_trace() # import pdbp; pdbp.set_trace()
# from tractor.devx import mk_pdb
# mk_pdb().set_trace()
raise TypeError( raise TypeError(
f'Can not wrap unwrapped-address ??\n' f'Can not wrap unwrapped-address ??\n'
f'type(addr): {type(addr)!r}\n' f'type(addr): {type(addr)!r}\n'

View File

@ -1,48 +0,0 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
(Hot) code (re-)load utils for python.
'''
import importlib.util  # needed for `spec_from_file_location()` below
from pathlib import Path
import sys
from types import ModuleType
# ?TODO, move this into internal libs?
# -[ ] we already use it in `modden.config._pymod` as well
def load_module_from_path(
path: Path,
module_name: str|None = None,
) -> ModuleType:
'''
Taken from SO,
https://stackoverflow.com/a/67208147
which is based on stdlib docs,
https://docs.python.org/3/library/importlib.html#importing-a-source-file-directly
'''
module_name = module_name or path.stem
spec = importlib.util.spec_from_file_location(
module_name,
str(path),
)
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
return module
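A minimal call sketch for the helper above (the file path here is hypothetical):

from pathlib import Path

# exec a source file and register it under the given module name
mod = load_module_from_path(
    Path('snakelib/__init__.py'),
    module_name='snakelib',  # defaults to `path.stem` when omitted
)
assert mod.__name__ == 'snakelib'
import snakelib  # resolves now since it was placed in `sys.modules`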

View File

@ -70,7 +70,6 @@ from ._exceptions import (
MsgTypeError, MsgTypeError,
RemoteActorError, RemoteActorError,
StreamOverrun, StreamOverrun,
TransportClosed,
pack_from_raise, pack_from_raise,
unpack_error, unpack_error,
) )
@ -114,7 +113,7 @@ if TYPE_CHECKING:
CallerInfo, CallerInfo,
) )
log = get_logger() log = get_logger(__name__)
class Unresolved: class Unresolved:
@ -2392,18 +2391,16 @@ async def open_context_from_portal(
case trio.Cancelled(): case trio.Cancelled():
logmeth = log.cancel logmeth = log.cancel
cause: str = 'cancelled' cause: str = 'cancelled'
msg: str = (
f'ctx {ctx.side!r}-side {cause!r} with,\n'
f'{ctx.repr_outcome()!r}\n'
)
# XXX explicitly report on any non-graceful-taskc cases # XXX explicitly report on any non-graceful-taskc cases
case _: case _:
cause: str = 'errored' cause: str = 'errored'
logmeth = log.exception logmeth = log.exception
msg: str = f'ctx {ctx.side!r}-side {cause!r} with,\n'
logmeth(msg) logmeth(
f'ctx {ctx.side!r}-side {cause!r} with,\n'
f'{ctx.repr_outcome()!r}\n'
)
if debug_mode(): if debug_mode():
# async with debug.acquire_debug_lock(portal.actor.uid): # async with debug.acquire_debug_lock(portal.actor.uid):
@ -2429,7 +2426,10 @@ async def open_context_from_portal(
try: try:
# await pause(shield=True) # await pause(shield=True)
await ctx.cancel() await ctx.cancel()
except TransportClosed: except (
trio.BrokenResourceError,
trio.ClosedResourceError,
):
log.warning( log.warning(
'IPC connection for context is broken?\n' 'IPC connection for context is broken?\n'
f'task: {ctx.cid}\n' f'task: {ctx.cid}\n'

View File

@ -53,7 +53,7 @@ if TYPE_CHECKING:
from ._runtime import Actor from ._runtime import Actor
log = get_logger() log = get_logger(__name__)
@acm @acm
@ -91,13 +91,10 @@ async def get_registry(
@acm @acm
async def get_root(**kwargs) -> AsyncGenerator[Portal, None]: async def get_root(
''' **kwargs,
Deliver the current actor's "root process" actor (yes in actor ) -> AsyncGenerator[Portal, None]:
and proc tree terms) by delivering a `Portal` from the spawn-time
provided contact address.
'''
# TODO: rename mailbox to `_root_maddr` when we finally # TODO: rename mailbox to `_root_maddr` when we finally
# add and impl libp2p multi-addrs? # add and impl libp2p multi-addrs?
addr = _runtime_vars['_root_mailbox'] addr = _runtime_vars['_root_mailbox']
@ -196,11 +193,6 @@ async def maybe_open_portal(
addr: UnwrappedAddress, addr: UnwrappedAddress,
name: str, name: str,
): ):
'''
Open a `Portal` to the actor serving @ `addr` or `None` if no
peer can be contacted or found.
'''
async with query_actor( async with query_actor(
name=name, name=name,
regaddr=addr, regaddr=addr,

View File

@ -50,7 +50,7 @@ if TYPE_CHECKING:
from ._spawn import SpawnMethodKey from ._spawn import SpawnMethodKey
log = get_logger() log = get_logger(__name__)
def _mp_main( def _mp_main(
@ -72,15 +72,11 @@ def _mp_main(
spawn_ctx: mp.context.BaseContext = try_set_start_method(start_method) spawn_ctx: mp.context.BaseContext = try_set_start_method(start_method)
assert spawn_ctx assert spawn_ctx
# XXX, enable root log at level
if actor.loglevel is not None: if actor.loglevel is not None:
log.info( log.info(
f'Setting loglevel for {actor.uid} to {actor.loglevel!r}' f'Setting loglevel for {actor.uid} to {actor.loglevel}'
)
get_console_log(
level=actor.loglevel,
name='tractor',
) )
get_console_log(actor.loglevel)
# TODO: use scops headers like for `trio` below! # TODO: use scops headers like for `trio` below!
# (well after we libify it maybe..) # (well after we libify it maybe..)
@ -130,12 +126,8 @@ def _trio_main(
parent_addr=parent_addr parent_addr=parent_addr
) )
# XXX, enable root log at level
if actor.loglevel is not None: if actor.loglevel is not None:
get_console_log( get_console_log(actor.loglevel)
level=actor.loglevel,
name='tractor',
)
log.info( log.info(
f'Starting `trio` subactor from parent @ ' f'Starting `trio` subactor from parent @ '
f'{parent_addr}\n' f'{parent_addr}\n'

View File

@ -69,7 +69,7 @@ from ._streaming import (
if TYPE_CHECKING: if TYPE_CHECKING:
from ._runtime import Actor from ._runtime import Actor
log = get_logger() log = get_logger(__name__)
class Portal: class Portal:
@ -329,7 +329,18 @@ class Portal:
# if we get here some weird cancellation case happened # if we get here some weird cancellation case happened
return False return False
except TransportClosed as tpt_err: except (
# XXX, should never really get raised unless we aren't
# wrapping them in the below type by mistake?
#
# Leaving the catch here for now until we're very sure
# all the cases (for various tpt protos) have indeed been
# re-wrapped ;p
trio.ClosedResourceError,
trio.BrokenResourceError,
TransportClosed,
) as tpt_err:
ipc_borked_report: str = ( ipc_borked_report: str = (
f'IPC for actor already closed/broken?\n\n' f'IPC for actor already closed/broken?\n\n'
f'\n' f'\n'

View File

@ -88,8 +88,7 @@ async def maybe_block_bp(
bp_blocked: bool bp_blocked: bool
if ( if (
debug_mode debug_mode
and and maybe_enable_greenback
maybe_enable_greenback
and ( and (
maybe_mod := await debug.maybe_init_greenback( maybe_mod := await debug.maybe_init_greenback(
raise_not_found=False, raise_not_found=False,
@ -290,12 +289,10 @@ async def open_root_actor(
for uw_addr in uw_reg_addrs for uw_addr in uw_reg_addrs
] ]
loglevel: str = ( loglevel = (
loglevel loglevel
or or log._default_loglevel
log._default_loglevel ).upper()
)
loglevel: str = loglevel.upper()
if ( if (
debug_mode debug_mode
@ -326,10 +323,7 @@ async def open_root_actor(
) )
assert loglevel assert loglevel
_log = log.get_console_log( _log = log.get_console_log(loglevel)
level=loglevel,
name='tractor',
)
assert _log assert _log
# TODO: factor this into `.devx._stackscope`!! # TODO: factor this into `.devx._stackscope`!!
@ -386,13 +380,10 @@ async def open_root_actor(
addr, addr,
) )
tpt_bind_addrs: list[ trans_bind_addrs: list[UnwrappedAddress] = []
Address # `Address.get_random()` case
|UnwrappedAddress # registrar case `= uw_reg_addrs`
] = []
# ------ NON-REGISTRAR ------ # Create a new local root-actor instance which IS NOT THE
# create a new root-actor instance. # REGISTRAR
if ponged_addrs: if ponged_addrs:
if ensure_registry: if ensure_registry:
raise RuntimeError( raise RuntimeError(
@ -419,21 +410,12 @@ async def open_root_actor(
# XXX INSTEAD, bind random addrs using the same tpt # XXX INSTEAD, bind random addrs using the same tpt
# proto. # proto.
for addr in ponged_addrs: for addr in ponged_addrs:
tpt_bind_addrs.append( trans_bind_addrs.append(
# XXX, these are `Address` NOT `UnwrappedAddress`.
#
# NOTE, in the case of posix/berkley socket
# protos we allocate port=0 such that the system
# allocates a random value at bind time; this
# happens in the `.ipc.*` stack's backend.
addr.get_random( addr.get_random(
bindspace=addr.bindspace, bindspace=addr.bindspace,
) )
) )
# ------ REGISTRAR ------
# create a new "registry providing" root-actor instance.
#
# Start this local actor as the "registrar", aka a regular # Start this local actor as the "registrar", aka a regular
# actor who manages the local registry of "mailboxes" of # actor who manages the local registry of "mailboxes" of
# other process-tree-local sub-actors. # other process-tree-local sub-actors.
@ -442,7 +424,7 @@ async def open_root_actor(
# following init steps are taken: # following init steps are taken:
# - the tranport layer server is bound to each addr # - the tranport layer server is bound to each addr
# pair defined in provided registry_addrs, or the default. # pair defined in provided registry_addrs, or the default.
tpt_bind_addrs = uw_reg_addrs trans_bind_addrs = uw_reg_addrs
# - it is normally desirable for any registrar to stay up # - it is normally desirable for any registrar to stay up
# indefinitely until either all registered (child/sub) # indefinitely until either all registered (child/sub)
@ -462,10 +444,20 @@ async def open_root_actor(
enable_modules=enable_modules, enable_modules=enable_modules,
) )
# XXX, in case the root actor runtime was actually run from # XXX, in case the root actor runtime was actually run from
# `tractor.to_asyncio.run_as_asyncio_guest()` and NOT # `tractor.to_asyncio.run_as_asyncio_guest()` and NOT
# `.trio.run()`. # `.trio.run()`.
actor._infected_aio = _state._runtime_vars['_is_infected_aio'] actor._infected_aio = _state._runtime_vars['_is_infected_aio']
# NOTE, only set the loopback addr for the
# process-tree-global "root" mailbox since all sub-actors
# should be able to speak to their root actor over that
# channel.
raddrs: list[Address] = _state._runtime_vars['_root_addrs']
raddrs.extend(trans_bind_addrs)
# TODO, remove once we have also removed all usage;
# eventually all (root-)registry apis should expect > 1 addr.
_state._runtime_vars['_root_mailbox'] = raddrs[0]
# Start up main task set via core actor-runtime nurseries. # Start up main task set via core actor-runtime nurseries.
try: try:
# assign process-local actor # assign process-local actor
@ -502,39 +494,14 @@ async def open_root_actor(
# "actor runtime" primitives are SC-compat and thus all # "actor runtime" primitives are SC-compat and thus all
# transitively spawned actors/processes must be as # transitively spawned actors/processes must be as
# well. # well.
accept_addrs: list[UnwrappedAddress] await root_tn.start(
reg_addrs: list[UnwrappedAddress]
(
accept_addrs,
reg_addrs,
) = await root_tn.start(
partial( partial(
_runtime.async_main, _runtime.async_main,
actor, actor,
accept_addrs=tpt_bind_addrs, accept_addrs=trans_bind_addrs,
parent_addr=None parent_addr=None
) )
) )
# NOTE, only set a local-host addr (i.e. like
# `lo`-loopback for TCP) for the process-tree-global
# "root"-process (its tree-wide "mailbox") since all
# sub-actors should be able to speak to their root
# actor over that channel.
#
# ?TODO, per-OS non-network-proto alt options?
# -[ ] on linux we should be able to always use UDS?
#
raddrs: list[UnwrappedAddress] = _state._runtime_vars['_root_addrs']
raddrs.extend(
accept_addrs,
)
# TODO, remove once we have also removed all usage;
# eventually all (root-)registry apis should expect > 1 addr.
_state._runtime_vars['_root_mailbox'] = raddrs[0]
# if 'chart' in actor.aid.name:
# from tractor.devx import mk_pdb
# mk_pdb().set_trace()
try: try:
yield actor yield actor
except ( except (
@ -616,13 +583,6 @@ async def open_root_actor(
): ):
_state._runtime_vars['_debug_mode'] = False _state._runtime_vars['_debug_mode'] = False
# !XXX, clear ALL prior contact info state, this is MEGA
# important if you are opening the runtime multiple times
# from the same parent process (like in our test
# harness)!
_state._runtime_vars['_root_addrs'].clear()
_state._runtime_vars['_root_mailbox'] = None
_state._current_actor = None _state._current_actor = None
_state._last_actor_terminated = actor _state._last_actor_terminated = actor

View File

@ -284,15 +284,6 @@ async def _errors_relayed_via_ipc(
try: try:
yield # run RPC invoke body yield # run RPC invoke body
# NOTE, never REPL any pseudo-expected tpt-disconnect.
except TransportClosed as err:
rpc_err = err
log.warning(
f'Tpt disconnect during remote-exc relay due to,\n'
f'{err!r}\n'
)
raise err
# box and ship RPC errors for wire-transit via # box and ship RPC errors for wire-transit via
# the task's requesting parent IPC-channel. # the task's requesting parent IPC-channel.
except ( except (
@ -336,15 +327,10 @@ async def _errors_relayed_via_ipc(
# recovery logic - the only case is some kind of # recovery logic - the only case is some kind of
# strange bug in our transport layer itself? Going # strange bug in our transport layer itself? Going
# to keep this open ended for now. # to keep this open ended for now.
log.debug(
if _state.debug_mode(): 'RPC task crashed, attempting to enter debugger\n'
log.exception( f'|_{ctx}'
f'RPC task crashed!\n' )
f'Attempting to enter debugger\n'
f'\n'
f'{ctx}'
)
entered_debug = await debug._maybe_enter_pm( entered_debug = await debug._maybe_enter_pm(
err, err,
api_frame=inspect.currentframe(), api_frame=inspect.currentframe(),
@ -433,7 +419,7 @@ async def _errors_relayed_via_ipc(
# cancel scope will not have been inserted yet # cancel scope will not have been inserted yet
if is_rpc: if is_rpc:
log.warning( log.warning(
'RPC task likely crashed or cancelled before start?\n' 'RPC task likely errored or cancelled before start?\n'
f'|_{ctx._task}\n' f'|_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n' f' >> {ctx.repr_rpc}\n'
) )
@ -876,9 +862,9 @@ async def _invoke(
) )
logmeth( logmeth(
f'{message}' f'{message}\n'
f'\n' f'\n'
f'{descr_str}' f'{descr_str}\n'
) )
@ -914,11 +900,6 @@ async def try_ship_error_to_remote(
# XXX NOTE XXX in SC terms this is one of the worst things # XXX NOTE XXX in SC terms this is one of the worst things
# that can happen and provides for a 2-general's dilemma.. # that can happen and provides for a 2-general's dilemma..
#
# FURTHER, we should never really have to handle these
# lowlevel excs from `trio` since the `Channel.send()` layers
# downward should be mostly wrapping such cases in a
# tpt-closed; the `.critical()` usage is warranted.
except ( except (
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,

View File

@ -147,8 +147,6 @@ def get_mod_nsps2fps(mod_ns_paths: list[str]) -> dict[str, str]:
return nsp2fp return nsp2fp
_bp = False
class Actor: class Actor:
''' '''
The fundamental "runtime" concurrency primitive. The fundamental "runtime" concurrency primitive.
@ -183,14 +181,6 @@ class Actor:
def is_registrar(self) -> bool: def is_registrar(self) -> bool:
return self.is_arbiter return self.is_arbiter
@property
def is_root(self) -> bool:
'''
This actor is the parent-most in the tree?
'''
return _state.is_root_process()
msg_buffer_size: int = 2**6 msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()`, # nursery placeholders filled in by `async_main()`,
@ -282,9 +272,7 @@ class Actor:
stacklevel=2, stacklevel=2,
) )
registry_addrs: list[Address] = [ registry_addrs: list[Address] = [wrap_address(arbiter_addr)]
wrap_address(arbiter_addr)
]
# marked by the process spawning backend at startup # marked by the process spawning backend at startup
# will be None for the parent most process started manually # will be None for the parent most process started manually
@ -971,21 +959,6 @@ class Actor:
rvs['_is_root'] = False # obvi XD rvs['_is_root'] = False # obvi XD
# TODO, remove! left in just while protoing init fix!
# global _bp
# if (
# 'chart' in self.aid.name
# and
# isinstance(
# rvs['_root_addrs'][0],
# dict,
# )
# and
# not _bp
# ):
# _bp = True
# breakpoint()
_state._runtime_vars.update(rvs) _state._runtime_vars.update(rvs)
# `SpawnSpec.reg_addrs` # `SpawnSpec.reg_addrs`
@ -1482,12 +1455,7 @@ async def async_main(
# be False when running as root actor and True when as # be False when running as root actor and True when as
# a subactor. # a subactor.
parent_addr: UnwrappedAddress|None = None, parent_addr: UnwrappedAddress|None = None,
task_status: TaskStatus[ task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
tuple[
list[UnwrappedAddress], # accept_addrs
list[UnwrappedAddress], # reg_addrs
]
] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
''' '''
@ -1666,7 +1634,6 @@ async def async_main(
# if addresses point to the same actor.. # if addresses point to the same actor..
# So we need a way to detect that? maybe iterate # So we need a way to detect that? maybe iterate
# only on unique actor uids? # only on unique actor uids?
addr: UnwrappedAddress
for addr in actor.reg_addrs: for addr in actor.reg_addrs:
try: try:
waddr = wrap_address(addr) waddr = wrap_address(addr)
@ -1675,9 +1642,7 @@ async def async_main(
await debug.pause() await debug.pause()
# !TODO, get rid of the local-portal crap XD # !TODO, get rid of the local-portal crap XD
reg_portal: Portal
async with get_registry(addr) as reg_portal: async with get_registry(addr) as reg_portal:
accept_addr: UnwrappedAddress
for accept_addr in accept_addrs: for accept_addr in accept_addrs:
accept_addr = wrap_address(accept_addr) accept_addr = wrap_address(accept_addr)
@ -1693,12 +1658,8 @@ async def async_main(
is_registered: bool = True is_registered: bool = True
# init steps complete, deliver IPC-server and # init steps complete
# registrar addrs back to caller. task_status.started()
task_status.started((
accept_addrs,
actor.reg_addrs,
))
# Begin handling our new connection back to our # Begin handling our new connection back to our
# parent. This is done last since we don't want to # parent. This is done last since we don't want to

View File

@ -38,7 +38,6 @@ import trio
from ._exceptions import ( from ._exceptions import (
ContextCancelled, ContextCancelled,
RemoteActorError, RemoteActorError,
TransportClosed,
) )
from .log import get_logger from .log import get_logger
from .trionics import ( from .trionics import (
@ -60,7 +59,7 @@ if TYPE_CHECKING:
from .ipc import Channel from .ipc import Channel
log = get_logger() log = get_logger(__name__)
# TODO: the list # TODO: the list
@ -410,8 +409,10 @@ class MsgStream(trio.abc.Channel):
# it). # it).
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await self._ctx.send_stop() await self._ctx.send_stop()
except ( except (
TransportClosed, trio.BrokenResourceError,
trio.ClosedResourceError
) as re: ) as re:
# the underlying channel may already have been pulled # the underlying channel may already have been pulled
# in which case our stop message is meaningless since # in which case our stop message is meaningless since
@ -592,8 +593,9 @@ class MsgStream(trio.abc.Channel):
), ),
) )
except ( except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError, BrokenPipeError,
TransportClosed,
) as _trans_err: ) as _trans_err:
trans_err = _trans_err trans_err = _trans_err
if ( if (

View File

@ -62,7 +62,7 @@ if TYPE_CHECKING:
from .ipc import IPCServer from .ipc import IPCServer
log = get_logger() log = get_logger(__name__)
class ActorNursery: class ActorNursery:

View File

@ -49,7 +49,7 @@ from tractor.msg import (
import wrapt import wrapt
log = get_logger() log = get_logger(__name__)
# TODO: yeah, i don't love this and we should prolly just # TODO: yeah, i don't love this and we should prolly just
# write a decorator that actually keeps a stupid ref to the func # write a decorator that actually keeps a stupid ref to the func

View File

@ -51,7 +51,7 @@ from tractor import (
) )
from tractor.devx import debug from tractor.devx import debug
log = logmod.get_logger() log = logmod.get_logger(__name__)
if TYPE_CHECKING: if TYPE_CHECKING:

View File

@ -59,7 +59,7 @@ from ._sigint import (
_ctlc_ignore_header as _ctlc_ignore_header _ctlc_ignore_header as _ctlc_ignore_header
) )
log = get_logger() log = get_logger(__name__)
# ---------------- # ----------------
# XXX PKG TODO XXX # XXX PKG TODO XXX

View File

@ -84,7 +84,7 @@ _crash_msg: str = (
'Opening a pdb REPL in crashed actor' 'Opening a pdb REPL in crashed actor'
) )
log = get_logger() log = get_logger(__package__)
class BoxedMaybeException(Struct): class BoxedMaybeException(Struct):

View File

@ -47,7 +47,7 @@ if TYPE_CHECKING:
Actor, Actor,
) )
log = get_logger() log = get_logger(__name__)
_ctlc_ignore_header: str = ( _ctlc_ignore_header: str = (
'Ignoring SIGINT while debug REPL in use' 'Ignoring SIGINT while debug REPL in use'

View File

@ -58,7 +58,7 @@ from ._sigint import (
_ctlc_ignore_header as _ctlc_ignore_header _ctlc_ignore_header as _ctlc_ignore_header
) )
log = get_logger() log = get_logger(__package__)
async def maybe_wait_for_debugger( async def maybe_wait_for_debugger(

View File

@ -93,7 +93,7 @@ if TYPE_CHECKING:
# from ._post_mortem import BoxedMaybeException # from ._post_mortem import BoxedMaybeException
from ._repl import PdbREPL from ._repl import PdbREPL
log = get_logger() log = get_logger(__package__)
_pause_msg: str = 'Opening a pdb REPL in paused actor' _pause_msg: str = 'Opening a pdb REPL in paused actor'
_repl_fail_msg: str|None = ( _repl_fail_msg: str|None = (
@ -628,7 +628,7 @@ def _set_trace(
log.pdb( log.pdb(
f'{_pause_msg}\n' f'{_pause_msg}\n'
f'>(\n' f'>(\n'
f'|_{actor.aid.uid}\n' f'|_{actor.uid}\n'
f' |_{task}\n' # @ {actor.uid}\n' f' |_{task}\n' # @ {actor.uid}\n'
# f'|_{task}\n' # f'|_{task}\n'
# ^-TODO-^ more compact pformating? # ^-TODO-^ more compact pformating?
@ -1257,26 +1257,3 @@ async def breakpoint(
api_frame=inspect.currentframe(), api_frame=inspect.currentframe(),
**kwargs, **kwargs,
) )
async def maybe_pause_bp():
'''
Internal (ONLY for now) `breakpoint()`-er fn which only tries to
use the multi-actor `.pause()` API when the current actor is the
root.
?! BUT WHY !?
-------
This is useful when debugging cases where the tpt layer breaks
(or is intentionally broken, say during resiliency testing) in
the case where a child can no longer contact the root process to
acquire the process-tree-singleton TTY lock.
'''
import tractor
actor = tractor.current_actor()
if actor.aid.name == 'root':
await tractor.pause(shield=True)
else:
tractor.devx.mk_pdb().set_trace()

View File

@ -81,7 +81,7 @@ if TYPE_CHECKING:
BoxedMaybeException, BoxedMaybeException,
) )
log = get_logger() log = get_logger(__name__)
class LockStatus( class LockStatus(

View File

@ -60,7 +60,7 @@ if TYPE_CHECKING:
from ._transport import MsgTransport from ._transport import MsgTransport
log = get_logger() log = get_logger(__name__)
_is_windows = platform.system() == 'Windows' _is_windows = platform.system() == 'Windows'
@ -307,12 +307,7 @@ class Channel:
) -> None: ) -> None:
''' '''
Send a coded msg-blob over the underlying IPC transport. Send a coded msg-blob over the transport.
This fn raises `TransportClosed` on comms failures and is
normally handled by higher level runtime machinery for the
expected-graceful cases, normally ephemercal
(re/dis)connects.
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
@ -339,10 +334,9 @@ class Channel:
except KeyError: except KeyError:
raise err raise err
case TransportClosed(): case TransportClosed():
src_exc_str: str = err.repr_src_exc()
log.transport( log.transport(
f'Transport stream closed due to,\n' f'Transport stream closed due to\n'
f'{src_exc_str}' f'{err.repr_src_exc()}\n'
) )
case _: case _:
@ -351,11 +345,6 @@ class Channel:
raise raise
async def recv(self) -> Any: async def recv(self) -> Any:
'''
Receive the latest (queued) msg-blob from the underlying IPC
transport.
'''
assert self._transport assert self._transport
return await self._transport.recv() return await self._transport.recv()
@ -429,18 +418,16 @@ class Channel:
self self
) -> AsyncGenerator[Any, None]: ) -> AsyncGenerator[Any, None]:
''' '''
Yield `MsgType` IPC msgs decoded and delivered from an Yield `MsgType` IPC msgs decoded and delivered from
underlying `MsgTransport` protocol. an underlying `MsgTransport` protocol.
This is a streaming routine also implemented as an This is a streaming routine also implemented as an async-gen
async-generator func (same as `MsgTransport._iter_pkts()`) func (same as `MsgTransport._iter_pkts()`) gets allocated by
gets allocated by a `.__call__()` inside `.__init__()` where a `.__call__()` inside `.__init__()` where it is assigned to
it is assigned to the `._aiter_msgs` attr. the `._aiter_msgs` attr.
''' '''
if not self._transport: assert self._transport
raise RuntimeError('No IPC transport initialized!?')
while True: while True:
try: try:
async for msg in self._transport: async for msg in self._transport:
@ -475,15 +462,7 @@ class Channel:
# continue # continue
def connected(self) -> bool: def connected(self) -> bool:
''' return self._transport.connected() if self._transport else False
Predicate whether underlying IPC tpt is connected.
'''
return (
self._transport.connected()
if self._transport
else False
)
async def _do_handshake( async def _do_handshake(
self, self,
@ -514,11 +493,8 @@ async def _connect_chan(
addr: UnwrappedAddress addr: UnwrappedAddress
) -> typing.AsyncGenerator[Channel, None]: ) -> typing.AsyncGenerator[Channel, None]:
''' '''
Create and connect a `Channel` to the provided `addr`, disconnect Create and connect a channel with disconnect on context manager
it on cm exit. teardown.
NOTE, this is a lowlevel, normally internal-only iface. You
should likely use `.open_portal()` instead.
''' '''
chan = await Channel.from_addr(addr) chan = await Channel.from_addr(addr)

View File

@ -72,7 +72,7 @@ if TYPE_CHECKING:
from .._supervise import ActorNursery from .._supervise import ActorNursery
log = log.get_logger() log = log.get_logger(__name__)
async def maybe_wait_on_canced_subs( async def maybe_wait_on_canced_subs(

View File

@ -59,7 +59,7 @@ except ImportError:
pass pass
log = get_logger() log = get_logger(__name__)
SharedMemory = disable_mantracker() SharedMemory = disable_mantracker()

View File

@ -41,7 +41,7 @@ from tractor.ipc._transport import (
) )
log = get_logger() log = get_logger(__name__)
class TCPAddress( class TCPAddress(

View File

@ -56,7 +56,7 @@ from tractor.msg import (
if TYPE_CHECKING: if TYPE_CHECKING:
from tractor._addr import Address from tractor._addr import Address
log = get_logger() log = get_logger(__name__)
# (codec, transport) # (codec, transport)
@ -154,6 +154,7 @@ class MsgTransport(Protocol):
# ... # ...
class MsgpackTransport(MsgTransport): class MsgpackTransport(MsgTransport):
# TODO: better naming for this? # TODO: better naming for this?
@ -277,18 +278,14 @@ class MsgpackTransport(MsgTransport):
except trio.ClosedResourceError as cre: except trio.ClosedResourceError as cre:
closure_err = cre closure_err = cre
# await tractor.devx._trace.maybe_pause_bp()
raise TransportClosed( raise TransportClosed(
message=( message=(
f'{tpt_name} was already closed locally?' f'{tpt_name} was already closed locally ?\n'
), ),
src_exc=closure_err, src_exc=closure_err,
loglevel='error', loglevel='error',
raise_on_report=( raise_on_report=(
'another task closed this fd' 'another task closed this fd' in closure_err.args
in
closure_err.args
), ),
) from closure_err ) from closure_err
@ -438,11 +435,6 @@ class MsgpackTransport(MsgTransport):
trans_err = _re trans_err = _re
tpt_name: str = f'{type(self).__name__!r}' tpt_name: str = f'{type(self).__name__!r}'
trans_err_msg: str = trans_err.args[0]
by_whom: str = {
'another task closed this fd': 'locally',
'this socket was already closed': 'by peer',
}.get(trans_err_msg)
match trans_err: match trans_err:
# XXX, specific to UDS transport and its, # XXX, specific to UDS transport and its,
@ -454,42 +446,38 @@ class MsgpackTransport(MsgTransport):
case trio.BrokenResourceError() if ( case trio.BrokenResourceError() if (
'[Errno 32] Broken pipe' '[Errno 32] Broken pipe'
in in
trans_err_msg trans_err.args[0]
): ):
tpt_closed = TransportClosed.from_src_exc( tpt_closed = TransportClosed.from_src_exc(
message=( message=(
f'{tpt_name} already closed by peer\n' f'{tpt_name} already closed by peer\n'
), ),
body=f'{self}', body=f'{self}\n',
src_exc=trans_err, src_exc=trans_err,
raise_on_report=True, raise_on_report=True,
loglevel='transport', loglevel='transport',
) )
raise tpt_closed from trans_err raise tpt_closed from trans_err
# ??TODO??, what case in piker does this and HOW # case trio.ClosedResourceError() if (
# CAN WE RE-PRODUCE IT?!?!? # 'this socket was already closed'
case trio.ClosedResourceError() if ( # in
by_whom # trans_err.args[0]
): # ):
tpt_closed = TransportClosed.from_src_exc( # tpt_closed = TransportClosed.from_src_exc(
message=( # message=(
f'{tpt_name} was already closed {by_whom!r}?\n' # f'{tpt_name} already closed by peer\n'
), # ),
body=f'{self}', # body=f'{self}\n',
src_exc=trans_err, # src_exc=trans_err,
raise_on_report=True, # raise_on_report=True,
loglevel='transport', # loglevel='transport',
) # )
# raise tpt_closed from trans_err
# await tractor.devx._trace.maybe_pause_bp() # unless the disconnect condition falls under "a
raise tpt_closed from trans_err # normal operation breakage' we usually console warn
# about it.
# XXX, unless the disconnect condition falls
# under "a normal/expected operating breakage"
# (per the `trans_err_msg` guards in the cases
# above) we usually console-error about it and
# raise-thru.
case _: case _:
log.exception( log.exception(
f'{tpt_name} layer failed pre-send ??\n' f'{tpt_name} layer failed pre-send ??\n'

View File

@ -63,7 +63,7 @@ if TYPE_CHECKING:
from ._runtime import Actor from ._runtime import Actor
log = get_logger() log = get_logger(__name__)
def unwrap_sockpath( def unwrap_sockpath(
@ -166,10 +166,6 @@ class UDSAddress(
) )
if actor: if actor:
sockname: str = '::'.join(actor.uid) + f'@{pid}' sockname: str = '::'.join(actor.uid) + f'@{pid}'
# ?^TODO, for `multiaddr`'s parser we can't use the `::`
# above^, SO maybe a `.` or something else here?
# sockname: str = '.'.join(actor.uid) + f'@{pid}'
# -[ ] CURRENTLY using `.` BREAKS TEST SUITE tho..
else: else:
prefix: str = '<unknown-actor>' prefix: str = '<unknown-actor>'
if is_root_process(): if is_root_process():

View File

@ -14,23 +14,11 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
''' """
An enhanced logging subsys. Log like a forester!
An extended logging layer using (for now) the stdlib's `logging` """
+ `colorlog` which embeds concurrency-primitive/runtime info into
records (headers) to help you better grok your distributed systems
built on `tractor`.
'''
from collections.abc import Mapping from collections.abc import Mapping
from functools import partial
from inspect import (
FrameInfo,
getmodule,
stack,
)
import sys import sys
import logging import logging
from logging import ( from logging import (
@ -38,24 +26,20 @@ from logging import (
Logger, Logger,
StreamHandler, StreamHandler,
) )
from types import ModuleType
import warnings
import colorlog # type: ignore import colorlog # type: ignore
# ?TODO, some other (modern) alt libs?
# import coloredlogs
# import colored_traceback.auto # ?TODO, need better config?
import trio import trio
from ._state import current_actor from ._state import current_actor
_proj_name: str = 'tractor'
_default_loglevel: str = 'ERROR' _default_loglevel: str = 'ERROR'
# Super sexy formatting thanks to ``colorlog``. # Super sexy formatting thanks to ``colorlog``.
# (NOTE: we use the '{' format style) # (NOTE: we use the '{' format style)
# Here, `thin_white` is just the layperson's gray. # Here, `thin_white` is just the layperson's gray.
LOG_FORMAT: str = ( LOG_FORMAT = (
# "{bold_white}{log_color}{asctime}{reset}" # "{bold_white}{log_color}{asctime}{reset}"
"{log_color}{asctime}{reset}" "{log_color}{asctime}{reset}"
" {bold_white}{thin_white}({reset}" " {bold_white}{thin_white}({reset}"
@ -67,7 +51,7 @@ LOG_FORMAT: str = (
" {reset}{bold_white}{thin_white}{message}" " {reset}{bold_white}{thin_white}{message}"
) )
DATE_FORMAT: str = '%b %d %H:%M:%S' DATE_FORMAT = '%b %d %H:%M:%S'
# FYI, ERROR is 40 # FYI, ERROR is 40
# TODO: use a `bidict` to avoid the :155 check? # TODO: use a `bidict` to avoid the :155 check?
@ -91,10 +75,7 @@ STD_PALETTE = {
'TRANSPORT': 'cyan', 'TRANSPORT': 'cyan',
} }
BOLD_PALETTE: dict[ BOLD_PALETTE = {
str,
dict[int, str],
] = {
'bold': { 'bold': {
level: f"bold_{color}" for level, color in STD_PALETTE.items()} level: f"bold_{color}" for level, color in STD_PALETTE.items()}
} }
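As an aside, the `{`-style `LOG_FORMAT` and the level→color palettes above are the kind of inputs `colorlog.ColoredFormatter` consumes; a trimmed-down standalone wiring, where the format string and palette are reduced stand-ins rather than the real values, might look like:

# Minimal colorlog wiring sketch; fmt/palette are stand-ins for
# the LOG_FORMAT/STD_PALETTE definitions above.
import logging
import colorlog

handler = logging.StreamHandler()
handler.setFormatter(
    colorlog.ColoredFormatter(
        fmt='{log_color}{asctime}{reset} {levelname} {message}',
        datefmt='%b %d %H:%M:%S',
        style='{',  # enable the '{'-format style used above
        log_colors={'DEBUG': 'white', 'INFO': 'green', 'ERROR': 'red'},
    )
)
log = logging.getLogger('demo')
log.addHandler(handler)
log.setLevel(logging.DEBUG)
log.error('colored output!')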
@ -116,26 +97,9 @@ def at_least_level(
return False return False
# TODO, compare with using a "filter" instead? # TODO: this isn't showing the correct '{filename}'
# - https://stackoverflow.com/questions/60691759/add-information-to-every-log-message-in-python-logging/61830838#61830838 # as it did before..
# |_corresponding dict-config,
# https://stackoverflow.com/questions/7507825/where-is-a-complete-example-of-logging-config-dictconfig/7507842#7507842
# - [ ] what's the benefit/tradeoffs?
#
class StackLevelAdapter(LoggerAdapter): class StackLevelAdapter(LoggerAdapter):
'''
A (software) stack oriented logger "adapter".
'''
@property
def level(self) -> str:
'''
The currently set `str` emit level (in lowercase).
'''
return logging.getLevelName(
self.getEffectiveLevel()
).lower()
def at_least_level( def at_least_level(
self, self,
@ -284,14 +248,9 @@ def pformat_task_uid(
return f'{task.name}[{tid_part}]' return f'{task.name}[{tid_part}]'
_curr_actor_no_exc = partial(
current_actor,
err_on_no_runtime=False,
)
_conc_name_getters = { _conc_name_getters = {
'task': pformat_task_uid, 'task': pformat_task_uid,
'actor': lambda: _curr_actor_no_exc(), 'actor': lambda: current_actor(),
'actor_name': lambda: current_actor().name, 'actor_name': lambda: current_actor().name,
'actor_uid': lambda: current_actor().uid[1][:6], 'actor_uid': lambda: current_actor().uid[1][:6],
} }
@ -323,16 +282,9 @@ class ActorContextInfo(Mapping):
return f'no {key} context' return f'no {key} context'
_proj_name: str = 'tractor'
def get_logger( def get_logger(
name: str|None = None, name: str|None = None,
# ^NOTE, setting `name=_proj_name=='tractor'` enables the "root _root_name: str = _proj_name,
# logger" for `tractor` itself.
pkg_name: str = _proj_name,
# XXX, deprecated, use ^
_root_name: str|None = None,
logger: Logger|None = None, logger: Logger|None = None,
@ -341,287 +293,49 @@ def get_logger(
# |_https://stackoverflow.com/questions/7507825/where-is-a-complete-example-of-logging-config-dictconfig # |_https://stackoverflow.com/questions/7507825/where-is-a-complete-example-of-logging-config-dictconfig
# |_https://docs.python.org/3/library/logging.config.html#configuration-dictionary-schema # |_https://docs.python.org/3/library/logging.config.html#configuration-dictionary-schema
subsys_spec: str|None = None, subsys_spec: str|None = None,
mk_sublog: bool = True,
_strict_debug: bool = False,
) -> StackLevelAdapter: ) -> StackLevelAdapter:
''' '''
Return the `tractor`-library root logger or a sub-logger for Return the `tractor`-library root logger or a sub-logger for
`name` if provided. `name` if provided.
When `name` is left null we try to auto-detect the caller's
`mod.__name__` and use that as the sub-logger key.
This allows for example creating a module level instance like,
.. code:: python
log = tractor.log.get_logger(pkg_name='mylib')
and by default all console record headers (for any
`log.<level>()`-method call) will show the caller's correct
sub-pkg + py-module-file.
''' '''
if _root_name:
msg: str = (
'The `_root_name: str` param of `get_logger()` is now deprecated.\n'
'Use the new `pkg_name: str` instead, it is the same usage.\n'
)
warnings.warn(
msg,
DeprecationWarning,
stacklevel=2,
)
pkg_name: str = _root_name
def get_caller_mod(
frames_up: int = 2
):
'''
Attempt to get the module which called `tractor.get_logger()`.
'''
callstack: list[FrameInfo] = stack()
caller_fi: FrameInfo = callstack[frames_up]
caller_mod: ModuleType = getmodule(caller_fi.frame)
return caller_mod
# --- Auto-naming-CASE ---
# -------------------------
# Implicitly introspect the caller's module-name whenever `name`
# is left as the null default.
#
# When the `pkg_name` is in the `mod.__name__` we presume
# this instance can be created as a sub-`StackLevelAdapter` and
# that the intention is to get free module-path tracing and
# filtering (well, once we implement that) oriented around the
# py-module code hierarchy of the consuming project.
#
if (
mk_sublog
and
name is None
and
pkg_name
):
if (caller_mod := get_caller_mod()):
# ?XXX how is this `caller_mod.__name__` defined?
# => well by how the mod is imported.. XD
# |_https://stackoverflow.com/a/15883682
#
# if pkg_name in caller_mod.__package__:
# from tractor.devx.debug import mk_pdb
# mk_pdb().set_trace()
mod_ns_path: str = caller_mod.__name__
mod_pkg_ns_path: str = caller_mod.__package__
if (
mod_pkg_ns_path in mod_ns_path
or
pkg_name in mod_ns_path
):
# proper_mod_name = mod_ns_path.lstrip(
proper_mod_name = mod_pkg_ns_path.removeprefix(
f'{pkg_name}.'
)
name = proper_mod_name
elif (
pkg_name
# and
# pkg_name in mod_ns_path
):
name = mod_ns_path
if _strict_debug:
msg: str = (
f'@ {get_caller_mod()}\n'
f'Generating sub-logger name,\n'
f'{pkg_name}.{name}\n'
)
if _curr_actor_no_exc():
_root_log.debug(msg)
elif pkg_name != _proj_name:
print(
f'=> tractor.log.get_logger():\n'
f'{msg}\n'
)
# build a root logger instance
log: Logger log: Logger
rlog = log = ( log = rlog = logger or logging.getLogger(_root_name)
logger
or
logging.getLogger(pkg_name)
)
# XXX, lowlevel debuggin..
# if pkg_name != _proj_name:
# from tractor.devx.debug import mk_pdb
# mk_pdb().set_trace()
# NOTE: for handling modules that use the unnecessary,
# `get_logger(__name__)`
#
# we make the following stylistic choice:
# - always avoid duplicate project-package token
# in msg output: i.e. tractor.tractor.ipc._chan.py in header
# looks ridiculous XD
# - never show the leaf module name in the {name} part
# since in python the {filename} is always this same
# module-file.
if ( if (
name name
and and
# ?TODO? more correct? name != _proj_name
# _proj_name not in name
name != pkg_name
): ):
# ex. modden.runtime.progman
# -> rname='modden', _, pkg_path='runtime.progman'
if (
pkg_name
and
pkg_name in name
):
proper_name: str = name.removeprefix(
f'{pkg_name}.'
)
msg: str = (
f'@ {get_caller_mod()}\n'
f'Duplicate pkg-name in sub-logger `name`-key?\n'
f'pkg_name = {pkg_name!r}\n'
f'name = {name!r}\n'
f'\n'
f'=> You should change your input params to,\n'
f'get_logger(\n'
f' pkg_name={pkg_name!r}\n'
f' name={proper_name!r}\n'
f')'
)
# assert _duplicate == rname
if _curr_actor_no_exc():
_root_log.warning(msg)
else:
print(
f'=> tractor.log.get_logger() ERROR:\n'
f'{msg}\n'
)
name = proper_name # NOTE: for handling modules that use `get_logger(__name__)`
# we make the following stylistic choice:
# - always avoid duplicate project-package token
# in msg output: i.e. tractor.tractor.ipc._chan.py in header
# looks ridiculous XD
# - never show the leaf module name in the {name} part
# since in python the {filename} is always this same
# module-file.
rname: str = pkg_name sub_name: None|str = None
pkg_path: str = name rname, _, sub_name = name.partition('.')
pkgpath, _, modfilename = sub_name.rpartition('.')
# NOTE: for tractor itself never include the last level
# module key in the name such that something like: eg.
# 'tractor.trionics._broadcast' only includes the first
# 2 tokens in the (coloured) name part.
if rname == 'tractor':
sub_name = pkgpath
# ( if _root_name in sub_name:
# rname, duplicate, _, sub_name = sub_name.partition('.')
# _,
# pkg_path,
# ) = name.partition('.')
# For ex. 'modden.runtime.progman' if not sub_name:
# -> pkgpath='runtime', _, leaf_mod='progman'
(
subpkg_path,
_,
leaf_mod,
) = pkg_path.rpartition('.')
# NOTE: special usage for passing `name=__name__`,
#
# - remove duplication of any root-pkg-name in the
# (sub/child-)logger name; i.e. never include the
# `pkg_name` *twice* in the top-most-pkg-name/level
#
# -> this happens normally since it is added to `.getChild()`
# and as the name of its root-logger.
#
# => So for ex. when something like (a module key in the name)
#    `name='tractor.trionics._broadcast'` is passed, only the
#    first 2 sub-pkg name-tokens are included in the
#    child-logger's name; the colored "pkg-namespace" header
#    will then correctly show the same value as `name`.
if (
# XXX, TRY to remove duplication cases
# which get warn-logged on below!
(
# when, subpkg_path == pkg_path
subpkg_path
and
rname == pkg_name
)
# ) or (
# # when, pkg_path == leaf_mod
# pkg_path
# and
# leaf_mod == pkg_path
# )
):
pkg_path = subpkg_path
# XXX, do some double-checks for duplication of,
# - root-pkg-name, already in root logger
# - leaf-module-name already in `{filename}` header-field
if (
_strict_debug
and
pkg_name
and
pkg_name in pkg_path
):
_duplicate, _, pkg_path = pkg_path.partition('.')
if _duplicate:
msg: str = (
f'@ {get_caller_mod()}\n'
f'Duplicate pkg-name in sub-logger key?\n'
f'pkg_name = {pkg_name!r}\n'
f'pkg_path = {pkg_path!r}\n'
)
# assert _duplicate == rname
if _curr_actor_no_exc():
_root_log.warning(msg)
else:
print(
f'=> tractor.log.get_logger() ERROR:\n'
f'{msg}\n'
)
# XXX, should never get here?
breakpoint()
if (
_strict_debug
and
leaf_mod
and
leaf_mod in pkg_path
):
msg: str = (
f'@ {get_caller_mod()}\n'
f'Duplicate leaf-module-name in sub-logger key?\n'
f'leaf_mod = {leaf_mod!r}\n'
f'pkg_path = {pkg_path!r}\n'
)
if _curr_actor_no_exc():
_root_log.warning(msg)
else:
print(
f'=> tractor.log.get_logger() ERROR:\n'
f'{msg}\n'
)
# mk/get underlying (sub-)`Logger`
if (
not pkg_path
and
leaf_mod == pkg_name
):
# breakpoint()
log = rlog log = rlog
else:
elif mk_sublog: log = rlog.getChild(sub_name)
# breakpoint()
log = rlog.getChild(pkg_path)
log.level = rlog.level log.level = rlog.level
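To make the dedup rules above concrete, here is a worked example using the `'tractor.trionics._broadcast'` case from the comments; plain Python that mirrors the `removeprefix()`/`rpartition()` steps in this hunk:

# Worked example of the sub-logger naming above.
pkg_name = 'tractor'
name = 'tractor.trionics._broadcast'  # e.g. a `__name__` value

# 1) never duplicate the root-pkg token (it is already the
#    root logger's name).
pkg_path = name.removeprefix(f'{pkg_name}.')   # 'trionics._broadcast'

# 2) never include the leaf module (it already shows in the
#    `{filename}` header field).
subpkg_path, _, leaf_mod = pkg_path.rpartition('.')
assert (subpkg_path, leaf_mod) == ('trionics', '_broadcast')

# -> child key 'trionics', full logger name 'tractor.trionics'
child = subpkg_path or pkg_path
assert f'{pkg_name}.{child}' == 'tractor.trionics'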
@ -636,13 +350,8 @@ def get_logger(
for name, val in CUSTOM_LEVELS.items(): for name, val in CUSTOM_LEVELS.items():
logging.addLevelName(val, name) logging.addLevelName(val, name)
# ensure our custom adapter levels exist as methods # ensure custom levels exist as methods
assert getattr( assert getattr(logger, name.lower()), f'Logger does not define {name}'
logger,
name.lower()
), (
f'Logger does not define {name}'
)
return logger return logger
@ -716,4 +425,4 @@ def get_loglevel() -> str:
# global module logger for tractor itself # global module logger for tractor itself
_root_log: StackLevelAdapter = get_logger('tractor') log: StackLevelAdapter = get_logger('tractor')

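Putting the above together, a usage sketch of the reworked `get_logger()` API; `'mylib'` and the `'modden.runtime.progman'` path (borrowed from the comments above) are illustrative consumer names, not real packages:

# Usage sketch only; 'mylib'/'modden' are hypothetical consumers.
import tractor

# auto-named: with `name` left null the caller's module-name is
# introspected and keyed under the 'mylib' root logger.
log = tractor.log.get_logger(pkg_name='mylib')
log.info('hello from an auto-named sub-logger')

# explicit form: any duplicated pkg-name prefix in `name` is
# stripped (and warned about) per the dedup logic above.
sublog = tractor.log.get_logger(
    name='runtime.progman',
    pkg_name='modden',
)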
View File

@ -68,7 +68,7 @@ from tractor.log import get_logger
if TYPE_CHECKING: if TYPE_CHECKING:
from tractor._context import Context from tractor._context import Context
log = get_logger() log = get_logger(__name__)
# TODO: unify with `MsgCodec` by making `._dec` part this? # TODO: unify with `MsgCodec` by making `._dec` part this?

View File

@ -77,7 +77,7 @@ if TYPE_CHECKING:
from tractor._streaming import MsgStream from tractor._streaming import MsgStream
log = get_logger() log = get_logger(__name__)
_def_any_pldec: MsgDec[Any] = mk_dec(spec=Any) _def_any_pldec: MsgDec[Any] = mk_dec(spec=Any)

View File

@ -51,7 +51,7 @@ from tractor.log import get_logger
# from tractor._addr import UnwrappedAddress # from tractor._addr import UnwrappedAddress
log = get_logger() log = get_logger('tractor.msgspec')
# type variable for the boxed payload field `.pld` # type variable for the boxed payload field `.pld`
PayloadT = TypeVar('PayloadT') PayloadT = TypeVar('PayloadT')
@ -202,10 +202,7 @@ class SpawnSpec(
# TODO: similar to the `Start` kwargs spec needed below, we need # TODO: similar to the `Start` kwargs spec needed below, we need
# a hard `Struct` def for all of these fields! # a hard `Struct` def for all of these fields!
_parent_main_data: dict _parent_main_data: dict
_runtime_vars: ( _runtime_vars: dict[str, Any]
dict[str, Any]
#|RuntimeVars # !TODO
)
# ^NOTE see `._state._runtime_vars: dict` # ^NOTE see `._state._runtime_vars: dict`
# module import capability # module import capability

View File

@ -71,7 +71,7 @@ from outcome import (
Outcome, Outcome,
) )
log: StackLevelAdapter = get_logger() log: StackLevelAdapter = get_logger(__name__)
__all__ = [ __all__ = [

View File

@ -42,7 +42,7 @@ from trio.lowlevel import current_task
from msgspec import Struct from msgspec import Struct
from tractor.log import get_logger from tractor.log import get_logger
log = get_logger() log = get_logger(__name__)
# TODO: use new type-vars syntax from 3.12 # TODO: use new type-vars syntax from 3.12
# https://realpython.com/python312-new-features/#dedicated-type-variable-syntax # https://realpython.com/python312-new-features/#dedicated-type-variable-syntax

View File

@ -49,7 +49,7 @@ if TYPE_CHECKING:
from tractor import ActorNursery from tractor import ActorNursery
log = get_logger() log = get_logger(__name__)
# A regular invariant generic type # A regular invariant generic type
T = TypeVar("T") T = TypeVar("T")

View File

@ -34,7 +34,7 @@ from typing import (
import trio import trio
from tractor.log import get_logger from tractor.log import get_logger
log = get_logger() log = get_logger(__name__)
if TYPE_CHECKING: if TYPE_CHECKING:
@ -246,12 +246,23 @@ async def maybe_raise_from_masking_exc(
type(exc_match) # masked type type(exc_match) # masked type
) )
# Add the note to the masked `exc_ctx`
if do_warn: if do_warn:
exc_ctx.add_note(note) exc_ctx.add_note(note)
# don't unmask already known "special" cases.. if (
do_warn
and
type(exc_match) in always_warn_on
):
log.warning(note)
if (
do_warn
and
raise_unmasked
):
if len(masked) < 2: if len(masked) < 2:
# don't unmask already known "special" cases..
if ( if (
_mask_cases _mask_cases
and and
@ -272,26 +283,11 @@ async def maybe_raise_from_masking_exc(
) )
raise exc_match raise exc_match
# ^?TODO, see above but, possibly unmasking sub-exc raise exc_ctx from exc_match
# entries if there are > 1
# else:
# await pause(shield=True)
if type(exc_match) in always_warn_on:
import traceback
trace: list[str] = traceback.format_exception(
type(exc_ctx),
exc_ctx,
exc_ctx.__traceback__
)
tb_str: str = ''.join(trace)
log.warning(tb_str)
# XXX, for debug
# from tractor import pause
# await pause(shield=True)
if raise_unmasked:
raise exc_ctx from exc_match
# ??TODO, see above but, possibly unmasking sub-exc
# entries if there are > 1
# else:
# await pause(shield=True)
else: else:
raise raise

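The always-warn branch above reduces to: render the masked exception's full traceback for the log, then (optionally) re-raise it chained to its masker. A standalone sketch reusing the diff's `exc_ctx` (masked) / `exc_match` (masker) naming; the `unmask()` helper itself is made up:

# Standalone sketch of the unmasking behaviour above.
import logging
import traceback

log = logging.getLogger(__name__)

def unmask(
    exc_ctx: BaseException,    # the exception that got masked
    exc_match: BaseException,  # the exception doing the masking
    raise_unmasked: bool = True,
) -> None:
    # format the masked exc's full traceback for the warning
    tb_str: str = ''.join(
        traceback.format_exception(
            type(exc_ctx),
            exc_ctx,
            exc_ctx.__traceback__,
        )
    )
    log.warning(tb_str)
    if raise_unmasked:
        # surface the masked exc, keeping the masker as its cause
        raise exc_ctx from exc_match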
62
uv.lock
View File

@ -1,5 +1,5 @@
version = 1 version = 1
revision = 3 revision = 2
requires-python = ">=3.11" requires-python = ">=3.11"
[[package]] [[package]]
@ -329,32 +329,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/30/3d/64ad57c803f1fa1e963a7946b6e0fea4a70df53c1a7fed304586539c2bac/pytest-8.3.5-py3-none-any.whl", hash = "sha256:c69214aa47deac29fad6c2a4f590b9c4a9fdb16a403176fe154b79c0b4d4d820", size = 343634, upload-time = "2025-03-02T12:54:52.069Z" }, { url = "https://files.pythonhosted.org/packages/30/3d/64ad57c803f1fa1e963a7946b6e0fea4a70df53c1a7fed304586539c2bac/pytest-8.3.5-py3-none-any.whl", hash = "sha256:c69214aa47deac29fad6c2a4f590b9c4a9fdb16a403176fe154b79c0b4d4d820", size = 343634, upload-time = "2025-03-02T12:54:52.069Z" },
] ]
[[package]]
name = "ruff"
version = "0.14.14"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/2e/06/f71e3a86b2df0dfa2d2f72195941cd09b44f87711cb7fa5193732cb9a5fc/ruff-0.14.14.tar.gz", hash = "sha256:2d0f819c9a90205f3a867dbbd0be083bee9912e170fd7d9704cc8ae45824896b", size = 4515732, upload-time = "2026-01-22T22:30:17.527Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d2/89/20a12e97bc6b9f9f68343952da08a8099c57237aef953a56b82711d55edd/ruff-0.14.14-py3-none-linux_armv6l.whl", hash = "sha256:7cfe36b56e8489dee8fbc777c61959f60ec0f1f11817e8f2415f429552846aed", size = 10467650, upload-time = "2026-01-22T22:30:08.578Z" },
{ url = "https://files.pythonhosted.org/packages/a3/b1/c5de3fd2d5a831fcae21beda5e3589c0ba67eec8202e992388e4b17a6040/ruff-0.14.14-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:6006a0082336e7920b9573ef8a7f52eec837add1265cc74e04ea8a4368cd704c", size = 10883245, upload-time = "2026-01-22T22:30:04.155Z" },
{ url = "https://files.pythonhosted.org/packages/b8/7c/3c1db59a10e7490f8f6f8559d1db8636cbb13dccebf18686f4e3c9d7c772/ruff-0.14.14-py3-none-macosx_11_0_arm64.whl", hash = "sha256:026c1d25996818f0bf498636686199d9bd0d9d6341c9c2c3b62e2a0198b758de", size = 10231273, upload-time = "2026-01-22T22:30:34.642Z" },
{ url = "https://files.pythonhosted.org/packages/a1/6e/5e0e0d9674be0f8581d1f5e0f0a04761203affce3232c1a1189d0e3b4dad/ruff-0.14.14-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f666445819d31210b71e0a6d1c01e24447a20b85458eea25a25fe8142210ae0e", size = 10585753, upload-time = "2026-01-22T22:30:31.781Z" },
{ url = "https://files.pythonhosted.org/packages/23/09/754ab09f46ff1884d422dc26d59ba18b4e5d355be147721bb2518aa2a014/ruff-0.14.14-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:3c0f18b922c6d2ff9a5e6c3ee16259adc513ca775bcf82c67ebab7cbd9da5bc8", size = 10286052, upload-time = "2026-01-22T22:30:24.827Z" },
{ url = "https://files.pythonhosted.org/packages/c8/cc/e71f88dd2a12afb5f50733851729d6b571a7c3a35bfdb16c3035132675a0/ruff-0.14.14-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1629e67489c2dea43e8658c3dba659edbfd87361624b4040d1df04c9740ae906", size = 11043637, upload-time = "2026-01-22T22:30:13.239Z" },
{ url = "https://files.pythonhosted.org/packages/67/b2/397245026352494497dac935d7f00f1468c03a23a0c5db6ad8fc49ca3fb2/ruff-0.14.14-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:27493a2131ea0f899057d49d303e4292b2cae2bb57253c1ed1f256fbcd1da480", size = 12194761, upload-time = "2026-01-22T22:30:22.542Z" },
{ url = "https://files.pythonhosted.org/packages/5b/06/06ef271459f778323112c51b7587ce85230785cd64e91772034ddb88f200/ruff-0.14.14-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:01ff589aab3f5b539e35db38425da31a57521efd1e4ad1ae08fc34dbe30bd7df", size = 12005701, upload-time = "2026-01-22T22:30:20.499Z" },
{ url = "https://files.pythonhosted.org/packages/41/d6/99364514541cf811ccc5ac44362f88df66373e9fec1b9d1c4cc830593fe7/ruff-0.14.14-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:1cc12d74eef0f29f51775f5b755913eb523546b88e2d733e1d701fe65144e89b", size = 11282455, upload-time = "2026-01-22T22:29:59.679Z" },
{ url = "https://files.pythonhosted.org/packages/ca/71/37daa46f89475f8582b7762ecd2722492df26421714a33e72ccc9a84d7a5/ruff-0.14.14-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bb8481604b7a9e75eff53772496201690ce2687067e038b3cc31aaf16aa0b974", size = 11215882, upload-time = "2026-01-22T22:29:57.032Z" },
{ url = "https://files.pythonhosted.org/packages/2c/10/a31f86169ec91c0705e618443ee74ede0bdd94da0a57b28e72db68b2dbac/ruff-0.14.14-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:14649acb1cf7b5d2d283ebd2f58d56b75836ed8c6f329664fa91cdea19e76e66", size = 11180549, upload-time = "2026-01-22T22:30:27.175Z" },
{ url = "https://files.pythonhosted.org/packages/fd/1e/c723f20536b5163adf79bdd10c5f093414293cdf567eed9bdb7b83940f3f/ruff-0.14.14-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:e8058d2145566510790eab4e2fad186002e288dec5e0d343a92fe7b0bc1b3e13", size = 10543416, upload-time = "2026-01-22T22:30:01.964Z" },
{ url = "https://files.pythonhosted.org/packages/3e/34/8a84cea7e42c2d94ba5bde1d7a4fae164d6318f13f933d92da6d7c2041ff/ruff-0.14.14-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:e651e977a79e4c758eb807f0481d673a67ffe53cfa92209781dfa3a996cf8412", size = 10285491, upload-time = "2026-01-22T22:30:29.51Z" },
{ url = "https://files.pythonhosted.org/packages/55/ef/b7c5ea0be82518906c978e365e56a77f8de7678c8bb6651ccfbdc178c29f/ruff-0.14.14-py3-none-musllinux_1_2_i686.whl", hash = "sha256:cc8b22da8d9d6fdd844a68ae937e2a0adf9b16514e9a97cc60355e2d4b219fc3", size = 10733525, upload-time = "2026-01-22T22:30:06.499Z" },
{ url = "https://files.pythonhosted.org/packages/6a/5b/aaf1dfbcc53a2811f6cc0a1759de24e4b03e02ba8762daabd9b6bd8c59e3/ruff-0.14.14-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:16bc890fb4cc9781bb05beb5ab4cd51be9e7cb376bf1dd3580512b24eb3fda2b", size = 11315626, upload-time = "2026-01-22T22:30:36.848Z" },
{ url = "https://files.pythonhosted.org/packages/2c/aa/9f89c719c467dfaf8ad799b9bae0df494513fb21d31a6059cb5870e57e74/ruff-0.14.14-py3-none-win32.whl", hash = "sha256:b530c191970b143375b6a68e6f743800b2b786bbcf03a7965b06c4bf04568167", size = 10502442, upload-time = "2026-01-22T22:30:38.93Z" },
{ url = "https://files.pythonhosted.org/packages/87/44/90fa543014c45560cae1fffc63ea059fb3575ee6e1cb654562197e5d16fb/ruff-0.14.14-py3-none-win_amd64.whl", hash = "sha256:3dde1435e6b6fe5b66506c1dff67a421d0b7f6488d466f651c07f4cab3bf20fd", size = 11630486, upload-time = "2026-01-22T22:30:10.852Z" },
{ url = "https://files.pythonhosted.org/packages/9e/6a/40fee331a52339926a92e17ae748827270b288a35ef4a15c9c8f2ec54715/ruff-0.14.14-py3-none-win_arm64.whl", hash = "sha256:56e6981a98b13a32236a72a8da421d7839221fa308b223b9283312312e5ac76c", size = 10920448, upload-time = "2026-01-22T22:30:15.417Z" },
]
[[package]] [[package]]
name = "sniffio" name = "sniffio"
version = "1.3.1" version = "1.3.1"
@ -421,24 +395,6 @@ dev = [
{ name = "typing-extensions" }, { name = "typing-extensions" },
{ name = "xonsh" }, { name = "xonsh" },
] ]
devx = [
{ name = "greenback" },
{ name = "stackscope" },
{ name = "typing-extensions" },
]
lint = [
{ name = "ruff" },
]
repl = [
{ name = "prompt-toolkit" },
{ name = "psutil" },
{ name = "pyperclip" },
{ name = "xonsh" },
]
testing = [
{ name = "pexpect" },
{ name = "pytest" },
]
[package.metadata] [package.metadata]
requires-dist = [ requires-dist = [
@ -464,22 +420,6 @@ dev = [
{ name = "typing-extensions", specifier = ">=4.14.1" }, { name = "typing-extensions", specifier = ">=4.14.1" },
{ name = "xonsh", specifier = ">=0.19.2" }, { name = "xonsh", specifier = ">=0.19.2" },
] ]
devx = [
{ name = "greenback", specifier = ">=1.2.1,<2" },
{ name = "stackscope", specifier = ">=0.2.2,<0.3" },
{ name = "typing-extensions", specifier = ">=4.14.1" },
]
lint = [{ name = "ruff", specifier = ">=0.9.6" }]
repl = [
{ name = "prompt-toolkit", specifier = ">=3.0.50" },
{ name = "psutil", specifier = ">=7.0.0" },
{ name = "pyperclip", specifier = ">=1.9.0" },
{ name = "xonsh", specifier = ">=0.19.2" },
]
testing = [
{ name = "pexpect", specifier = ">=4.9.0,<5" },
{ name = "pytest", specifier = ">=8.3.5" },
]
[[package]] [[package]]
name = "tricycle" name = "tricycle"