Compare commits

..

No commits in common. "8fd7d1cec49b36c2b84b37818e494b20ad720a5a" and "1762b3eb64ec48b48dbe55286ffe6400e9563ffe" have entirely different histories.

33 changed files with 917 additions and 2115 deletions

View File

@ -120,7 +120,6 @@ async def main(
break_parent_ipc_after: int|bool = False,
break_child_ipc_after: int|bool = False,
pre_close: bool = False,
tpt_proto: str = 'tcp',
) -> None:
@ -132,7 +131,6 @@ async def main(
# a hang since it never engages due to broken IPC
debug_mode=debug_mode,
loglevel=loglevel,
enable_transports=[tpt_proto],
) as an,
):
@ -147,8 +145,7 @@ async def main(
_testing.expect_ctxc(
yay=(
break_parent_ipc_after
or
break_child_ipc_after
or break_child_ipc_after
),
# TODO: we CAN'T remove this right?
# since we need the ctxc to bubble up from either

View File

@ -46,7 +46,6 @@ dependencies = [
# typed IPC msging
"msgspec>=0.19.0",
"cffi>=1.17.1",
"bidict>=0.23.1",
]
# ------ project ------
@ -64,7 +63,6 @@ dev = [
"pyperclip>=1.9.0",
"prompt-toolkit>=3.0.50",
"xonsh>=0.19.2",
"psutil>=7.0.0",
]
# TODO, add these with sane versions; were originally in
# `requirements-docs.txt`..

View File

@ -1,8 +1,6 @@
"""
Top level of the testing suites!
``tractor`` testing!!
"""
from __future__ import annotations
import sys
import subprocess
import os
@ -32,11 +30,7 @@ else:
_KILL_SIGNAL = signal.SIGKILL
_INT_SIGNAL = signal.SIGINT
_INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value
_PROC_SPAWN_WAIT = (
0.6
if sys.version_info < (3, 7)
else 0.4
)
_PROC_SPAWN_WAIT = 0.6 if sys.version_info < (3, 7) else 0.4
no_windows = pytest.mark.skipif(
@ -45,9 +39,7 @@ no_windows = pytest.mark.skipif(
)
def pytest_addoption(
parser: pytest.Parser,
):
def pytest_addoption(parser):
parser.addoption(
"--ll",
action="store",
@ -64,8 +56,7 @@ def pytest_addoption(
)
parser.addoption(
"--tpdb",
"--debug-mode",
"--tpdb", "--debug-mode",
action="store_true",
dest='tractor_debug_mode',
# default=False,
@ -76,17 +67,6 @@ def pytest_addoption(
),
)
# provide which IPC transport protocols opting-in test suites
# should accumulatively run against.
parser.addoption(
"--tpt-proto",
nargs='+', # accumulate-multiple-args
action="store",
dest='tpt_protos',
default=['tcp'],
help="Transport protocol to use under the `tractor.ipc.Channel`",
)
def pytest_configure(config):
backend = config.option.spawn_backend
@ -94,7 +74,7 @@ def pytest_configure(config):
@pytest.fixture(scope='session')
def debug_mode(request) -> bool:
def debug_mode(request):
debug_mode: bool = request.config.option.tractor_debug_mode
# if debug_mode:
# breakpoint()
@ -115,35 +95,11 @@ def spawn_backend(request) -> str:
return request.config.option.spawn_backend
@pytest.fixture(scope='session')
def tpt_protos(request) -> list[str]:
# allow quoting on CLI
proto_keys: list[str] = [
proto_key.replace('"', '').replace("'", "")
for proto_key in request.config.option.tpt_protos
]
# ?TODO, eventually support multiple protos per test-sesh?
if len(proto_keys) > 1:
pytest.fail(
'We only support one `--tpt-proto <key>` atm!\n'
)
# XXX ensure we support the protocol by name via lookup!
for proto_key in proto_keys:
addr_type = tractor._addr._address_types[proto_key]
assert addr_type.proto_key == proto_key
yield proto_keys
@pytest.fixture(scope='session')
def tpt_proto(
tpt_protos: list[str],
) -> str:
yield tpt_protos[0]
# @pytest.fixture(scope='function', autouse=True)
# def debug_enabled(request) -> str:
# from tractor import _state
# if _state._runtime_vars['_debug_mode']:
# breakpoint()
_ci_env: bool = os.environ.get('CI', False)
@ -151,7 +107,7 @@ _ci_env: bool = os.environ.get('CI', False)
@pytest.fixture(scope='session')
def ci_env() -> bool:
'''
Detect CI environment.
Detect CI envoirment.
'''
return _ci_env
@ -159,45 +115,30 @@ def ci_env() -> bool:
# TODO: also move this to `._testing` for now?
# -[ ] possibly generalize and re-use for multi-tree spawning
# along with the new stuff for multi-addrs?
# along with the new stuff for multi-addrs in distribute_dis
# branch?
#
# choose random port at import time
_rando_port: str = random.randint(1000, 9999)
# choose randomly at import time
_reg_addr: tuple[str, int] = (
'127.0.0.1',
random.randint(1000, 9999),
)
@pytest.fixture(scope='session')
def reg_addr(
tpt_proto: str,
) -> tuple[str, int|str]:
def reg_addr() -> tuple[str, int]:
# globally override the runtime to the per-test-session-dynamic
# addr so that all tests never conflict with any other actor
# tree using the default.
from tractor import (
_addr,
)
addr_type = _addr._address_types[tpt_proto]
def_reg_addr: tuple[str, int] = _addr._default_lo_addrs[tpt_proto]
from tractor import _root
_root._default_lo_addrs = [_reg_addr]
testrun_reg_addr: tuple[str, int]
match tpt_proto:
case 'tcp':
testrun_reg_addr = (
addr_type.def_bindspace,
_rando_port,
)
# NOTE, file-name uniqueness (no-collisions) will be based on
# the runtime-directory and root (pytest-proc's) pid.
case 'uds':
testrun_reg_addr = addr_type.get_random().unwrap()
assert def_reg_addr != testrun_reg_addr
return testrun_reg_addr
return _reg_addr
def pytest_generate_tests(metafunc):
spawn_backend: str = metafunc.config.option.spawn_backend
spawn_backend = metafunc.config.option.spawn_backend
if not spawn_backend:
# XXX some weird windows bug with `pytest`?
@ -210,53 +151,45 @@ def pytest_generate_tests(metafunc):
'trio',
)
# NOTE: used-to-be-used-to dyanmically parametrize tests for when
# NOTE: used to be used to dyanmically parametrize tests for when
# you just passed --spawn-backend=`mp` on the cli, but now we expect
# that cli input to be manually specified, BUT, maybe we'll do
# something like this again in the future?
if 'start_method' in metafunc.fixturenames:
metafunc.parametrize(
"start_method",
[spawn_backend],
scope='module',
)
# TODO, parametrize any `tpt_proto: str` declaring tests!
# proto_tpts: list[str] = metafunc.config.option.proto_tpts
# if 'tpt_proto' in metafunc.fixturenames:
# metafunc.parametrize(
# 'tpt_proto',
# proto_tpts, # TODO, double check this list usage!
# scope='module',
# )
metafunc.parametrize("start_method", [spawn_backend], scope='module')
def sig_prog(
proc: subprocess.Popen,
sig: int,
canc_timeout: float = 0.1,
) -> int:
# TODO: a way to let test scripts (like from `examples/`)
# guarantee they won't registry addr collide!
# @pytest.fixture
# def open_test_runtime(
# reg_addr: tuple,
# ) -> AsyncContextManager:
# return partial(
# tractor.open_nursery,
# registry_addrs=[reg_addr],
# )
def sig_prog(proc, sig):
"Kill the actor-process with ``sig``."
proc.send_signal(sig)
time.sleep(canc_timeout)
time.sleep(0.1)
if not proc.poll():
# TODO: why sometimes does SIGINT not work on teardown?
# seems to happen only when trace logging enabled?
proc.send_signal(_KILL_SIGNAL)
ret: int = proc.wait()
ret = proc.wait()
assert ret
# TODO: factor into @cm and move to `._testing`?
@pytest.fixture
def daemon(
debug_mode: bool,
loglevel: str,
testdir,
reg_addr: tuple[str, int],
tpt_proto: str,
) -> subprocess.Popen:
):
'''
Run a daemon root actor as a separate actor-process tree and
"remote registrar" for discovery-protocol related tests.
@ -267,100 +200,28 @@ def daemon(
loglevel: str = 'info'
code: str = (
"import tractor; "
"tractor.run_daemon([], "
"registry_addrs={reg_addrs}, "
"debug_mode={debug_mode}, "
"loglevel={ll})"
"import tractor; "
"tractor.run_daemon([], registry_addrs={reg_addrs}, loglevel={ll})"
).format(
reg_addrs=str([reg_addr]),
ll="'{}'".format(loglevel) if loglevel else None,
debug_mode=debug_mode,
)
cmd: list[str] = [
sys.executable,
'-c', code,
]
# breakpoint()
kwargs = {}
if platform.system() == 'Windows':
# without this, tests hang on windows forever
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
proc: subprocess.Popen = testdir.popen(
proc = testdir.popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**kwargs,
)
# UDS sockets are **really** fast to bind()/listen()/connect()
# so it's often required that we delay a bit more starting
# the first actor-tree..
if tpt_proto == 'uds':
global _PROC_SPAWN_WAIT
_PROC_SPAWN_WAIT = 0.6
time.sleep(_PROC_SPAWN_WAIT)
assert not proc.returncode
time.sleep(_PROC_SPAWN_WAIT)
yield proc
sig_prog(proc, _INT_SIGNAL)
# XXX! yeah.. just be reaaal careful with this bc sometimes it
# can lock up on the `_io.BufferedReader` and hang..
stderr: str = proc.stderr.read().decode()
if stderr:
print(
f'Daemon actor tree produced STDERR:\n'
f'{proc.args}\n'
f'\n'
f'{stderr}\n'
)
if proc.returncode != -2:
raise RuntimeError(
'Daemon actor tree failed !?\n'
f'{proc.args}\n'
)
# @pytest.fixture(autouse=True)
# def shared_last_failed(pytestconfig):
# val = pytestconfig.cache.get("example/value", None)
# breakpoint()
# if val is None:
# pytestconfig.cache.set("example/value", val)
# return val
# TODO: a way to let test scripts (like from `examples/`)
# guarantee they won't `registry_addrs` collide!
# -[ ] maybe use some kinda standard `def main()` arg-spec that
# we can introspect from a fixture that is called from the test
# body?
# -[ ] test and figure out typing for below prototype! Bp
#
# @pytest.fixture
# def set_script_runtime_args(
# reg_addr: tuple,
# ) -> Callable[[...], None]:
# def import_n_partial_in_args_n_triorun(
# script: Path, # under examples?
# **runtime_args,
# ) -> Callable[[], Any]: # a `partial`-ed equiv of `trio.run()`
# # NOTE, below is taken from
# # `.test_advanced_faults.test_ipc_channel_break_during_stream`
# mod: ModuleType = import_path(
# examples_dir() / 'advanced_faults'
# / 'ipc_failure_during_stream.py',
# root=examples_dir(),
# consider_namespace_packages=False,
# )
# return partial(
# trio.run,
# partial(
# mod.main,
# **runtime_args,
# )
# )
# return import_n_partial_in_args_n_triorun

View File

@ -10,9 +10,6 @@ import pytest
from _pytest.pathlib import import_path
import trio
import tractor
from tractor import (
TransportClosed,
)
from tractor._testing import (
examples_dir,
break_ipc,
@ -77,7 +74,6 @@ def test_ipc_channel_break_during_stream(
spawn_backend: str,
ipc_break: dict|None,
pre_aclose_msgstream: bool,
tpt_proto: str,
):
'''
Ensure we can have an IPC channel break its connection during
@ -95,7 +91,7 @@ def test_ipc_channel_break_during_stream(
# non-`trio` spawners should never hit the hang condition that
# requires the user to do ctl-c to cancel the actor tree.
# expect_final_exc = trio.ClosedResourceError
expect_final_exc = TransportClosed
expect_final_exc = tractor.TransportClosed
mod: ModuleType = import_path(
examples_dir() / 'advanced_faults'
@ -108,8 +104,6 @@ def test_ipc_channel_break_during_stream(
# period" wherein the user eventually hits ctl-c to kill the
# root-actor tree.
expect_final_exc: BaseException = KeyboardInterrupt
expect_final_cause: BaseException|None = None
if (
# only expect EoC if trans is broken on the child side,
ipc_break['break_child_ipc_after'] is not False
@ -144,9 +138,6 @@ def test_ipc_channel_break_during_stream(
# a user sending ctl-c by raising a KBI.
if pre_aclose_msgstream:
expect_final_exc = KeyboardInterrupt
if tpt_proto == 'uds':
expect_final_exc = TransportClosed
expect_final_cause = trio.BrokenResourceError
# XXX OLD XXX
# if child calls `MsgStream.aclose()` then expect EoC.
@ -166,10 +157,6 @@ def test_ipc_channel_break_during_stream(
if pre_aclose_msgstream:
expect_final_exc = KeyboardInterrupt
if tpt_proto == 'uds':
expect_final_exc = TransportClosed
expect_final_cause = trio.BrokenResourceError
# NOTE when the parent IPC side dies (even if the child does as well
# but the child fails BEFORE the parent) we always expect the
# IPC layer to raise a closed-resource, NEVER do we expect
@ -182,8 +169,8 @@ def test_ipc_channel_break_during_stream(
and
ipc_break['break_child_ipc_after'] is False
):
# expect_final_exc = trio.ClosedResourceError
expect_final_exc = tractor.TransportClosed
expect_final_cause = trio.ClosedResourceError
# BOTH but, PARENT breaks FIRST
elif (
@ -194,8 +181,8 @@ def test_ipc_channel_break_during_stream(
ipc_break['break_parent_ipc_after']
)
):
# expect_final_exc = trio.ClosedResourceError
expect_final_exc = tractor.TransportClosed
expect_final_cause = trio.ClosedResourceError
with pytest.raises(
expected_exception=(
@ -211,7 +198,6 @@ def test_ipc_channel_break_during_stream(
start_method=spawn_backend,
loglevel=loglevel,
pre_close=pre_aclose_msgstream,
tpt_proto=tpt_proto,
**ipc_break,
)
)
@ -234,15 +220,10 @@ def test_ipc_channel_break_during_stream(
)
cause: Exception = tc.__cause__
assert (
# type(cause) is trio.ClosedResourceError
type(cause) is expect_final_cause
# TODO, should we expect a certain exc-message (per
# tpt) as well??
# and
# cause.args[0] == 'another task closed this fd'
type(cause) is trio.ClosedResourceError
and
cause.args[0] == 'another task closed this fd'
)
raise
# get raw instance from pytest wrapper

View File

@ -7,9 +7,7 @@ import platform
from functools import partial
import itertools
import psutil
import pytest
import subprocess
import tractor
from tractor._testing import tractor_test
import trio
@ -154,23 +152,13 @@ async def unpack_reg(actor_or_portal):
async def spawn_and_check_registry(
reg_addr: tuple,
use_signal: bool,
debug_mode: bool = False,
remote_arbiter: bool = False,
with_streaming: bool = False,
maybe_daemon: tuple[
subprocess.Popen,
psutil.Process,
]|None = None,
) -> None:
if maybe_daemon:
popen, proc = maybe_daemon
# breakpoint()
async with tractor.open_root_actor(
registry_addrs=[reg_addr],
debug_mode=debug_mode,
):
async with tractor.get_registry(reg_addr) as portal:
# runtime needs to be up to call this
@ -188,11 +176,11 @@ async def spawn_and_check_registry(
extra = 2 # local root actor + remote arbiter
# ensure current actor is registered
registry: dict = await get_reg()
registry = await get_reg()
assert actor.uid in registry
try:
async with tractor.open_nursery() as an:
async with tractor.open_nursery() as n:
async with trio.open_nursery(
strict_exception_groups=False,
) as trion:
@ -201,17 +189,17 @@ async def spawn_and_check_registry(
for i in range(3):
name = f'a{i}'
if with_streaming:
portals[name] = await an.start_actor(
portals[name] = await n.start_actor(
name=name, enable_modules=[__name__])
else: # no streaming
portals[name] = await an.run_in_actor(
portals[name] = await n.run_in_actor(
trio.sleep_forever, name=name)
# wait on last actor to come up
async with tractor.wait_for_actor(name):
registry = await get_reg()
for uid in an._children:
for uid in n._children:
assert uid in registry
assert len(portals) + extra == len(registry)
@ -244,7 +232,6 @@ async def spawn_and_check_registry(
@pytest.mark.parametrize('use_signal', [False, True])
@pytest.mark.parametrize('with_streaming', [False, True])
def test_subactors_unregister_on_cancel(
debug_mode: bool,
start_method,
use_signal,
reg_addr,
@ -261,7 +248,6 @@ def test_subactors_unregister_on_cancel(
spawn_and_check_registry,
reg_addr,
use_signal,
debug_mode=debug_mode,
remote_arbiter=False,
with_streaming=with_streaming,
),
@ -271,8 +257,7 @@ def test_subactors_unregister_on_cancel(
@pytest.mark.parametrize('use_signal', [False, True])
@pytest.mark.parametrize('with_streaming', [False, True])
def test_subactors_unregister_on_cancel_remote_daemon(
daemon: subprocess.Popen,
debug_mode: bool,
daemon,
start_method,
use_signal,
reg_addr,
@ -288,13 +273,8 @@ def test_subactors_unregister_on_cancel_remote_daemon(
spawn_and_check_registry,
reg_addr,
use_signal,
debug_mode=debug_mode,
remote_arbiter=True,
with_streaming=with_streaming,
maybe_daemon=(
daemon,
psutil.Process(daemon.pid)
),
),
)
@ -393,7 +373,7 @@ def test_close_channel_explicit(
@pytest.mark.parametrize('use_signal', [False, True])
def test_close_channel_explicit_remote_arbiter(
daemon: subprocess.Popen,
daemon,
start_method,
use_signal,
reg_addr,

View File

@ -11,9 +11,6 @@ from tractor.ipc import (
)
from tractor._testing.samples import generate_sample_messages
# in case you don't want to melt your cores, uncomment dis!
pytestmark = pytest.mark.skip
@tractor.context
async def child_read_shm(

View File

@ -1,85 +0,0 @@
'''
Runtime boot/init sanity.
'''
import pytest
import trio
import tractor
from tractor._exceptions import RuntimeFailure
@tractor.context
async def open_new_root_in_sub(
ctx: tractor.Context,
) -> None:
async with tractor.open_root_actor():
pass
@pytest.mark.parametrize(
'open_root_in',
['root', 'sub'],
ids='open_2nd_root_in={}'.format,
)
def test_only_one_root_actor(
open_root_in: str,
reg_addr: tuple,
debug_mode: bool
):
'''
Verify we specially fail whenever more then one root actor
is attempted to be opened within an already opened tree.
'''
async def main():
async with tractor.open_nursery() as an:
if open_root_in == 'root':
async with tractor.open_root_actor(
registry_addrs=[reg_addr],
):
pass
ptl: tractor.Portal = await an.start_actor(
name='bad_rooty_boi',
enable_modules=[__name__],
)
async with ptl.open_context(
open_new_root_in_sub,
) as (ctx, first):
pass
if open_root_in == 'root':
with pytest.raises(
RuntimeFailure
) as excinfo:
trio.run(main)
else:
with pytest.raises(
tractor.RemoteActorError,
) as excinfo:
trio.run(main)
assert excinfo.value.boxed_type is RuntimeFailure
def test_implicit_root_via_first_nursery(
reg_addr: tuple,
debug_mode: bool
):
'''
The first `ActorNursery` open should implicitly call
`_root.open_root_actor()`.
'''
async def main():
async with tractor.open_nursery() as an:
assert an._implicit_runtime_started
assert tractor.current_actor().aid.name == 'root'
trio.run(main)

View File

@ -2,7 +2,6 @@
Spawning basics
"""
from functools import partial
from typing import (
Any,
)
@ -13,95 +12,74 @@ import tractor
from tractor._testing import tractor_test
data_to_pass_down = {
'doggy': 10,
'kitty': 4,
}
data_to_pass_down = {'doggy': 10, 'kitty': 4}
async def spawn(
should_be_root: bool,
is_arbiter: bool,
data: dict,
reg_addr: tuple[str, int],
debug_mode: bool = False,
):
namespaces = [__name__]
await trio.sleep(0.1)
actor = tractor.current_actor(err_on_no_runtime=False)
if should_be_root:
assert actor is None # no runtime yet
async with (
tractor.open_root_actor(
arbiter_addr=reg_addr,
),
tractor.open_nursery() as an,
):
# now runtime exists
actor: tractor.Actor = tractor.current_actor()
assert actor.is_arbiter == should_be_root
async with tractor.open_root_actor(
arbiter_addr=reg_addr,
):
actor = tractor.current_actor()
assert actor.is_arbiter == is_arbiter
data = data_to_pass_down
# spawns subproc here
portal: tractor.Portal = await an.run_in_actor(
fn=spawn,
if actor.is_arbiter:
async with tractor.open_nursery() as nursery:
# spawning args
name='sub-actor',
enable_modules=[__name__],
# forks here
portal = await nursery.run_in_actor(
spawn,
is_arbiter=False,
name='sub-actor',
data=data,
reg_addr=reg_addr,
enable_modules=namespaces,
)
# passed to a subactor-recursive RPC invoke
# of this same `spawn()` fn.
should_be_root=False,
data=data_to_pass_down,
reg_addr=reg_addr,
)
assert len(an._children) == 1
assert portal.channel.uid in tractor.current_actor()._peers
# get result from child subactor
result = await portal.result()
assert result == 10
return result
else:
assert actor.is_arbiter == should_be_root
return 10
assert len(nursery._children) == 1
assert portal.channel.uid in tractor.current_actor()._peers
# be sure we can still get the result
result = await portal.result()
assert result == 10
return result
else:
return 10
def test_run_in_actor_same_func_in_child(
reg_addr: tuple,
debug_mode: bool,
def test_local_arbiter_subactor_global_state(
reg_addr,
):
result = trio.run(
partial(
spawn,
should_be_root=True,
data=data_to_pass_down,
reg_addr=reg_addr,
debug_mode=debug_mode,
)
spawn,
True,
data_to_pass_down,
reg_addr,
)
assert result == 10
async def movie_theatre_question():
'''
A question asked in a dark theatre, in a tangent
"""A question asked in a dark theatre, in a tangent
(errr, I mean different) process.
'''
"""
return 'have you ever seen a portal?'
@tractor_test
async def test_movie_theatre_convo(start_method):
'''
The main ``tractor`` routine.
"""The main ``tractor`` routine.
"""
async with tractor.open_nursery(debug_mode=True) as n:
'''
async with tractor.open_nursery(debug_mode=True) as an:
portal = await an.start_actor(
portal = await n.start_actor(
'frank',
# enable the actor to run funcs from this current module
enable_modules=[__name__],
@ -140,8 +118,8 @@ async def test_most_beautiful_word(
with trio.fail_after(1):
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.run_in_actor(
) as n:
portal = await n.run_in_actor(
cellar_door,
return_value=return_value,
name='some_linguist',

View File

@ -14,151 +14,56 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from pathlib import Path
import os
# import tempfile
import tempfile
from uuid import uuid4
from typing import (
Protocol,
ClassVar,
# TypeVar,
# Union,
Type,
TYPE_CHECKING,
TypeVar,
Union,
Type
)
from bidict import bidict
# import trio
from trio import (
socket,
SocketListener,
open_tcp_listeners,
)
from .log import get_logger
from ._state import (
get_rt_dir,
current_actor,
is_root_process,
_def_tpt_proto,
)
if TYPE_CHECKING:
from ._runtime import Actor
log = get_logger(__name__)
import trio
from trio import socket
# TODO, maybe breakout the netns key to a struct?
# class NetNs(Struct)[str, int]:
# ...
# TODO, can't we just use a type alias
# for this? namely just some `tuple[str, int, str, str]`?
#
# -[ ] would also just be simpler to keep this as SockAddr[tuple]
# or something, implying it's just a simple pair of values which can
# presumably be mapped to all transports?
# -[ ] `pydoc socket.socket.getsockname()` delivers a 4-tuple for
# ipv6 `(hostaddr, port, flowinfo, scope_id)`.. so how should we
# handle that?
# -[ ] as a further alternative to this wrap()/unwrap() approach we
# could just implement `enc/dec_hook()`s for the `Address`-types
# and just deal with our internal objs directly and always and
# leave it to the codec layer to figure out marshalling?
# |_ would mean only one spot to do the `.unwrap()` (which we may
# end up needing to call from the hook()s anyway?)
# -[x] rename to `UnwrappedAddress[Descriptor]` ??
# seems like the right name as per,
# https://www.geeksforgeeks.org/introduction-to-address-descriptor/
#
UnwrappedAddress = (
# tcp/udp/uds
tuple[
str, # host/domain(tcp), filesys-dir(uds)
int|str, # port/path(uds)
]
# ?TODO? should we also include another 2 fields from
# our `Aid` msg such that we include the runtime `Actor.uid`
# of `.name` and `.uuid`?
# - would ensure uniqueness across entire net?
# - allows for easier runtime-level filtering of "actors by
# service name"
)
NamespaceType = TypeVar('NamespaceType')
AddressType = TypeVar('AddressType')
StreamType = TypeVar('StreamType')
ListenerType = TypeVar('ListenerType')
# TODO, maybe rename to `SocketAddress`?
class Address(Protocol):
proto_key: ClassVar[str]
unwrapped_type: ClassVar[UnwrappedAddress]
class Address(Protocol[
NamespaceType,
AddressType,
StreamType,
ListenerType
]):
name_key: ClassVar[str]
address_type: ClassVar[Type[AddressType]]
# TODO, i feel like an `.is_bound()` is a better thing to
# support?
# Lke, what use does this have besides a noop and if it's not
# valid why aren't we erroring on creation/use?
@property
def is_valid(self) -> bool:
...
# TODO, maybe `.netns` is a better name?
@property
def namespace(self) -> tuple[str, int]|None:
'''
The if-available, OS-specific "network namespace" key.
'''
...
@property
def bindspace(self) -> str:
'''
Deliver the socket address' "bindable space" from
a `socket.socket.bind()` and thus from the perspective of
specific transport protocol domain.
I.e. for most (layer-4) network-socket protocols this is
normally the ipv4/6 address, for UDS this is normally
a filesystem (sub-directory).
For (distributed) network protocols this is normally the routing
layer's domain/(ip-)address, though it might also include a "network namespace"
key different then the default.
For local-host-only transports this is either an explicit
namespace (with types defined by the OS: netns, Cgroup, IPC,
pid, etc. on linux) or failing that the sub-directory in the
filesys in which socket/shm files are located *under*.
'''
def namespace(self) -> NamespaceType|None:
...
@classmethod
def from_addr(cls, addr: UnwrappedAddress) -> Address:
def from_addr(cls, addr: AddressType) -> Address:
...
def unwrap(self) -> UnwrappedAddress:
'''
Deliver the underying minimum field set in
a primitive python data type-structure.
'''
def unwrap(self) -> AddressType:
...
@classmethod
def get_random(
cls,
current_actor: Actor,
bindspace: str|None = None,
) -> Address:
def get_random(cls, namespace: NamespaceType | None = None) -> Address:
...
# TODO, this should be something like a `.get_def_registar_addr()`
# or similar since,
# - it should be a **host singleton** (not root/tree singleton)
# - we **only need this value** when one isn't provided to the
# runtime at boot and we want to implicitly provide a host-wide
# registrar.
# - each rooted-actor-tree should likely have its own
# micro-registry (likely the root being it), also see
@classmethod
def get_root(cls) -> Address:
...
@ -169,20 +74,25 @@ class Address(Protocol):
def __eq__(self, other) -> bool:
...
async def open_listener(
self,
**kwargs,
) -> SocketListener:
async def open_stream(self, **kwargs) -> StreamType:
...
async def open_listener(self, **kwargs) -> ListenerType:
...
async def close_listener(self):
...
class TCPAddress(Address):
proto_key: str = 'tcp'
unwrapped_type: type = tuple[str, int]
def_bindspace: str = '127.0.0.1'
class TCPAddress(Address[
str,
tuple[str, int],
trio.SocketStream,
trio.SocketListener
]):
name_key: str = 'tcp'
address_type: type = tuple[str, int]
def __init__(
self,
@ -194,63 +104,35 @@ class TCPAddress(Address):
or
not isinstance(port, int)
):
raise TypeError(
f'Expected host {host!r} to be str and port {port!r} to be int'
)
self._host: str = host
self._port: int = port
raise TypeError(f'Expected host {host} to be str and port {port} to be int')
self._host = host
self._port = port
@property
def is_valid(self) -> bool:
return self._port != 0
@property
def bindspace(self) -> str:
return self._host
@property
def domain(self) -> str:
def namespace(self) -> str:
return self._host
@classmethod
def from_addr(
cls,
addr: tuple[str, int]
) -> TCPAddress:
match addr:
case (str(), int()):
return TCPAddress(addr[0], addr[1])
case _:
raise ValueError(
f'Invalid unwrapped address for {cls}\n'
f'{addr}\n'
)
def from_addr(cls, addr: tuple[str, int]) -> TCPAddress:
return TCPAddress(addr[0], addr[1])
def unwrap(self) -> tuple[str, int]:
return (
self._host,
self._port,
)
return self._host, self._port
@classmethod
def get_random(
cls,
bindspace: str = def_bindspace,
) -> TCPAddress:
return TCPAddress(bindspace, 0)
def get_random(cls, namespace: str = '127.0.0.1') -> TCPAddress:
return TCPAddress(namespace, 0)
@classmethod
def get_root(cls) -> Address:
return TCPAddress(
'127.0.0.1',
1616,
)
return TCPAddress('127.0.0.1', 1616)
def __repr__(self) -> str:
return (
f'{type(self).__name__}[{self.unwrap()}]'
)
return f'{type(self)} @ {self.unwrap()}'
def __eq__(self, other) -> bool:
if not isinstance(other, TCPAddress):
@ -264,11 +146,17 @@ class TCPAddress(Address):
self._port == other._port
)
async def open_listener(
self,
**kwargs,
) -> SocketListener:
listeners: list[SocketListener] = await open_tcp_listeners(
async def open_stream(self, **kwargs) -> trio.SocketStream:
stream = await trio.open_tcp_stream(
self._host,
self._port,
**kwargs
)
self._host, self._port = stream.socket.getsockname()[:2]
return stream
async def open_listener(self, **kwargs) -> trio.SocketListener:
listeners = await trio.open_tcp_listeners(
host=self._host,
port=self._port,
**kwargs
@ -282,156 +170,47 @@ class TCPAddress(Address):
...
def unwrap_sockpath(
sockpath: Path,
) -> tuple[Path, Path]:
return (
sockpath.parent,
sockpath.name,
)
class UDSAddress(Address[
None,
str,
trio.SocketStream,
trio.SocketListener
]):
class UDSAddress(Address):
# TODO, maybe we should use better field and value
# -[x] really this is a `.protocol_key` not a "name" of anything.
# -[ ] consider a 'unix' proto-key instead?
# -[ ] need to check what other mult-transport frameworks do
# like zmq, nng, uri-spec et al!
proto_key: str = 'uds'
unwrapped_type: type = tuple[str, int]
def_bindspace: Path = get_rt_dir()
name_key: str = 'uds'
address_type: type = str
def __init__(
self,
filedir: Path|str|None,
# TODO, i think i want `.filename` here?
filename: str|Path,
# XXX, in the sense you can also pass
# a "non-real-world-process-id" such as is handy to represent
# our host-local default "port-like" key for the very first
# root actor to create a registry address.
maybe_pid: int|None = None,
filepath: str
):
fdir = self._filedir = Path(
filedir
or
self.def_bindspace
).absolute()
fpath = self._filename = Path(filename)
fp: Path = fdir / fpath
assert (
fp.is_absolute()
and
fp == self.sockpath
)
# to track which "side" is the peer process by reading socket
# credentials-info.
self._pid: int = maybe_pid
@property
def sockpath(self) -> Path:
return self._filedir / self._filename
self._filepath = filepath
@property
def is_valid(self) -> bool:
'''
We block socket files not allocated under the runtime subdir.
'''
return self.bindspace in self.sockpath.parents
return True
@property
def bindspace(self) -> Path:
'''
We replicate the "ip-set-of-hosts" part of a UDS socket as
just the sub-directory in which we allocate socket files.
'''
return self._filedir or self.def_bindspace
def namespace(self) -> None:
return
@classmethod
def from_addr(
cls,
addr: (
tuple[Path|str, Path|str]|Path|str
),
) -> UDSAddress:
match addr:
case tuple()|list():
filedir = Path(addr[0])
filename = Path(addr[1])
# sockpath: Path = Path(addr[0])
# filedir, filename = unwrap_sockpath(sockpath)
# pid: int = addr[1]
return UDSAddress(
filedir=filedir,
filename=filename,
# maybe_pid=pid,
)
# NOTE, in case we ever decide to just `.unwrap()`
# to a `Path|str`?
case str()|Path():
sockpath: Path = Path(addr)
return UDSAddress(*unwrap_sockpath(sockpath))
case _:
# import pdbp; pdbp.set_trace()
raise TypeError(
f'Bad unwrapped-address for {cls} !\n'
f'{addr!r}\n'
)
def from_addr(cls, filepath: str) -> UDSAddress:
return UDSAddress(filepath)
def unwrap(self) -> tuple[str, int]:
# XXX NOTE, since this gets passed DIRECTLY to
# `.ipc._uds.open_unix_socket_w_passcred()`
return (
str(self._filedir),
str(self._filename),
)
def unwrap(self) -> str:
return self._filepath
@classmethod
def get_random(
cls,
bindspace: Path|None = None, # default netns
) -> UDSAddress:
filedir: Path = bindspace or cls.def_bindspace
pid: int = os.getpid()
actor: Actor|None = current_actor(
err_on_no_runtime=False,
)
if actor:
sockname: str = '::'.join(actor.uid) + f'@{pid}'
else:
prefix: str = '<unknown-actor>'
if is_root_process():
prefix: str = 'root'
sockname: str = f'{prefix}@{pid}'
sockpath: Path = Path(f'{sockname}.sock')
return UDSAddress(
filedir=filedir,
filename=sockpath,
maybe_pid=pid,
)
def get_random(cls, namespace: None = None) -> UDSAddress:
return UDSAddress(f'{tempfile.gettempdir()}/{uuid4()}.sock')
@classmethod
def get_root(cls) -> Address:
def_uds_filename: Path = 'registry@1616.sock'
return UDSAddress(
filedir=None,
filename=def_uds_filename,
# maybe_pid=1616,
)
return UDSAddress('tractor.sock')
def __repr__(self) -> str:
return (
f'{type(self).__name__}'
f'['
f'({self._filedir}, {self._filename})'
f']'
)
return f'{type(self)} @ {self._filepath}'
def __eq__(self, other) -> bool:
if not isinstance(other, UDSAddress):
@ -439,144 +218,92 @@ class UDSAddress(Address):
f'Can not compare {type(other)} with {type(self)}'
)
return self.sockpath == other.sockpath
return self._filepath == other._filepath
# async def open_listener(self, **kwargs) -> SocketListener:
async def open_listener(
self,
**kwargs,
) -> SocketListener:
sock = self._sock = socket.socket(
socket.AF_UNIX,
socket.SOCK_STREAM
)
log.info(
f'Attempting to bind UDS socket\n'
f'>[\n'
f'|_{self}\n'
async def open_stream(self, **kwargs) -> trio.SocketStream:
stream = await trio.open_unix_socket(
self._filepath,
**kwargs
)
return stream
bindpath: Path = self.sockpath
await sock.bind(str(bindpath))
sock.listen(1)
log.info(
f'Listening on UDS socket\n'
f'[>\n'
f' |_{self}\n'
)
return SocketListener(self._sock)
async def open_listener(self, **kwargs) -> trio.SocketListener:
self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
await self._sock.bind(self._filepath)
self._sock.listen(1)
return trio.SocketListener(self._sock)
def close_listener(self):
async def close_listener(self):
self._sock.close()
os.unlink(self.sockpath)
os.unlink(self._filepath)
_address_types: bidict[str, Type[Address]] = {
'tcp': TCPAddress,
'uds': UDSAddress
preferred_transport = 'uds'
_address_types = (
TCPAddress,
UDSAddress
)
_default_addrs: dict[str, Type[Address]] = {
cls.name_key: cls
for cls in _address_types
}
# TODO! really these are discovery sys default addrs ONLY useful for
# when none is provided to a root actor on first boot.
AddressTypes = Union[
tuple([
cls.address_type
for cls in _address_types
])
]
_default_lo_addrs: dict[
str,
UnwrappedAddress
AddressTypes
] = {
'tcp': TCPAddress.get_root().unwrap(),
'uds': UDSAddress.get_root().unwrap(),
cls.name_key: cls.get_root().unwrap()
for cls in _address_types
}
def get_address_cls(name: str) -> Type[Address]:
return _address_types[name]
return _default_addrs[name]
def is_wrapped_addr(addr: any) -> bool:
return type(addr) in _address_types.values()
return type(addr) in _address_types
def mk_uuid() -> str:
'''
Encapsulate creation of a uuid4 as `str` as used
for creating `Actor.uid: tuple[str, str]` and/or
`.msg.types.Aid`.
def wrap_address(addr: AddressTypes) -> Address:
'''
return str(uuid4())
def wrap_address(
addr: UnwrappedAddress
) -> Address:
'''
Wrap an `UnwrappedAddress` as an `Address`-type based
on matching builtin python data-structures which we adhoc
use for each.
XXX NOTE, careful care must be placed to ensure
`UnwrappedAddress` cases are **definitely unique** otherwise the
wrong transport backend may be loaded and will break many
low-level things in our runtime in a not-fun-to-debug way!
XD
'''
if is_wrapped_addr(addr):
return addr
cls: Type|None = None
# if 'sock' in addr[0]:
# import pdbp; pdbp.set_trace()
cls = None
match addr:
# classic network socket-address as tuple/list
case (
(str(), int())
|
[str(), int()]
):
cls = TCPAddress
case (
# (str()|Path(), str()|Path()),
# ^TODO? uhh why doesn't this work!?
(_, filename)
) if type(filename) is str:
case str():
cls = UDSAddress
# likely an unset UDS or TCP reg address as defaulted in
# `_state._runtime_vars['_root_mailbox']`
#
# TODO? figure out when/if we even need this?
case (
None
|
[None, None]
):
cls: Type[Address] = get_address_cls(_def_tpt_proto)
addr: UnwrappedAddress = cls.get_root().unwrap()
case tuple() | list():
cls = TCPAddress
case None:
cls = get_address_cls(preferred_transport)
addr = cls.get_root().unwrap()
case _:
# import pdbp; pdbp.set_trace()
raise TypeError(
f'Can not wrap unwrapped-address ??\n'
f'type(addr): {type(addr)!r}\n'
f'addr: {addr!r}\n'
f'Can not wrap addr {addr} of type {type(addr)}'
)
return cls.from_addr(addr)
def default_lo_addrs(
transports: list[str],
) -> list[Type[Address]]:
'''
Return the default, host-singleton, registry address
for an input transport key set.
'''
def default_lo_addrs(transports: list[str]) -> list[AddressTypes]:
return [
_default_lo_addrs[transport]
for transport in transports

View File

@ -50,8 +50,8 @@ if __name__ == "__main__":
args = parser.parse_args()
subactor = Actor(
name=args.uid[0],
uuid=args.uid[1],
args.uid[0],
uid=args.uid[1],
loglevel=args.loglevel,
spawn_method="trio"
)

View File

@ -366,7 +366,7 @@ class Context:
# f' ---\n'
f' |_ipc: {self.dst_maddr}\n'
# f' dst_maddr{ds}{self.dst_maddr}\n'
f" uid{ds}'{self.chan.aid}'\n"
f" uid{ds}'{self.chan.uid}'\n"
f" cid{ds}'{self.cid}'\n"
# f' ---\n'
f'\n'
@ -945,10 +945,10 @@ class Context:
reminfo: str = (
# ' =>\n'
# f'Context.cancel() => {self.chan.uid}\n'
f'\n'
f'c)=> {self.chan.uid}\n'
f' |_[{self.dst_maddr}\n'
f' >>{self.repr_rpc}\n'
# f'{self.chan.uid}\n'
f' |_ @{self.dst_maddr}\n'
f' >> {self.repr_rpc}\n'
# f' >> {self._nsf}() -> {codec}[dict]:\n\n'
# TODO: pull msg-type from spec re #320
)

View File

@ -31,8 +31,9 @@ from tractor.log import get_logger
from .trionics import gather_contexts
from .ipc import _connect_chan, Channel
from ._addr import (
UnwrappedAddress,
AddressTypes,
Address,
preferred_transport,
wrap_address
)
from ._portal import (
@ -43,7 +44,6 @@ from ._portal import (
from ._state import (
current_actor,
_runtime_vars,
_def_tpt_proto,
)
if TYPE_CHECKING:
@ -54,9 +54,7 @@ log = get_logger(__name__)
@acm
async def get_registry(
addr: UnwrappedAddress|None = None,
) -> AsyncGenerator[
async def get_registry(addr: AddressTypes | None = None) -> AsyncGenerator[
Portal | LocalPortal | None,
None,
]:
@ -73,9 +71,7 @@ async def get_registry(
# (likely a re-entrant call from the arbiter actor)
yield LocalPortal(
actor,
Channel(transport=None)
# ^XXX, we DO NOT actually provide nor connect an
# underlying transport since this is merely an API shim.
await Channel.from_addr(addr)
)
else:
# TODO: try to look pre-existing connection from
@ -139,10 +135,10 @@ def get_peer_by_name(
@acm
async def query_actor(
name: str,
regaddr: UnwrappedAddress|None = None,
regaddr: AddressTypes|None = None,
) -> AsyncGenerator[
UnwrappedAddress|None,
AddressTypes|None,
None,
]:
'''
@ -172,7 +168,7 @@ async def query_actor(
async with get_registry(regaddr) as reg_portal:
# TODO: return portals to all available actors - for now
# just the last one that registered
addr: UnwrappedAddress = await reg_portal.run_from_ns(
addr: AddressTypes = await reg_portal.run_from_ns(
'self',
'find_actor',
name=name,
@ -182,7 +178,7 @@ async def query_actor(
@acm
async def maybe_open_portal(
addr: UnwrappedAddress,
addr: AddressTypes,
name: str,
):
async with query_actor(
@ -202,8 +198,8 @@ async def maybe_open_portal(
@acm
async def find_actor(
name: str,
registry_addrs: list[UnwrappedAddress]|None = None,
enable_transports: list[str] = [_def_tpt_proto],
registry_addrs: list[AddressTypes]|None = None,
enable_transports: list[str] = [preferred_transport],
only_first: bool = True,
raise_on_none: bool = False,
@ -238,7 +234,7 @@ async def find_actor(
)
maybe_portals: list[
AsyncContextManager[UnwrappedAddress]
AsyncContextManager[AddressTypes]
] = list(
maybe_open_portal(
addr=addr,
@ -280,7 +276,7 @@ async def find_actor(
@acm
async def wait_for_actor(
name: str,
registry_addr: UnwrappedAddress | None = None,
registry_addr: AddressTypes | None = None,
) -> AsyncGenerator[Portal, None]:
'''
@ -297,7 +293,7 @@ async def wait_for_actor(
yield peer_portal
return
regaddr: UnwrappedAddress = (
regaddr: AddressTypes = (
registry_addr
or
actor.reg_addrs[0]
@ -314,7 +310,7 @@ async def wait_for_actor(
# get latest registered addr by default?
# TODO: offer multi-portal yields in multi-homed case?
addr: UnwrappedAddress = addrs[-1]
addr: AddressTypes = addrs[-1]
async with _connect_chan(addr) as chan:
async with open_portal(chan) as portal:

View File

@ -37,7 +37,7 @@ from .log import (
from . import _state
from .devx import _debug
from .to_asyncio import run_as_asyncio_guest
from ._addr import UnwrappedAddress
from ._addr import AddressTypes
from ._runtime import (
async_main,
Actor,
@ -53,10 +53,10 @@ log = get_logger(__name__)
def _mp_main(
actor: Actor,
accept_addrs: list[UnwrappedAddress],
accept_addrs: list[AddressTypes],
forkserver_info: tuple[Any, Any, Any, Any, Any],
start_method: SpawnMethodKey,
parent_addr: UnwrappedAddress | None = None,
parent_addr: AddressTypes | None = None,
infect_asyncio: bool = False,
) -> None:
@ -207,7 +207,7 @@ def nest_from_op(
def _trio_main(
actor: Actor,
*,
parent_addr: UnwrappedAddress|None = None,
parent_addr: AddressTypes | None = None,
infect_asyncio: bool = False,
) -> None:

View File

@ -23,6 +23,7 @@ import builtins
import importlib
from pprint import pformat
from pdb import bdb
import sys
from types import (
TracebackType,
)
@ -71,22 +72,8 @@ log = get_logger('tractor')
_this_mod = importlib.import_module(__name__)
class RuntimeFailure(RuntimeError):
'''
General `Actor`-runtime failure due to,
- a bad runtime-env,
- falied spawning (bad input to process),
- API usage.
'''
class ActorFailure(RuntimeFailure):
'''
`Actor` failed to boot before/after spawn
'''
class ActorFailure(Exception):
"General actor failure"
class InternalError(RuntimeError):
@ -139,12 +126,6 @@ class TrioTaskExited(Exception):
'''
class DebugRequestError(RuntimeError):
'''
Failed to request stdio lock from root actor!
'''
# NOTE: more or less should be close to these:
# 'boxed_type',
# 'src_type',
@ -210,8 +191,6 @@ def get_err_type(type_name: str) -> BaseException|None:
):
return type_ref
return None
def pack_from_raise(
local_err: (
@ -542,6 +521,7 @@ class RemoteActorError(Exception):
if val:
_repr += f'{key}={val_str}{end_char}'
return _repr
def reprol(self) -> str:
@ -620,9 +600,56 @@ class RemoteActorError(Exception):
the type name is already implicitly shown by python).
'''
header: str = ''
body: str = ''
message: str = ''
# XXX when the currently raised exception is this instance,
# we do not ever use the "type header" style repr.
is_being_raised: bool = False
if (
(exc := sys.exception())
and
exc is self
):
is_being_raised: bool = True
with_type_header: bool = (
with_type_header
and
not is_being_raised
)
# <RemoteActorError( .. )> style
if with_type_header:
header: str = f'<{type(self).__name__}('
if message := self._message:
# split off the first line so, if needed, it isn't
# indented the same like the "boxed content" which
# since there is no `.tb_str` is just the `.message`.
lines: list[str] = message.splitlines()
first: str = lines[0]
message: str = message.removeprefix(first)
# with a type-style header we,
# - have no special message "first line" extraction/handling
# - place the message a space in from the header:
# `MsgTypeError( <message> ..`
# ^-here
# - indent the `.message` inside the type body.
if with_type_header:
first = f' {first} )>'
message: str = textwrap.indent(
message,
prefix=' '*2,
)
message: str = first + message
# IFF there is an embedded traceback-str we always
# draw the ascii-box around it.
body: str = ''
if tb_str := self.tb_str:
fields: str = self._mk_fields_str(
_body_fields
@ -643,15 +670,21 @@ class RemoteActorError(Exception):
boxer_header=self.relay_uid,
)
# !TODO, it'd be nice to import these top level without
# cycles!
from tractor.devx.pformat import (
pformat_exc,
)
return pformat_exc(
exc=self,
with_type_header=with_type_header,
body=body,
tail = ''
if (
with_type_header
and not message
):
tail: str = '>'
return (
header
+
message
+
f'{body}'
+
tail
)
__repr__ = pformat
@ -929,7 +962,7 @@ class StreamOverrun(
'''
class TransportClosed(Exception):
class TransportClosed(trio.BrokenResourceError):
'''
IPC transport (protocol) connection was closed or broke and
indicates that the wrapping communication `Channel` can no longer
@ -940,21 +973,16 @@ class TransportClosed(Exception):
self,
message: str,
loglevel: str = 'transport',
src_exc: Exception|None = None,
cause: BaseException|None = None,
raise_on_report: bool = False,
) -> None:
self.message: str = message
self._loglevel: str = loglevel
self._loglevel = loglevel
super().__init__(message)
self.src_exc = src_exc
if (
src_exc is not None
and
not self.__cause__
):
self.__cause__ = src_exc
if cause is not None:
self.__cause__ = cause
# flag to toggle whether the msg loop should raise
# the exc in its `TransportClosed` handler block.
@ -981,36 +1009,13 @@ class TransportClosed(Exception):
f' {cause}\n' # exc repr
)
getattr(
log,
self._loglevel
)(message)
getattr(log, self._loglevel)(message)
# some errors we want to blow up from
# inside the RPC msg loop
if self._raise_on_report:
raise self from cause
def pformat(self) -> str:
from tractor.devx.pformat import (
pformat_exc,
)
src_err: Exception|None = self.src_exc or '<unknown>'
src_msg: tuple[str] = src_err.args
src_exc_repr: str = (
f'{type(src_err).__name__}[ {src_msg} ]'
)
return pformat_exc(
exc=self,
# message=self.message, # implicit!
body=(
f' |_src_exc: {src_exc_repr}\n'
),
)
# delegate to `str`-ified pformat
__repr__ = pformat
class NoResult(RuntimeError):
"No final result is expected for this actor"

View File

@ -107,10 +107,6 @@ class Portal:
# point.
self._expect_result_ctx: Context|None = None
self._streams: set[MsgStream] = set()
# TODO, this should be PRIVATE (and never used publicly)! since it's just
# a cached ref to the local runtime instead of calling
# `current_actor()` everywhere.. XD
self.actor: Actor = current_actor()
@property
@ -175,7 +171,7 @@ class Portal:
# not expecting a "main" result
if self._expect_result_ctx is None:
log.warning(
f"Portal for {self.channel.aid} not expecting a final"
f"Portal for {self.channel.uid} not expecting a final"
" result?\nresult() should only be called if subactor"
" was spawned with `ActorNursery.run_in_actor()`")
return NoResult
@ -222,7 +218,7 @@ class Portal:
# IPC calls
if self._streams:
log.cancel(
f"Cancelling all streams with {self.channel.aid}")
f"Cancelling all streams with {self.channel.uid}")
for stream in self._streams.copy():
try:
await stream.aclose()
@ -267,7 +263,7 @@ class Portal:
return False
reminfo: str = (
f'c)=> {self.channel.aid}\n'
f'c)=> {self.channel.uid}\n'
f' |_{chan}\n'
)
log.cancel(
@ -310,7 +306,7 @@ class Portal:
):
log.debug(
'IPC chan for actor already closed or broken?\n\n'
f'{self.channel.aid}\n'
f'{self.channel.uid}\n'
f' |_{self.channel}\n'
)
return False
@ -508,12 +504,8 @@ class LocalPortal:
return it's result.
'''
obj = (
self.actor
if ns == 'self'
else importlib.import_module(ns)
)
func: Callable = getattr(obj, func_name)
obj = self.actor if ns == 'self' else importlib.import_module(ns)
func = getattr(obj, func_name)
return await func(**kwargs)
@ -551,10 +543,8 @@ async def open_portal(
await channel.connect()
was_connected = True
if channel.aid is None:
await channel._do_handshake(
aid=actor.aid,
)
if channel.uid is None:
await actor._do_handshake(channel)
msg_loop_cs: trio.CancelScope|None = None
if start_msg_loop:

View File

@ -18,9 +18,7 @@
Root actor runtime ignition(s).
'''
from contextlib import (
asynccontextmanager as acm,
)
from contextlib import asynccontextmanager as acm
from functools import partial
import importlib
import inspect
@ -28,10 +26,7 @@ import logging
import os
import signal
import sys
from typing import (
Any,
Callable,
)
from typing import Callable
import warnings
@ -52,105 +47,28 @@ from .ipc import (
_connect_chan,
)
from ._addr import (
Address,
UnwrappedAddress,
default_lo_addrs,
mk_uuid,
AddressTypes,
wrap_address,
preferred_transport,
default_lo_addrs
)
from ._exceptions import (
RuntimeFailure,
is_multi_cancelled,
)
from ._exceptions import is_multi_cancelled
logger = log.get_logger('tractor')
# TODO: stick this in a `@acm` defined in `devx._debug`?
# -[ ] also maybe consider making this a `wrapt`-deco to
# save an indent level?
#
@acm
async def maybe_block_bp(
debug_mode: bool,
maybe_enable_greenback: bool,
) -> bool:
# Override the global debugger hook to make it play nice with
# ``trio``, see much discussion in:
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
builtin_bp_handler: Callable = sys.breakpointhook
orig_bp_path: str|None = os.environ.get(
'PYTHONBREAKPOINT',
None,
)
bp_blocked: bool
if (
debug_mode
and maybe_enable_greenback
and (
maybe_mod := await _debug.maybe_init_greenback(
raise_not_found=False,
)
)
):
logger.info(
f'Found `greenback` installed @ {maybe_mod}\n'
'Enabling `tractor.pause_from_sync()` support!\n'
)
os.environ['PYTHONBREAKPOINT'] = (
'tractor.devx._debug._sync_pause_from_builtin'
)
_state._runtime_vars['use_greenback'] = True
bp_blocked = False
else:
# TODO: disable `breakpoint()` by default (without
# `greenback`) since it will break any multi-actor
# usage by a clobbered TTY's stdstreams!
def block_bps(*args, **kwargs):
raise RuntimeError(
'Trying to use `breakpoint()` eh?\n\n'
'Welp, `tractor` blocks `breakpoint()` built-in calls by default!\n'
'If you need to use it please install `greenback` and set '
'`debug_mode=True` when opening the runtime '
'(either via `.open_nursery()` or `open_root_actor()`)\n'
)
sys.breakpointhook = block_bps
# lol ok,
# https://docs.python.org/3/library/sys.html#sys.breakpointhook
os.environ['PYTHONBREAKPOINT'] = "0"
bp_blocked = True
try:
yield bp_blocked
finally:
# restore any prior built-in `breakpoint()` hook state
if builtin_bp_handler is not None:
sys.breakpointhook = builtin_bp_handler
if orig_bp_path is not None:
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
else:
# clear env back to having no entry
os.environ.pop('PYTHONBREAKPOINT', None)
@acm
async def open_root_actor(
*,
# defaults are above
registry_addrs: list[UnwrappedAddress]|None = None,
registry_addrs: list[AddressTypes]|None = None,
# defaults are above
arbiter_addr: tuple[UnwrappedAddress]|None = None,
arbiter_addr: tuple[AddressTypes]|None = None,
enable_transports: list[
_state.TransportProtocolKey,
] = [_state._def_tpt_proto],
enable_transports: list[str] = [preferred_transport],
name: str|None = 'root',
@ -192,323 +110,351 @@ async def open_root_actor(
Runtime init entry point for ``tractor``.
'''
# XXX NEVER allow nested actor-trees!
if already_actor := _state.current_actor(err_on_no_runtime=False):
rtvs: dict[str, Any] = _state._runtime_vars
root_mailbox: list[str, int] = rtvs['_root_mailbox']
registry_addrs: list[list[str, int]] = rtvs['_registry_addrs']
raise RuntimeFailure(
f'A current actor already exists !?\n'
f'({already_actor}\n'
f'\n'
f'You can NOT open a second root actor from within '
f'an existing tree and the current root of this '
f'already exists !!\n'
f'\n'
f'_root_mailbox: {root_mailbox!r}\n'
f'_registry_addrs: {registry_addrs!r}\n'
)
_debug.hide_runtime_frames()
__tracebackhide__: bool = hide_tb
async with maybe_block_bp(
debug_mode=debug_mode,
maybe_enable_greenback=maybe_enable_greenback,
# TODO: stick this in a `@cm` defined in `devx._debug`?
#
# Override the global debugger hook to make it play nice with
# ``trio``, see much discussion in:
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
builtin_bp_handler: Callable = sys.breakpointhook
orig_bp_path: str|None = os.environ.get(
'PYTHONBREAKPOINT',
None,
)
if (
debug_mode
and maybe_enable_greenback
and (
maybe_mod := await _debug.maybe_init_greenback(
raise_not_found=False,
)
)
):
_debug.hide_runtime_frames()
__tracebackhide__: bool = hide_tb
logger.info(
f'Found `greenback` installed @ {maybe_mod}\n'
'Enabling `tractor.pause_from_sync()` support!\n'
)
os.environ['PYTHONBREAKPOINT'] = (
'tractor.devx._debug._sync_pause_from_builtin'
)
_state._runtime_vars['use_greenback'] = True
# attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state.
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
else:
# TODO: disable `breakpoint()` by default (without
# `greenback`) since it will break any multi-actor
# usage by a clobbered TTY's stdstreams!
def block_bps(*args, **kwargs):
raise RuntimeError(
'Trying to use `breakpoint()` eh?\n\n'
'Welp, `tractor` blocks `breakpoint()` built-in calls by default!\n'
'If you need to use it please install `greenback` and set '
'`debug_mode=True` when opening the runtime '
'(either via `.open_nursery()` or `open_root_actor()`)\n'
)
# mark top most level process as root actor
_state._runtime_vars['_is_root'] = True
sys.breakpointhook = block_bps
# lol ok,
# https://docs.python.org/3/library/sys.html#sys.breakpointhook
os.environ['PYTHONBREAKPOINT'] = "0"
# caps based rpc list
enable_modules = (
enable_modules
or
[]
# attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state.
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
# mark top most level process as root actor
_state._runtime_vars['_is_root'] = True
# caps based rpc list
enable_modules = (
enable_modules
or
[]
)
if rpc_module_paths:
warnings.warn(
"`rpc_module_paths` is now deprecated, use "
" `enable_modules` instead.",
DeprecationWarning,
stacklevel=2,
)
enable_modules.extend(rpc_module_paths)
if start_method is not None:
_spawn.try_set_start_method(start_method)
if arbiter_addr is not None:
warnings.warn(
'`arbiter_addr` is now deprecated\n'
'Use `registry_addrs: list[tuple]` instead..',
DeprecationWarning,
stacklevel=2,
)
registry_addrs = [arbiter_addr]
if not registry_addrs:
registry_addrs: list[AddressTypes] = default_lo_addrs(enable_transports)
assert registry_addrs
loglevel = (
loglevel
or log._default_loglevel
).upper()
if (
debug_mode
and _spawn._spawn_method == 'trio'
):
_state._runtime_vars['_debug_mode'] = True
# expose internal debug module to every actor allowing for
# use of ``await tractor.pause()``
enable_modules.append('tractor.devx._debug')
# if debug mode get's enabled *at least* use that level of
# logging for some informative console prompts.
if (
logging.getLevelName(
# lul, need the upper case for the -> int map?
# sweet "dynamic function behaviour" stdlib...
loglevel,
) > logging.getLevelName('PDB')
):
loglevel = 'PDB'
elif debug_mode:
raise RuntimeError(
"Debug mode is only supported for the `trio` backend!"
)
if rpc_module_paths:
warnings.warn(
"`rpc_module_paths` is now deprecated, use "
" `enable_modules` instead.",
DeprecationWarning,
stacklevel=2,
)
enable_modules.extend(rpc_module_paths)
assert loglevel
_log = log.get_console_log(loglevel)
assert _log
if start_method is not None:
_spawn.try_set_start_method(start_method)
# TODO: factor this into `.devx._stackscope`!!
if (
debug_mode
and
enable_stack_on_sig
):
from .devx._stackscope import enable_stack_on_sig
enable_stack_on_sig()
# TODO! remove this ASAP!
if arbiter_addr is not None:
warnings.warn(
'`arbiter_addr` is now deprecated\n'
'Use `registry_addrs: list[tuple]` instead..',
DeprecationWarning,
stacklevel=2,
)
registry_addrs = [arbiter_addr]
# closed into below ping task-func
ponged_addrs: list[AddressTypes] = []
if not registry_addrs:
registry_addrs: list[UnwrappedAddress] = default_lo_addrs(
enable_transports
async def ping_tpt_socket(
addr: AddressTypes,
timeout: float = 1,
) -> None:
'''
Attempt temporary connection to see if a registry is
listening at the requested address by a tranport layer
ping.
If a connection can't be made quickly we assume none no
server is listening at that addr.
'''
try:
# TODO: this connect-and-bail forces us to have to
# carefully rewrap TCP 104-connection-reset errors as
# EOF so as to avoid propagating cancel-causing errors
# to the channel-msg loop machinery. Likely it would
# be better to eventually have a "discovery" protocol
# with basic handshake instead?
with trio.move_on_after(timeout):
async with _connect_chan(addr):
ponged_addrs.append(addr)
except OSError:
# TODO: make this a "discovery" log level?
logger.info(
f'No actor registry found @ {addr}\n'
)
assert registry_addrs
async with trio.open_nursery() as tn:
for addr in registry_addrs:
tn.start_soon(
ping_tpt_socket,
addr,
)
loglevel = (
loglevel
or log._default_loglevel
).upper()
trans_bind_addrs: list[AddressTypes] = []
if (
debug_mode
and _spawn._spawn_method == 'trio'
):
_state._runtime_vars['_debug_mode'] = True
# expose internal debug module to every actor allowing for
# use of ``await tractor.pause()``
enable_modules.append('tractor.devx._debug')
# if debug mode get's enabled *at least* use that level of
# logging for some informative console prompts.
if (
logging.getLevelName(
# lul, need the upper case for the -> int map?
# sweet "dynamic function behaviour" stdlib...
loglevel,
) > logging.getLevelName('PDB')
):
loglevel = 'PDB'
elif debug_mode:
# Create a new local root-actor instance which IS NOT THE
# REGISTRAR
if ponged_addrs:
if ensure_registry:
raise RuntimeError(
"Debug mode is only supported for the `trio` backend!"
f'Failed to open `{name}`@{ponged_addrs}: '
'registry socket(s) already bound'
)
assert loglevel
_log = log.get_console_log(loglevel)
assert _log
# we were able to connect to an arbiter
logger.info(
f'Registry(s) seem(s) to exist @ {ponged_addrs}'
)
# TODO: factor this into `.devx._stackscope`!!
actor = Actor(
name=name or 'anonymous',
registry_addrs=ponged_addrs,
loglevel=loglevel,
enable_modules=enable_modules,
)
# DO NOT use the registry_addrs as the transport server
# addrs for this new non-registar, root-actor.
for addr in ponged_addrs:
waddr = wrap_address(addr)
print(waddr)
trans_bind_addrs.append(
waddr.get_random(namespace=waddr.namespace)
)
# Start this local actor as the "registrar", aka a regular
# actor who manages the local registry of "mailboxes" of
# other process-tree-local sub-actors.
else:
# NOTE that if the current actor IS THE REGISTAR, the
# following init steps are taken:
# - the tranport layer server is bound to each addr
# pair defined in provided registry_addrs, or the default.
trans_bind_addrs = registry_addrs
# - it is normally desirable for any registrar to stay up
# indefinitely until either all registered (child/sub)
# actors are terminated (via SC supervision) or,
# a re-election process has taken place.
# NOTE: all of ^ which is not implemented yet - see:
# https://github.com/goodboy/tractor/issues/216
# https://github.com/goodboy/tractor/pull/348
# https://github.com/goodboy/tractor/issues/296
actor = Arbiter(
name or 'registrar',
registry_addrs=registry_addrs,
loglevel=loglevel,
enable_modules=enable_modules,
)
# XXX, in case the root actor runtime was actually run from
# `tractor.to_asyncio.run_as_asyncio_guest()` and NOt
# `.trio.run()`.
actor._infected_aio = _state._runtime_vars['_is_infected_aio']
# Start up main task set via core actor-runtime nurseries.
try:
# assign process-local actor
_state._current_actor = actor
# start local channel-server and fake the portal API
# NOTE: this won't block since we provide the nursery
ml_addrs_str: str = '\n'.join(
f'@{addr}' for addr in trans_bind_addrs
)
logger.info(
f'Starting local {actor.uid} on the following transport addrs:\n'
f'{ml_addrs_str}'
)
# start the actor runtime in a new task
async with trio.open_nursery(
strict_exception_groups=False,
# ^XXX^ TODO? instead unpack any RAE as per "loose" style?
) as nursery:
# ``_runtime.async_main()`` creates an internal nursery
# and blocks here until any underlying actor(-process)
# tree has terminated thereby conducting so called
# "end-to-end" structured concurrency throughout an
# entire hierarchical python sub-process set; all
# "actor runtime" primitives are SC-compat and thus all
# transitively spawned actors/processes must be as
# well.
await nursery.start(
partial(
async_main,
actor,
accept_addrs=trans_bind_addrs,
parent_addr=None
)
)
try:
yield actor
except (
Exception,
BaseExceptionGroup,
) as err:
# TODO, in beginning to handle the subsubactor with
# crashed grandparent cases..
#
# was_locked: bool = await _debug.maybe_wait_for_debugger(
# child_in_debug=True,
# )
# XXX NOTE XXX see equiv note inside
# `._runtime.Actor._stream_handler()` where in the
# non-root or root-that-opened-this-mahually case we
# wait for the local actor-nursery to exit before
# exiting the transport channel handler.
entered: bool = await _debug._maybe_enter_pm(
err,
api_frame=inspect.currentframe(),
debug_filter=debug_filter,
)
if (
not entered
and
not is_multi_cancelled(
err,
)
):
logger.exception('Root actor crashed\n')
# ALWAYS re-raise any error bubbled up from the
# runtime!
raise
finally:
# NOTE: not sure if we'll ever need this but it's
# possibly better for even more determinism?
# logger.cancel(
# f'Waiting on {len(nurseries)} nurseries in root..')
# nurseries = actor._actoruid2nursery.values()
# async with trio.open_nursery() as tempn:
# for an in nurseries:
# tempn.start_soon(an.exited.wait)
logger.info(
'Closing down root actor'
)
await actor.cancel(None) # self cancel
finally:
_state._current_actor = None
_state._last_actor_terminated = actor
# restore built-in `breakpoint()` hook state
if (
debug_mode
and
enable_stack_on_sig
maybe_enable_greenback
):
from .devx._stackscope import enable_stack_on_sig
enable_stack_on_sig()
if builtin_bp_handler is not None:
sys.breakpointhook = builtin_bp_handler
# closed into below ping task-func
ponged_addrs: list[UnwrappedAddress] = []
if orig_bp_path is not None:
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
async def ping_tpt_socket(
addr: UnwrappedAddress,
timeout: float = 1,
) -> None:
'''
Attempt temporary connection to see if a registry is
listening at the requested address by a tranport layer
ping.
else:
# clear env back to having no entry
os.environ.pop('PYTHONBREAKPOINT', None)
If a connection can't be made quickly we assume none no
server is listening at that addr.
'''
try:
# TODO: this connect-and-bail forces us to have to
# carefully rewrap TCP 104-connection-reset errors as
# EOF so as to avoid propagating cancel-causing errors
# to the channel-msg loop machinery. Likely it would
# be better to eventually have a "discovery" protocol
# with basic handshake instead?
with trio.move_on_after(timeout):
async with _connect_chan(addr):
ponged_addrs.append(addr)
except OSError:
# TODO: make this a "discovery" log level?
logger.info(
f'No actor registry found @ {addr}\n'
)
async with trio.open_nursery() as tn:
for addr in registry_addrs:
tn.start_soon(
ping_tpt_socket,
addr,
)
trans_bind_addrs: list[UnwrappedAddress] = []
# Create a new local root-actor instance which IS NOT THE
# REGISTRAR
if ponged_addrs:
if ensure_registry:
raise RuntimeError(
f'Failed to open `{name}`@{ponged_addrs}: '
'registry socket(s) already bound'
)
# we were able to connect to an arbiter
logger.info(
f'Registry(s) seem(s) to exist @ {ponged_addrs}'
)
actor = Actor(
name=name or 'anonymous',
uuid=mk_uuid(),
registry_addrs=ponged_addrs,
loglevel=loglevel,
enable_modules=enable_modules,
)
# DO NOT use the registry_addrs as the transport server
# addrs for this new non-registar, root-actor.
for addr in ponged_addrs:
waddr: Address = wrap_address(addr)
trans_bind_addrs.append(
waddr.get_random(bindspace=waddr.bindspace)
)
# Start this local actor as the "registrar", aka a regular
# actor who manages the local registry of "mailboxes" of
# other process-tree-local sub-actors.
else:
# NOTE that if the current actor IS THE REGISTAR, the
# following init steps are taken:
# - the tranport layer server is bound to each addr
# pair defined in provided registry_addrs, or the default.
trans_bind_addrs = registry_addrs
# - it is normally desirable for any registrar to stay up
# indefinitely until either all registered (child/sub)
# actors are terminated (via SC supervision) or,
# a re-election process has taken place.
# NOTE: all of ^ which is not implemented yet - see:
# https://github.com/goodboy/tractor/issues/216
# https://github.com/goodboy/tractor/pull/348
# https://github.com/goodboy/tractor/issues/296
actor = Arbiter(
name=name or 'registrar',
uuid=mk_uuid(),
registry_addrs=registry_addrs,
loglevel=loglevel,
enable_modules=enable_modules,
)
# XXX, in case the root actor runtime was actually run from
# `tractor.to_asyncio.run_as_asyncio_guest()` and NOt
# `.trio.run()`.
actor._infected_aio = _state._runtime_vars['_is_infected_aio']
# Start up main task set via core actor-runtime nurseries.
try:
# assign process-local actor
_state._current_actor = actor
# start local channel-server and fake the portal API
# NOTE: this won't block since we provide the nursery
ml_addrs_str: str = '\n'.join(
f'@{addr}' for addr in trans_bind_addrs
)
logger.info(
f'Starting local {actor.uid} on the following transport addrs:\n'
f'{ml_addrs_str}'
)
# start the actor runtime in a new task
async with trio.open_nursery(
strict_exception_groups=False,
# ^XXX^ TODO? instead unpack any RAE as per "loose" style?
) as nursery:
# ``_runtime.async_main()`` creates an internal nursery
# and blocks here until any underlying actor(-process)
# tree has terminated thereby conducting so called
# "end-to-end" structured concurrency throughout an
# entire hierarchical python sub-process set; all
# "actor runtime" primitives are SC-compat and thus all
# transitively spawned actors/processes must be as
# well.
await nursery.start(
partial(
async_main,
actor,
accept_addrs=trans_bind_addrs,
parent_addr=None
)
)
try:
yield actor
except (
Exception,
BaseExceptionGroup,
) as err:
# TODO, in beginning to handle the subsubactor with
# crashed grandparent cases..
#
# was_locked: bool = await _debug.maybe_wait_for_debugger(
# child_in_debug=True,
# )
# XXX NOTE XXX see equiv note inside
# `._runtime.Actor._stream_handler()` where in the
# non-root or root-that-opened-this-mahually case we
# wait for the local actor-nursery to exit before
# exiting the transport channel handler.
entered: bool = await _debug._maybe_enter_pm(
err,
api_frame=inspect.currentframe(),
debug_filter=debug_filter,
)
if (
not entered
and
not is_multi_cancelled(
err,
)
):
logger.exception(
'Root actor crashed\n'
f'>x)\n'
f' |_{actor}\n'
)
# ALWAYS re-raise any error bubbled up from the
# runtime!
raise
finally:
# NOTE: not sure if we'll ever need this but it's
# possibly better for even more determinism?
# logger.cancel(
# f'Waiting on {len(nurseries)} nurseries in root..')
# nurseries = actor._actoruid2nursery.values()
# async with trio.open_nursery() as tempn:
# for an in nurseries:
# tempn.start_soon(an.exited.wait)
logger.info(
f'Closing down root actor\n'
f'>)\n'
f'|_{actor}\n'
)
await actor.cancel(None) # self cancel
finally:
_state._current_actor = None
_state._last_actor_terminated = actor
logger.runtime(
f'Root actor terminated\n'
f')>\n'
f' |_{actor}\n'
)
logger.runtime("Root actor terminated")
def run_daemon(
@ -516,7 +462,7 @@ def run_daemon(
# runtime kwargs
name: str | None = 'root',
registry_addrs: list[UnwrappedAddress]|None = None,
registry_addrs: list[AddressTypes]|None = None,
start_method: str | None = None,
debug_mode: bool = False,

View File

@ -1156,7 +1156,7 @@ async def process_messages(
trio.Event(),
)
# XXX RUNTIME-SCOPED! remote (likely internal) error
# runtime-scoped remote (internal) error
# (^- bc no `Error.cid` -^)
#
# NOTE: this is the non-rpc error case, that
@ -1219,10 +1219,8 @@ async def process_messages(
# -[ ] figure out how this will break with other transports?
tc.report_n_maybe_raise(
message=(
f'peer IPC channel closed abruptly?\n'
f'\n'
f'<=x[\n'
f' {chan}\n'
f'peer IPC channel closed abruptly?\n\n'
f'<=x {chan}\n'
f' |_{chan.raddr}\n\n'
)
+

View File

@ -52,7 +52,6 @@ import sys
from typing import (
Any,
Callable,
Type,
TYPE_CHECKING,
)
import uuid
@ -76,11 +75,11 @@ from tractor.msg import (
)
from .ipc import Channel
from ._addr import (
UnwrappedAddress,
AddressTypes,
Address,
default_lo_addrs,
get_address_cls,
wrap_address,
preferred_transport,
default_lo_addrs
)
from ._context import (
mk_context,
@ -183,15 +182,15 @@ class Actor:
def __init__(
self,
name: str,
uuid: str,
*,
enable_modules: list[str] = [],
uid: str|None = None,
loglevel: str|None = None,
registry_addrs: list[UnwrappedAddress]|None = None,
registry_addrs: list[AddressTypes]|None = None,
spawn_method: str|None = None,
# TODO: remove!
arbiter_addr: UnwrappedAddress|None = None,
arbiter_addr: AddressTypes|None = None,
) -> None:
'''
@ -199,14 +198,12 @@ class Actor:
phase (aka before a new process is executed).
'''
self._aid = msgtypes.Aid(
name=name,
uuid=uuid,
pid=os.getpid(),
self.name = name
self.uid = (
name,
uid or str(uuid.uuid4())
)
self._task: trio.Task|None = None
# state
self._cancel_complete = trio.Event()
self._cancel_called_by_remote: tuple[str, tuple]|None = None
self._cancel_called: bool = False
@ -233,7 +230,7 @@ class Actor:
DeprecationWarning,
stacklevel=2,
)
registry_addrs: list[UnwrappedAddress] = [arbiter_addr]
registry_addrs: list[AddressTypes] = [arbiter_addr]
# marked by the process spawning backend at startup
# will be None for the parent most process started manually
@ -280,97 +277,13 @@ class Actor:
# when provided, init the registry addresses property from
# input via the validator.
self._reg_addrs: list[UnwrappedAddress] = []
self._reg_addrs: list[AddressTypes] = []
if registry_addrs:
self.reg_addrs: list[UnwrappedAddress] = registry_addrs
self.reg_addrs: list[AddressTypes] = registry_addrs
_state._runtime_vars['_registry_addrs'] = registry_addrs
@property
def aid(self) -> msgtypes.Aid:
'''
This process-singleton-actor's "unique actor ID" in struct form.
See the `tractor.msg.Aid` struct for details.
'''
return self._aid
@property
def name(self) -> str:
return self._aid.name
@property
def uid(self) -> tuple[str, str]:
'''
This process-singleton's "unique (cross-host) ID".
Delivered from the `.Aid.name/.uuid` fields as a `tuple` pair
and should be multi-host unique despite a large distributed
process plane.
'''
msg: str = (
f'`{type(self).__name__}.uid` is now deprecated.\n'
'Use the new `.aid: tractor.msg.Aid` (struct) instead '
'which also provides additional named (optional) fields '
'beyond just the `.name` and `.uuid`.'
)
warnings.warn(
msg,
DeprecationWarning,
stacklevel=2,
)
return (
self._aid.name,
self._aid.uuid,
)
@property
def pid(self) -> int:
return self._aid.pid
def pformat(self) -> str:
ds: str = '='
parent_uid: tuple|None = None
if rent_chan := self._parent_chan:
parent_uid = rent_chan.uid
peers: list[tuple] = list(self._peer_connected)
listen_addrs: str = pformat(self._listen_addrs)
fmtstr: str = (
f' |_id: {self.aid!r}\n'
# f" aid{ds}{self.aid!r}\n"
f" parent{ds}{parent_uid}\n"
f'\n'
f' |_ipc: {len(peers)!r} connected peers\n'
f" peers{ds}{peers!r}\n"
f" _listen_addrs{ds}'{listen_addrs}'\n"
f" _listeners{ds}'{self._listeners}'\n"
f'\n'
f' |_rpc: {len(self._rpc_tasks)} tasks\n'
f" ctxs{ds}{len(self._contexts)}\n"
f'\n'
f' |_runtime: ._task{ds}{self._task!r}\n'
f' _spawn_method{ds}{self._spawn_method}\n'
f' _actoruid2nursery{ds}{self._actoruid2nursery}\n'
f' _forkserver_info{ds}{self._forkserver_info}\n'
f'\n'
f' |_state: "TODO: .repr_state()"\n'
f' _cancel_complete{ds}{self._cancel_complete}\n'
f' _cancel_called_by_remote{ds}{self._cancel_called_by_remote}\n'
f' _cancel_called{ds}{self._cancel_called}\n'
)
return (
'<Actor(\n'
+
fmtstr
+
')>\n'
)
__repr__ = pformat
@property
def reg_addrs(self) -> list[UnwrappedAddress]:
def reg_addrs(self) -> list[AddressTypes]:
'''
List of (socket) addresses for all known (and contactable)
registry actors.
@ -381,7 +294,7 @@ class Actor:
@reg_addrs.setter
def reg_addrs(
self,
addrs: list[UnwrappedAddress],
addrs: list[AddressTypes],
) -> None:
if not addrs:
log.warning(
@ -507,23 +420,14 @@ class Actor:
# send/receive initial handshake response
try:
peer_aid: msgtypes.Aid = await chan._do_handshake(
aid=self.aid,
)
uid: tuple|None = await self._do_handshake(chan)
except (
TransportClosed,
# ^XXX NOTE, the above wraps `trio` exc types raised
# during various `SocketStream.send/receive_xx()` calls
# under different fault conditions such as,
#
# we need this for ``msgspec`` for some reason?
# for now, it's been put in the stream backend.
# trio.BrokenResourceError,
# trio.ClosedResourceError,
#
# Inside our `.ipc._transport` layer we absorb and
# re-raise our own `TransportClosed` exc such that this
# higher level runtime code can only worry one
# "kinda-error" that we expect to tolerate during
# discovery-sys related pings, queires, DoS etc.
TransportClosed,
):
# XXX: This may propagate up from `Channel._aiter_recv()`
# and `MsgpackStream._inter_packets()` on a read from the
@ -538,12 +442,6 @@ class Actor:
)
return
uid: tuple[str, str] = (
peer_aid.name,
peer_aid.uuid,
)
# TODO, can we make this downstream peer tracking use the
# `peer_aid` instead?
familiar: str = 'new-peer'
if _pre_chan := self._peers.get(uid):
familiar: str = 'pre-existing-peer'
@ -1125,12 +1023,11 @@ class Actor:
async def _from_parent(
self,
parent_addr: UnwrappedAddress|None,
parent_addr: AddressTypes|None,
) -> tuple[
Channel,
list[UnwrappedAddress]|None,
list[str]|None, # preferred tpts
list[AddressTypes]|None,
]:
'''
Bootstrap this local actor's runtime config from its parent by
@ -1142,57 +1039,30 @@ class Actor:
# Connect back to the parent actor and conduct initial
# handshake. From this point on if we error, we
# attempt to ship the exception back to the parent.
chan = await Channel.from_addr(
addr=wrap_address(parent_addr)
)
assert isinstance(chan, Channel)
chan = await Channel.from_addr(wrap_address(parent_addr))
# TODO: move this into a `Channel.handshake()`?
# Initial handshake: swap names.
await chan._do_handshake(aid=self.aid)
await self._do_handshake(chan)
accept_addrs: list[UnwrappedAddress]|None = None
accept_addrs: list[AddressTypes]|None = None
if self._spawn_method == "trio":
# Receive post-spawn runtime state from our parent.
spawnspec: msgtypes.SpawnSpec = await chan.recv()
match spawnspec:
case MsgTypeError():
raise spawnspec
case msgtypes.SpawnSpec():
self._spawn_spec = spawnspec
log.runtime(
'Received runtime spec from parent:\n\n'
self._spawn_spec = spawnspec
# TODO: eventually all these msgs as
# `msgspec.Struct` with a special mode that
# pformats them in multi-line mode, BUT only
# if "trace"/"util" mode is enabled?
f'{pretty_struct.pformat(spawnspec)}\n'
)
log.runtime(
'Received runtime spec from parent:\n\n'
case _:
raise InternalError(
f'Received invalid non-`SpawnSpec` payload !?\n'
f'{spawnspec}\n'
)
# ^^TODO XXX!! when the `SpawnSpec` fails to decode
# the above will raise a `MsgTypeError` which if we
# do NOT ALSO RAISE it will tried to be pprinted in
# the log.runtime() below..
#
# SO we gotta look at how other `chan.recv()` calls
# are wrapped and do the same for this spec receive!
# -[ ] see `._rpc` likely has the answer?
#
# XXX NOTE, can't be called here in subactor
# bc we haven't yet received the
# `SpawnSpec._runtime_vars: dict` which would
# declare whether `debug_mode` is set!
# breakpoint()
# import pdbp; pdbp.set_trace()
accept_addrs: list[UnwrappedAddress] = spawnspec.bind_addrs
# TODO: eventually all these msgs as
# `msgspec.Struct` with a special mode that
# pformats them in multi-line mode, BUT only
# if "trace"/"util" mode is enabled?
f'{pretty_struct.pformat(spawnspec)}\n'
)
accept_addrs: list[AddressTypes] = spawnspec.bind_addrs
# TODO: another `Struct` for rtvs..
rvs: dict[str, Any] = spawnspec._runtime_vars
@ -1284,21 +1154,13 @@ class Actor:
return (
chan,
accept_addrs,
None,
# ^TODO, preferred tpts list from rent!
# -[ ] need to extend the `SpawnSpec` tho!
)
# failed to connect back?
except (
OSError,
ConnectionError,
):
except OSError: # failed to connect
log.warning(
f'Failed to connect to spawning parent actor!?\n'
f'\n'
f'x=> {parent_addr}\n'
f' |_{self}\n\n'
f'|_{self}\n\n'
)
await self.cancel(req_chan=None) # self cancel
raise
@ -1307,13 +1169,12 @@ class Actor:
self,
handler_nursery: Nursery,
*,
listen_addrs: list[UnwrappedAddress]|None = None,
listen_addrs: list[AddressTypes]|None = None,
task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Start the IPC transport server, begin listening/accepting new
`trio.SocketStream` connections.
Start the IPC transport server, begin listening for new connections.
This will cause an actor to continue living (and thus
blocking at the process/OS-thread level) until
@ -1321,9 +1182,7 @@ class Actor:
'''
if listen_addrs is None:
listen_addrs = default_lo_addrs([
_state._def_tpt_proto
])
listen_addrs = default_lo_addrs([preferred_transport])
else:
listen_addrs: list[Address] = [
@ -1333,24 +1192,10 @@ class Actor:
self._server_down = trio.Event()
try:
async with trio.open_nursery() as server_n:
listeners: list[trio.abc.Listener] = []
for addr in listen_addrs:
try:
listener: trio.abc.Listener = await addr.open_listener()
except OSError as oserr:
if (
'[Errno 98] Address already in use'
in
oserr.args#[0]
):
log.exception(
f'Address already in use?\n'
f'{addr}\n'
)
raise
listeners.append(listener)
listeners: list[trio.abc.Listener] = [
await addr.open_listener()
for addr in listen_addrs
]
await server_n.start(
partial(
trio.serve_listeners,
@ -1363,10 +1208,8 @@ class Actor:
handler_nursery=handler_nursery
)
)
# TODO, wow make this message better! XD
log.info(
log.runtime(
'Started server(s)\n'
+
'\n'.join([f'|_{addr}' for addr in listen_addrs])
)
self._listen_addrs.extend(listen_addrs)
@ -1375,10 +1218,8 @@ class Actor:
task_status.started(server_n)
finally:
addr: Address
for addr in listen_addrs:
addr.close_listener()
await addr.close_listener()
# signal the server is down since nursery above terminated
self._server_down.set()
@ -1485,13 +1326,8 @@ class Actor:
if self._server_down is not None:
await self._server_down.wait()
else:
tpt_protos: list[str] = []
addr: Address
for addr in self._listen_addrs:
tpt_protos.append(addr.proto_key)
log.warning(
'Transport server(s) may have been cancelled before started?\n'
f'protos: {tpt_protos!r}\n'
'Transport[TCP] server was cancelled start?'
)
# cancel all rpc tasks permanently
@ -1742,7 +1578,7 @@ class Actor:
return False
@property
def accept_addrs(self) -> list[UnwrappedAddress]:
def accept_addrs(self) -> list[AddressTypes]:
'''
All addresses to which the transport-channel server binds
and listens for new connections.
@ -1751,7 +1587,7 @@ class Actor:
return [a.unwrap() for a in self._listen_addrs]
@property
def accept_addr(self) -> UnwrappedAddress:
def accept_addr(self) -> AddressTypes:
'''
Primary address to which the IPC transport server is
bound and listening for new connections.
@ -1778,6 +1614,43 @@ class Actor:
'''
return self._peers[uid]
# TODO: move to `Channel.handshake(uid)`
async def _do_handshake(
self,
chan: Channel
) -> msgtypes.Aid:
'''
Exchange `(name, UUIDs)` identifiers as the first
communication step with any (peer) remote `Actor`.
These are essentially the "mailbox addresses" found in
"actor model" parlance.
'''
name, uuid = self.uid
await chan.send(
msgtypes.Aid(
name=name,
uuid=uuid,
)
)
aid: msgtypes.Aid = await chan.recv()
chan.aid = aid
uid: tuple[str, str] = (
# str(value[0]),
# str(value[1])
aid.name,
aid.uuid,
)
if not isinstance(uid, tuple):
raise ValueError(f"{uid} is not a valid uid?!")
chan.uid = uid
return uid
def is_infected_aio(self) -> bool:
'''
If `True`, this actor is running `trio` in guest mode on
@ -1791,7 +1664,7 @@ class Actor:
async def async_main(
actor: Actor,
accept_addrs: UnwrappedAddress|None = None,
accept_addrs: AddressTypes|None = None,
# XXX: currently ``parent_addr`` is only needed for the
# ``multiprocessing`` backend (which pickles state sent to
@ -1800,7 +1673,7 @@ async def async_main(
# change this to a simple ``is_subactor: bool`` which will
# be False when running as root actor and True when as
# a subactor.
parent_addr: UnwrappedAddress|None = None,
parent_addr: AddressTypes|None = None,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
@ -1815,8 +1688,6 @@ async def async_main(
the actor's "runtime" and all thus all ongoing RPC tasks.
'''
actor._task: trio.Task = trio.lowlevel.current_task()
# attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger state.
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
@ -1826,15 +1697,13 @@ async def async_main(
# establish primary connection with immediate parent
actor._parent_chan: Channel|None = None
if parent_addr is not None:
(
actor._parent_chan,
set_accept_addr_says_rent,
maybe_preferred_transports_says_rent,
) = await actor._from_parent(parent_addr)
accept_addrs: list[UnwrappedAddress] = []
# either it's passed in because we're not a child or
# because we're running in mp mode
if (
@ -1843,18 +1712,6 @@ async def async_main(
set_accept_addr_says_rent is not None
):
accept_addrs = set_accept_addr_says_rent
else:
enable_transports: list[str] = (
maybe_preferred_transports_says_rent
or
[_state._def_tpt_proto]
)
for transport_key in enable_transports:
transport_cls: Type[Address] = get_address_cls(
transport_key
)
addr: Address = transport_cls.get_random()
accept_addrs.append(addr.unwrap())
# The "root" nursery ensures the channel with the immediate
# parent is kept alive as a resilient service until
@ -1922,7 +1779,7 @@ async def async_main(
raise
accept_addrs: list[UnwrappedAddress] = actor.accept_addrs
accept_addrs: list[AddressTypes] = actor.accept_addrs
# NOTE: only set the loopback addr for the
# process-tree-global "root" mailbox since
@ -2135,15 +1992,15 @@ async def async_main(
log.info(teardown_report)
# TODO: rename to `Registry` and move to `.discovery._registry`!
# TODO: rename to `Registry` and move to `._discovery`!
class Arbiter(Actor):
'''
A special registrar (and for now..) `Actor` who can contact all
other actors within its immediate process tree and possibly keeps
a registry of others meant to be discoverable in a distributed
application. Normally the registrar is also the "root actor" and
thus always has access to the top-most-level actor (process)
nursery.
A special registrar actor who can contact all other actors
within its immediate process tree and possibly keeps a registry
of others meant to be discoverable in a distributed
application. Normally the registrar is also the "root actor"
and thus always has access to the top-most-level actor
(process) nursery.
By default, the registrar is always initialized when and if no
other registrar socket addrs have been specified to runtime
@ -2163,12 +2020,6 @@ class Arbiter(Actor):
'''
is_arbiter = True
# TODO, implement this as a read on there existing a `._state` of
# some sort setup by whenever we impl this all as
# a `.discovery._registry.open_registry()` API
def is_registry(self) -> bool:
return self.is_arbiter
def __init__(
self,
*args,
@ -2177,7 +2028,7 @@ class Arbiter(Actor):
self._registry: dict[
tuple[str, str],
UnwrappedAddress,
AddressTypes,
] = {}
self._waiters: dict[
str,
@ -2193,7 +2044,7 @@ class Arbiter(Actor):
self,
name: str,
) -> UnwrappedAddress|None:
) -> AddressTypes|None:
for uid, addr in self._registry.items():
if name in uid:
@ -2204,7 +2055,7 @@ class Arbiter(Actor):
async def get_registry(
self
) -> dict[str, UnwrappedAddress]:
) -> dict[str, AddressTypes]:
'''
Return current name registry.
@ -2224,7 +2075,7 @@ class Arbiter(Actor):
self,
name: str,
) -> list[UnwrappedAddress]:
) -> list[AddressTypes]:
'''
Wait for a particular actor to register.
@ -2232,8 +2083,8 @@ class Arbiter(Actor):
registered.
'''
addrs: list[UnwrappedAddress] = []
addr: UnwrappedAddress
addrs: list[AddressTypes] = []
addr: AddressTypes
mailbox_info: str = 'Actor registry contact infos:\n'
for uid, addr in self._registry.items():
@ -2259,7 +2110,7 @@ class Arbiter(Actor):
async def register_actor(
self,
uid: tuple[str, str],
addr: UnwrappedAddress
addr: AddressTypes
) -> None:
uid = name, hash = (str(uid[0]), str(uid[1]))
waddr: Address = wrap_address(addr)

View File

@ -46,13 +46,12 @@ from tractor._state import (
_runtime_vars,
)
from tractor.log import get_logger
from tractor._addr import UnwrappedAddress
from tractor._addr import AddressTypes
from tractor._portal import Portal
from tractor._runtime import Actor
from tractor._entry import _mp_main
from tractor._exceptions import ActorFailure
from tractor.msg.types import (
Aid,
SpawnSpec,
)
@ -165,7 +164,7 @@ async def exhaust_portal(
# TODO: merge with above?
log.warning(
'Cancelled portal result waiter task:\n'
f'uid: {portal.channel.aid}\n'
f'uid: {portal.channel.uid}\n'
f'error: {err}\n'
)
return err
@ -173,7 +172,7 @@ async def exhaust_portal(
else:
log.debug(
f'Returning final result from portal:\n'
f'uid: {portal.channel.aid}\n'
f'uid: {portal.channel.uid}\n'
f'result: {final}\n'
)
return final
@ -326,12 +325,12 @@ async def soft_kill(
see `.hard_kill()`).
'''
peer_aid: Aid = portal.channel.aid
uid: tuple[str, str] = portal.channel.uid
try:
log.cancel(
f'Soft killing sub-actor via portal request\n'
f'\n'
f'(c=> {peer_aid}\n'
f'(c=> {portal.chan.uid}\n'
f' |_{proc}\n'
)
# wait on sub-proc to signal termination
@ -380,7 +379,7 @@ async def soft_kill(
if proc.poll() is None: # type: ignore
log.warning(
'Subactor still alive after cancel request?\n\n'
f'uid: {peer_aid}\n'
f'uid: {uid}\n'
f'|_{proc}\n'
)
n.cancel_scope.cancel()
@ -394,8 +393,8 @@ async def new_proc(
errors: dict[tuple[str, str], Exception],
# passed through to actor main
bind_addrs: list[UnwrappedAddress],
parent_addr: UnwrappedAddress,
bind_addrs: list[AddressTypes],
parent_addr: AddressTypes,
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
@ -433,8 +432,8 @@ async def trio_proc(
errors: dict[tuple[str, str], Exception],
# passed through to actor main
bind_addrs: list[UnwrappedAddress],
parent_addr: UnwrappedAddress,
bind_addrs: list[AddressTypes],
parent_addr: AddressTypes,
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
infect_asyncio: bool = False,
@ -461,9 +460,6 @@ async def trio_proc(
# the OS; it otherwise can be passed via the parent channel if
# we prefer in the future (for privacy).
"--uid",
# TODO, how to pass this over "wire" encodings like
# cmdline args?
# -[ ] maybe we can add an `Aid.min_tuple()` ?
str(subactor.uid),
# Address the child must connect to on startup
"--parent_addr",
@ -643,8 +639,8 @@ async def mp_proc(
subactor: Actor,
errors: dict[tuple[str, str], Exception],
# passed through to actor main
bind_addrs: list[UnwrappedAddress],
parent_addr: UnwrappedAddress,
bind_addrs: list[AddressTypes],
parent_addr: AddressTypes,
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
infect_asyncio: bool = False,
@ -729,8 +725,7 @@ async def mp_proc(
# channel should have handshake completed by the
# local actor by the time we get a ref to it
event, chan = await actor_nursery._actor.wait_for_peer(
subactor.uid,
)
subactor.uid)
# XXX: monkey patch poll API to match the ``subprocess`` API..
# not sure why they don't expose this but kk.

View File

@ -14,19 +14,16 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Per actor-process runtime state mgmt APIs.
"""
Per process state
'''
"""
from __future__ import annotations
from contextvars import (
ContextVar,
)
import os
from pathlib import Path
from typing import (
Any,
Literal,
TYPE_CHECKING,
)
@ -146,30 +143,3 @@ def current_ipc_ctx(
f'|_{current_task()}\n'
)
return ctx
# std ODE (mutable) app state location
_rtdir: Path = Path(os.environ['XDG_RUNTIME_DIR'])
def get_rt_dir(
subdir: str = 'tractor'
) -> Path:
'''
Return the user "runtime dir" where most userspace apps stick
their IPC and cache related system util-files; we take hold
of a `'XDG_RUNTIME_DIR'/tractor/` subdir by default.
'''
rtdir: Path = _rtdir / subdir
if not rtdir.is_dir():
rtdir.mkdir()
return rtdir
# default IPC transport protocol settings
TransportProtocolKey = Literal[
'tcp',
'uds',
]
_def_tpt_proto: TransportProtocolKey = 'tcp'

View File

@ -437,23 +437,22 @@ class MsgStream(trio.abc.Channel):
message: str = (
f'Stream self-closed by {this_side!r}-side before EoC from {peer_side!r}\n'
# } bc a stream is a "scope"/msging-phase inside an IPC
f'c}}>\n'
f'x}}>\n'
f' |_{self}\n'
)
log.cancel(message)
self._eoc = trio.EndOfChannel(message)
if (
(rx_chan := self._rx_chan)
and
(stats := rx_chan.statistics()).tasks_waiting_receive
):
message += (
f'AND there is still reader tasks,\n'
f'\n'
log.cancel(
f'Msg-stream is closing but there is still reader tasks,\n'
f'{stats}\n'
)
log.cancel(message)
self._eoc = trio.EndOfChannel(message)
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
# => NO, DEFINITELY NOT! <=
# if we're a bi-dir `MsgStream` BECAUSE this same
@ -596,17 +595,8 @@ class MsgStream(trio.abc.Channel):
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError,
) as _trans_err:
trans_err = _trans_err
if (
hide_tb
and
self._ctx.chan._exc is trans_err
# ^XXX, IOW, only if the channel is marked errored
# for the same reason as whatever its underlying
# transport raised, do we keep the full low-level tb
# suppressed from the user.
):
) as trans_err:
if hide_tb:
raise type(trans_err)(
*trans_err.args
) from trans_err
@ -812,12 +802,13 @@ async def open_stream_from_ctx(
# sanity, can remove?
assert eoc is stream._eoc
log.runtime(
log.warning(
'Stream was terminated by EoC\n\n'
# NOTE: won't show the error <Type> but
# does show txt followed by IPC msg.
f'{str(eoc)}\n'
)
finally:
if ctx._portal:
try:

View File

@ -22,9 +22,7 @@ from contextlib import asynccontextmanager as acm
from functools import partial
import inspect
from pprint import pformat
from typing import (
TYPE_CHECKING,
)
from typing import TYPE_CHECKING
import typing
import warnings
@ -33,8 +31,9 @@ import trio
from .devx._debug import maybe_wait_for_debugger
from ._addr import (
UnwrappedAddress,
mk_uuid,
AddressTypes,
preferred_transport,
get_address_cls
)
from ._state import current_actor, is_main_process
from .log import get_logger, get_loglevel
@ -44,9 +43,7 @@ from ._exceptions import (
is_multi_cancelled,
ContextCancelled,
)
from ._root import (
open_root_actor,
)
from ._root import open_root_actor
from . import _state
from . import _spawn
@ -137,9 +134,9 @@ class ActorNursery:
*,
bind_addrs: list[UnwrappedAddress]|None = None,
bind_addrs: list[AddressTypes]|None = None,
rpc_module_paths: list[str]|None = None,
enable_transports: list[str] = [_state._def_tpt_proto],
enable_transports: list[str] = [preferred_transport],
enable_modules: list[str]|None = None,
loglevel: str|None = None, # set log level per subactor
debug_mode: bool|None = None,
@ -164,6 +161,12 @@ class ActorNursery:
or get_loglevel()
)
if not bind_addrs:
bind_addrs: list[AddressTypes] = [
get_address_cls(transport).get_random().unwrap()
for transport in enable_transports
]
# configure and pass runtime state
_rtv = _state._runtime_vars.copy()
_rtv['_is_root'] = False
@ -186,9 +189,7 @@ class ActorNursery:
enable_modules.extend(rpc_module_paths)
subactor = Actor(
name=name,
uuid=mk_uuid(),
name,
# modules allowed to invoked funcs from
enable_modules=enable_modules,
loglevel=loglevel,
@ -196,7 +197,7 @@ class ActorNursery:
# verbatim relay this actor's registrar addresses
registry_addrs=current_actor().reg_addrs,
)
parent_addr: UnwrappedAddress = self._actor.accept_addr
parent_addr = self._actor.accept_addr
assert parent_addr
# start a task to spawn a process
@ -234,7 +235,7 @@ class ActorNursery:
*,
name: str | None = None,
bind_addrs: UnwrappedAddress|None = None,
bind_addrs: AddressTypes|None = None,
rpc_module_paths: list[str] | None = None,
enable_modules: list[str] | None = None,
loglevel: str | None = None, # set log level per subactor

View File

@ -73,7 +73,6 @@ from tractor.log import get_logger
from tractor._context import Context
from tractor import _state
from tractor._exceptions import (
DebugRequestError,
InternalError,
NoRuntime,
is_multi_cancelled,
@ -1741,6 +1740,13 @@ def sigint_shield(
_pause_msg: str = 'Opening a pdb REPL in paused actor'
class DebugRequestError(RuntimeError):
'''
Failed to request stdio lock from root actor!
'''
_repl_fail_msg: str|None = (
'Failed to REPl via `_pause()` '
)

View File

@ -19,7 +19,6 @@ Pretty formatters for use throughout the code base.
Mostly handy for logging and exception message content.
'''
import sys
import textwrap
import traceback
@ -116,85 +115,6 @@ def pformat_boxed_tb(
)
def pformat_exc(
exc: Exception,
header: str = '',
message: str = '',
body: str = '',
with_type_header: bool = True,
) -> str:
# XXX when the currently raised exception is this instance,
# we do not ever use the "type header" style repr.
is_being_raised: bool = False
if (
(curr_exc := sys.exception())
and
curr_exc is exc
):
is_being_raised: bool = True
with_type_header: bool = (
with_type_header
and
not is_being_raised
)
# <RemoteActorError( .. )> style
if (
with_type_header
and
not header
):
header: str = f'<{type(exc).__name__}('
message: str = (
message
or
exc.message
)
if message:
# split off the first line so, if needed, it isn't
# indented the same like the "boxed content" which
# since there is no `.tb_str` is just the `.message`.
lines: list[str] = message.splitlines()
first: str = lines[0]
message: str = message.removeprefix(first)
# with a type-style header we,
# - have no special message "first line" extraction/handling
# - place the message a space in from the header:
# `MsgTypeError( <message> ..`
# ^-here
# - indent the `.message` inside the type body.
if with_type_header:
first = f' {first} )>'
message: str = textwrap.indent(
message,
prefix=' '*2,
)
message: str = first + message
tail: str = ''
if (
with_type_header
and
not message
):
tail: str = '>'
return (
header
+
message
+
f'{body}'
+
tail
)
def pformat_caller_frame(
stack_limit: int = 1,
box_tb: bool = True,

View File

@ -45,8 +45,6 @@ __all__ = ['pub']
log = get_logger('messaging')
# TODO! this needs to reworked to use the modern
# `Context`/`MsgStream` APIs!!
async def fan_out_to_ctxs(
pub_async_gen_func: typing.Callable, # it's an async gen ... gd mypy
topics2ctxs: dict[str, list],

View File

@ -30,7 +30,6 @@ import typing
from typing import (
Any,
)
import warnings
import trio
@ -40,20 +39,16 @@ from tractor.ipc._types import (
transport_from_stream,
)
from tractor._addr import (
is_wrapped_addr,
wrap_address,
Address,
UnwrappedAddress,
AddressTypes
)
from tractor.log import get_logger
from tractor._exceptions import (
MsgTypeError,
pack_from_raise,
)
from tractor.msg import (
Aid,
MsgCodec,
)
from tractor.msg import MsgCodec
log = get_logger(__name__)
@ -89,12 +84,11 @@ class Channel:
# user in ``.from_stream()``.
self._transport: MsgTransport|None = transport
# set after handshake - always info from peer end
self.aid: Aid|None = None
# set after handshake - always uid of far end
self.uid: tuple[str, str]|None = None
self._aiter_msgs = self._iter_msgs()
self._exc: Exception|None = None
# ^XXX! ONLY set if a remote actor sends an `Error`-msg
self._exc: Exception|None = None # set if far end actor errors
self._closed: bool = False
# flag set by ``Portal.cancel_actor()`` indicating remote
@ -102,29 +96,6 @@ class Channel:
# runtime.
self._cancel_called: bool = False
@property
def uid(self) -> tuple[str, str]:
'''
Peer actor's unique id.
'''
msg: str = (
f'`{type(self).__name__}.uid` is now deprecated.\n'
'Use the new `.aid: tractor.msg.Aid` (struct) instead '
'which also provides additional named (optional) fields '
'beyond just the `.name` and `.uuid`.'
)
warnings.warn(
msg,
DeprecationWarning,
stacklevel=2,
)
peer_aid: Aid = self.aid
return (
peer_aid.name,
peer_aid.uuid,
)
@property
def stream(self) -> trio.abc.Stream | None:
return self._transport.stream if self._transport else None
@ -153,26 +124,17 @@ class Channel:
@classmethod
async def from_addr(
cls,
addr: UnwrappedAddress,
addr: AddressTypes,
**kwargs
) -> Channel:
if not is_wrapped_addr(addr):
addr: Address = wrap_address(addr)
addr: Address = wrap_address(addr)
transport_cls = transport_from_addr(addr)
transport = await transport_cls.connect_to(
addr,
**kwargs,
transport = await transport_cls.connect_to(addr, **kwargs)
log.transport(
f'Opened channel[{type(transport)}]: {transport.laddr} -> {transport.raddr}'
)
assert transport.raddr == addr
chan = Channel(transport=transport)
log.runtime(
f'Connected channel IPC transport\n'
f'[>\n'
f' |_{chan}\n'
)
return chan
return Channel(transport=transport)
@cm
def apply_codec(
@ -192,48 +154,16 @@ class Channel:
self._transport.codec = orig
# TODO: do a .src/.dst: str for maddrs?
def pformat(self) -> str:
def __repr__(self) -> str:
if not self._transport:
return '<Channel with inactive transport?>'
tpt: MsgTransport = self._transport
tpt_name: str = type(tpt).__name__
tpt_status: str = (
'connected' if self.connected()
else 'closed'
return repr(
self._transport
).replace( # type: ignore
"socket.socket",
"Channel",
)
return (
f'<Channel(\n'
f' |_status: {tpt_status!r}\n'
f' _closed={self._closed}\n'
f' _cancel_called={self._cancel_called}\n'
f'\n'
f' |_peer: {self.aid}\n'
f'\n'
f' |_msgstream: {tpt_name}\n'
f' proto={tpt.laddr.proto_key!r}\n'
f' layer={tpt.layer_key!r}\n'
f' laddr={tpt.laddr}\n'
f' raddr={tpt.raddr}\n'
f' codec={tpt.codec_key!r}\n'
f' stream={tpt.stream}\n'
f' maddr={tpt.maddr!r}\n'
f' drained={tpt.drained}\n'
f' _send_lock={tpt._send_lock.statistics()}\n'
f')>\n'
)
# NOTE: making this return a value that can be passed to
# `eval()` is entirely **optional** FYI!
# https://docs.python.org/3/library/functions.html#repr
# https://docs.python.org/3/reference/datamodel.html#object.__repr__
#
# Currently we target **readability** from a (console)
# logging perspective over `eval()`-ability since we do NOT
# target serializing non-struct instances!
# def __repr__(self) -> str:
__str__ = pformat
__repr__ = pformat
@property
def laddr(self) -> Address|None:
@ -305,7 +235,7 @@ class Channel:
async def aclose(self) -> None:
log.transport(
f'Closing channel to {self.aid} '
f'Closing channel to {self.uid} '
f'{self.laddr} -> {self.raddr}'
)
assert self._transport
@ -405,33 +335,10 @@ class Channel:
def connected(self) -> bool:
return self._transport.connected() if self._transport else False
async def _do_handshake(
self,
aid: Aid,
) -> Aid:
'''
Exchange `(name, UUIDs)` identifiers as the first
communication step with any (peer) remote `Actor`.
These are essentially the "mailbox addresses" found in
"actor model" parlance.
'''
await self.send(aid)
peer_aid: Aid = await self.recv()
log.runtime(
f'Received hanshake with peer actor,\n'
f'{peer_aid}\n'
)
# NOTE, we always are referencing the remote peer!
self.aid = peer_aid
return peer_aid
@acm
async def _connect_chan(
addr: UnwrappedAddress
addr: AddressTypes
) -> typing.AsyncGenerator[Channel, None]:
'''
Create and connect a channel with disconnect on context manager

View File

@ -50,10 +50,7 @@ if _USE_POSIX:
try:
import numpy as np
from numpy.lib import recfunctions as rfn
# TODO ruff complains with,
# warning| F401: `nptyping` imported but unused; consider using
# `importlib.util.find_spec` to test for availability
import nptyping # noqa
import nptyping
except ImportError:
pass

View File

@ -42,16 +42,25 @@ class MsgpackTCPStream(MsgpackTransport):
address_type = TCPAddress
layer_key: int = 4
# def __init__(
# self,
# stream: trio.SocketStream,
# prefix_size: int = 4,
# codec: CodecType = None,
# ) -> None:
# super().__init__(
# stream,
# prefix_size=prefix_size,
# codec=codec
# )
@property
def maddr(self) -> str:
host, port = self.raddr.unwrap()
return (
# TODO, use `ipaddress` from stdlib to handle
# first detecting which of `ipv4/6` before
# choosing the routing prefix part.
f'/ipv4/{host}'
f'/{self.address_type.proto_key}/{port}'
f'/{self.address_type.name_key}/{port}'
# f'/{self.chan.uid[0]}'
# f'/{self.cid}'
@ -85,15 +94,12 @@ class MsgpackTCPStream(MsgpackTransport):
cls,
stream: trio.SocketStream
) -> tuple[
TCPAddress,
TCPAddress,
tuple[str, int],
tuple[str, int]
]:
# TODO, what types are these?
lsockname = stream.socket.getsockname()
l_sockaddr: tuple[str, int] = tuple(lsockname[:2])
rsockname = stream.socket.getpeername()
r_sockaddr: tuple[str, int] = tuple(rsockname[:2])
return (
TCPAddress.from_addr(l_sockaddr),
TCPAddress.from_addr(r_sockaddr),
TCPAddress.from_addr(tuple(lsockname[:2])),
TCPAddress.from_addr(tuple(rsockname[:2])),
)

View File

@ -99,7 +99,7 @@ class MsgTransport(Protocol[MsgType]):
@classmethod
def key(cls) -> MsgTransportKey:
return cls.codec_key, cls.address_type.proto_key
return cls.codec_key, cls.address_type.name_key
@property
def laddr(self) -> Address:
@ -136,16 +136,6 @@ class MsgTransport(Protocol[MsgType]):
'''
...
# TODO, such that all `.raddr`s for each `SocketStream` are
# delivered?
# -[ ] move `.open_listener()` here and internally track the
# listener set, per address?
# def get_peers(
# self,
# ) -> list[Address]:
# ...
class MsgpackTransport(MsgTransport):
@ -167,10 +157,7 @@ class MsgpackTransport(MsgTransport):
) -> None:
self.stream = stream
(
self._laddr,
self._raddr,
) = self.get_stream_addrs(stream)
self._laddr, self._raddr = self.get_stream_addrs(stream)
# create read loop instance
self._aiter_pkts = self._iter_packets()
@ -208,7 +195,6 @@ class MsgpackTransport(MsgTransport):
'''
decodes_failed: int = 0
tpt_name: str = f'{type(self).__name__!r}'
while True:
try:
header: bytes = await self.recv_stream.receive_exactly(4)
@ -253,9 +239,10 @@ class MsgpackTransport(MsgTransport):
raise TransportClosed(
message=(
f'{tpt_name} already closed by peer\n'
f'IPC transport already closed by peer\n'
f'x)> {type(trans_err)}\n'
f' |_{self}\n'
),
src_exc=trans_err,
loglevel=loglevel,
) from trans_err
@ -267,17 +254,18 @@ class MsgpackTransport(MsgTransport):
#
# NOTE: as such we always re-raise this error from the
# RPC msg loop!
except trio.ClosedResourceError as cre:
closure_err = cre
except trio.ClosedResourceError as closure_err:
raise TransportClosed(
message=(
f'{tpt_name} was already closed locally ?\n'
f'IPC transport already manually closed locally?\n'
f'x)> {type(closure_err)} \n'
f' |_{self}\n'
),
src_exc=closure_err,
loglevel='error',
raise_on_report=(
'another task closed this fd' in closure_err.args
closure_err.args[0] == 'another task closed this fd'
or
closure_err.args[0] in ['another task closed this fd']
),
) from closure_err
@ -285,9 +273,12 @@ class MsgpackTransport(MsgTransport):
if header == b'':
raise TransportClosed(
message=(
f'{tpt_name} already gracefully closed\n'
f'IPC transport already gracefully closed\n'
f')>\n'
f'|_{self}\n'
),
loglevel='transport',
# cause=??? # handy or no?
)
size: int
@ -418,39 +409,7 @@ class MsgpackTransport(MsgTransport):
# supposedly the fastest says,
# https://stackoverflow.com/a/54027962
size: bytes = struct.pack("<I", len(bytes_data))
try:
return await self.stream.send_all(size + bytes_data)
except (
trio.BrokenResourceError,
) as trans_err:
loglevel = 'transport'
match trans_err:
case trio.BrokenResourceError() if (
'[Errno 32] Broken pipe' in trans_err.args[0]
# ^XXX, specifc to UDS transport and its,
# well, "speediness".. XD
# |_ likely todo with races related to how fast
# the socket is setup/torn-down on linux
# as it pertains to rando pings from the
# `.discovery` subsys and protos.
):
raise TransportClosed(
message=(
f'IPC transport already closed by peer\n'
f'x)> {type(trans_err)}\n'
f' |_{self}\n'
),
loglevel=loglevel,
) from trans_err
# unless the disconnect condition falls under "a
# normal operation breakage" we usualy console warn
# about it.
case _:
log.exception(
'Transport layer failed for {self.transport!r} ?\n'
)
raise trans_err
return await self.stream.send_all(size + bytes_data)
# ?TODO? does it help ever to dynamically show this
# frame?
@ -489,16 +448,3 @@ class MsgpackTransport(MsgTransport):
@property
def raddr(self) -> Address:
return self._raddr
def pformat(self) -> str:
return (
f'<{type(self).__name__}(\n'
f' |_task: {self._task}\n'
f'\n'
f' |_peers: 2\n'
f' laddr: {self._laddr}\n'
f' raddr: {self._raddr}\n'
f')>\n'
)
__repr__ = __str__ = pformat

View File

@ -76,7 +76,7 @@ def transport_from_stream(
'''
transport = None
if isinstance(stream, trio.SocketStream):
sock: socket.socket = stream.socket
sock = stream.socket
match sock.family:
case socket.AF_INET | socket.AF_INET6:
transport = 'tcp'

View File

@ -18,111 +18,46 @@ Unix Domain Socket implementation of tractor.ipc._transport.MsgTransport protoco
'''
from __future__ import annotations
from pathlib import Path
import os
from socket import (
# socket,
AF_UNIX,
SOCK_STREAM,
SO_PASSCRED,
SO_PEERCRED,
SOL_SOCKET,
)
import struct
import trio
from trio._highlevel_open_unix_stream import (
close_on_error,
has_unix,
)
from tractor.msg import MsgCodec
from tractor.log import get_logger
from tractor._addr import (
UDSAddress,
unwrap_sockpath,
)
from tractor._addr import UDSAddress
from tractor.ipc._transport import MsgpackTransport
log = get_logger(__name__)
async def open_unix_socket_w_passcred(
filename: str|bytes|os.PathLike[str]|os.PathLike[bytes],
) -> trio.SocketStream:
'''
Literally the exact same as `trio.open_unix_socket()` except we set the additiona
`socket.SO_PASSCRED` option to ensure the server side (the process calling `accept()`)
can extract the connecting peer's credentials, namely OS specific process
related IDs.
See this SO for "why" the extra opts,
- https://stackoverflow.com/a/7982749
'''
if not has_unix:
raise RuntimeError("Unix sockets are not supported on this platform")
# much more simplified logic vs tcp sockets - one socket type and only one
# possible location to connect to
sock = trio.socket.socket(AF_UNIX, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_PASSCRED, 1)
with close_on_error(sock):
await sock.connect(os.fspath(filename))
return trio.SocketStream(sock)
def get_peer_info(sock: trio.socket.socket) -> tuple[
int, # pid
int, # uid
int, # guid
]:
'''
Deliver the connecting peer's "credentials"-info as defined in
a very Linux specific way..
For more deats see,
- `man accept`,
- `man unix`,
this great online guide to all things sockets,
- https://beej.us/guide/bgnet/html/split-wide/man-pages.html#setsockoptman
AND this **wonderful SO answer**
- https://stackoverflow.com/a/7982749
'''
creds: bytes = sock.getsockopt(
SOL_SOCKET,
SO_PEERCRED,
struct.calcsize('3i')
)
# i.e a tuple of the fields,
# pid: int, "process"
# uid: int, "user"
# gid: int, "group"
return struct.unpack('3i', creds)
class MsgpackUDSStream(MsgpackTransport):
'''
A `trio.SocketStream` around a Unix-Domain-Socket transport
delivering `msgpack` encoded msgs using the `msgspec` codec lib.
A ``trio.SocketStream`` delivering ``msgpack`` formatted data
using the ``msgspec`` codec lib.
'''
address_type = UDSAddress
layer_key: int = 4
layer_key: int = 7
# def __init__(
# self,
# stream: trio.SocketStream,
# prefix_size: int = 4,
# codec: CodecType = None,
# ) -> None:
# super().__init__(
# stream,
# prefix_size=prefix_size,
# codec=codec
# )
@property
def maddr(self) -> str:
if not self.raddr:
return '<unknown-peer>'
filepath: Path = Path(self.raddr.unwrap()[0])
filepath = self.raddr.unwrap()
return (
f'/{self.address_type.proto_key}/{filepath}'
f'/ipv4/localhost'
f'/{self.address_type.name_key}/{filepath}'
# f'/{self.chan.uid[0]}'
# f'/{self.cid}'
@ -141,76 +76,22 @@ class MsgpackUDSStream(MsgpackTransport):
codec: MsgCodec|None = None,
**kwargs
) -> MsgpackUDSStream:
sockpath: Path = addr.sockpath
#
# ^XXX NOTE, we don't provide any out-of-band `.pid` info
# (like, over the socket as extra msgs) since the (augmented)
# `.setsockopt()` call tells the OS provide it; the client
# pid can then be read on server/listen() side via
# `get_peer_info()` above.
try:
stream = await open_unix_socket_w_passcred(
str(sockpath),
**kwargs
)
except (
FileNotFoundError,
) as fdne:
raise ConnectionError(
f'Bad UDS socket-filepath-as-address ??\n'
f'{addr}\n'
f' |_sockpath: {sockpath}\n'
) from fdne
stream = MsgpackUDSStream(
stream = await trio.open_unix_socket(
addr.unwrap(),
**kwargs
)
return MsgpackUDSStream(
stream,
prefix_size=prefix_size,
codec=codec
)
stream._raddr = addr
return stream
@classmethod
def get_stream_addrs(
cls,
stream: trio.SocketStream
) -> tuple[
Path,
int,
]:
sock: trio.socket.socket = stream.socket
# NOTE XXX, it's unclear why one or the other ends up being
# `bytes` versus the socket-file-path, i presume it's
# something to do with who is the server (called `.listen()`)?
# maybe could be better implemented using another info-query
# on the socket like,
# https://beej.us/guide/bgnet/html/split-wide/system-calls-or-bust.html#gethostnamewho-am-i
sockname: str|bytes = sock.getsockname()
# https://beej.us/guide/bgnet/html/split-wide/system-calls-or-bust.html#getpeernamewho-are-you
peername: str|bytes = sock.getpeername()
match (peername, sockname):
case (str(), bytes()):
sock_path: Path = Path(peername)
case (bytes(), str()):
sock_path: Path = Path(sockname)
(
peer_pid,
_,
_,
) = get_peer_info(sock)
filedir, filename = unwrap_sockpath(sock_path)
laddr = UDSAddress(
filedir=filedir,
filename=filename,
maybe_pid=os.getpid(),
) -> tuple[UDSAddress, UDSAddress]:
return (
UDSAddress.from_addr(stream.socket.getsockname()),
UDSAddress.from_addr(stream.socket.getsockname()),
)
raddr = UDSAddress(
filedir=filedir,
filename=filename,
maybe_pid=peer_pid
)
return (laddr, raddr)

View File

@ -31,7 +31,6 @@ from typing import (
Type,
TypeVar,
TypeAlias,
# TYPE_CHECKING,
Union,
)
@ -48,7 +47,7 @@ from tractor.msg import (
pretty_struct,
)
from tractor.log import get_logger
from tractor._addr import UnwrappedAddress
from tractor._addr import AddressTypes
log = get_logger('tractor.msgspec')
@ -143,16 +142,9 @@ class Aid(
'''
name: str
uuid: str
pid: int|None = None
# TODO? can/should we extend this field set?
# -[ ] use built-in support for UUIDs? `uuid.UUID` which has
# multi-protocol support
# https://jcristharif.com/msgspec/supported-types.html#uuid
#
# -[ ] as per the `.ipc._uds` / `._addr` comments, maybe we
# should also include at least `.pid` (equiv to port for tcp)
# and/or host-part always?
# TODO: use built-in support for UUIDs?
# -[ ] `uuid.UUID` which has multi-protocol support
# https://jcristharif.com/msgspec/supported-types.html#uuid
class SpawnSpec(
@ -176,8 +168,8 @@ class SpawnSpec(
# TODO: not just sockaddr pairs?
# -[ ] abstract into a `TransportAddr` type?
reg_addrs: list[UnwrappedAddress]
bind_addrs: list[UnwrappedAddress]|None
reg_addrs: list[AddressTypes]
bind_addrs: list[AddressTypes]
# TODO: caps based RPC support in the payload?

28
uv.lock
View File

@ -11,15 +11,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/89/aa/ab0f7891a01eeb2d2e338ae8fecbe57fcebea1a24dbb64d45801bfab481d/attrs-24.3.0-py3-none-any.whl", hash = "sha256:ac96cd038792094f438ad1f6ff80837353805ac950cd2aa0e0625ef19850c308", size = 63397 },
]
[[package]]
name = "bidict"
version = "0.23.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/9a/6e/026678aa5a830e07cd9498a05d3e7e650a4f56a42f267a53d22bcda1bdc9/bidict-0.23.1.tar.gz", hash = "sha256:03069d763bc387bbd20e7d49914e75fc4132a41937fa3405417e1a5a2d006d71", size = 29093 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/99/37/e8730c3587a65eb5645d4aba2d27aae48e8003614d6aaf15dda67f702f1f/bidict-0.23.1-py3-none-any.whl", hash = "sha256:5dae8d4d79b552a71cbabc7deb25dfe8ce710b17ff41711e13010ead2abfc3e5", size = 32764 },
]
[[package]]
name = "cffi"
version = "1.17.1"
@ -257,21 +248,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/e4/ea/d836f008d33151c7a1f62caf3d8dd782e4d15f6a43897f64480c2b8de2ad/prompt_toolkit-3.0.50-py3-none-any.whl", hash = "sha256:9b6427eb19e479d98acff65196a307c555eb567989e6d88ebbb1b509d9779198", size = 387816 },
]
[[package]]
name = "psutil"
version = "7.0.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/2a/80/336820c1ad9286a4ded7e845b2eccfcb27851ab8ac6abece774a6ff4d3de/psutil-7.0.0.tar.gz", hash = "sha256:7be9c3eba38beccb6495ea33afd982a44074b78f28c434a1f51cc07fd315c456", size = 497003 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ed/e6/2d26234410f8b8abdbf891c9da62bee396583f713fb9f3325a4760875d22/psutil-7.0.0-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:101d71dc322e3cffd7cea0650b09b3d08b8e7c4109dd6809fe452dfd00e58b25", size = 238051 },
{ url = "https://files.pythonhosted.org/packages/04/8b/30f930733afe425e3cbfc0e1468a30a18942350c1a8816acfade80c005c4/psutil-7.0.0-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:39db632f6bb862eeccf56660871433e111b6ea58f2caea825571951d4b6aa3da", size = 239535 },
{ url = "https://files.pythonhosted.org/packages/2a/ed/d362e84620dd22876b55389248e522338ed1bf134a5edd3b8231d7207f6d/psutil-7.0.0-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1fcee592b4c6f146991ca55919ea3d1f8926497a713ed7faaf8225e174581e91", size = 275004 },
{ url = "https://files.pythonhosted.org/packages/bf/b9/b0eb3f3cbcb734d930fdf839431606844a825b23eaf9a6ab371edac8162c/psutil-7.0.0-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4b1388a4f6875d7e2aff5c4ca1cc16c545ed41dd8bb596cefea80111db353a34", size = 277986 },
{ url = "https://files.pythonhosted.org/packages/eb/a2/709e0fe2f093556c17fbafda93ac032257242cabcc7ff3369e2cb76a97aa/psutil-7.0.0-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a5f098451abc2828f7dc6b58d44b532b22f2088f4999a937557b603ce72b1993", size = 279544 },
{ url = "https://files.pythonhosted.org/packages/50/e6/eecf58810b9d12e6427369784efe814a1eec0f492084ce8eb8f4d89d6d61/psutil-7.0.0-cp37-abi3-win32.whl", hash = "sha256:ba3fcef7523064a6c9da440fc4d6bd07da93ac726b5733c29027d7dc95b39d99", size = 241053 },
{ url = "https://files.pythonhosted.org/packages/50/1b/6921afe68c74868b4c9fa424dad3be35b095e16687989ebbb50ce4fceb7c/psutil-7.0.0-cp37-abi3-win_amd64.whl", hash = "sha256:4cf3d4eb1aa9b348dec30105c55cd9b7d4629285735a102beb4441e38db90553", size = 244885 },
]
[[package]]
name = "ptyprocess"
version = "0.7.0"
@ -373,7 +349,6 @@ name = "tractor"
version = "0.1.0a6.dev0"
source = { editable = "." }
dependencies = [
{ name = "bidict" },
{ name = "cffi" },
{ name = "colorlog" },
{ name = "msgspec" },
@ -388,7 +363,6 @@ dev = [
{ name = "greenback" },
{ name = "pexpect" },
{ name = "prompt-toolkit" },
{ name = "psutil" },
{ name = "pyperclip" },
{ name = "pytest" },
{ name = "stackscope" },
@ -397,7 +371,6 @@ dev = [
[package.metadata]
requires-dist = [
{ name = "bidict", specifier = ">=0.23.1" },
{ name = "cffi", specifier = ">=1.17.1" },
{ name = "colorlog", specifier = ">=6.8.2,<7" },
{ name = "msgspec", specifier = ">=0.19.0" },
@ -412,7 +385,6 @@ dev = [
{ name = "greenback", specifier = ">=1.2.1,<2" },
{ name = "pexpect", specifier = ">=4.9.0,<5" },
{ name = "prompt-toolkit", specifier = ">=3.0.50" },
{ name = "psutil", specifier = ">=7.0.0" },
{ name = "pyperclip", specifier = ">=1.9.0" },
{ name = "pytest", specifier = ">=8.3.5" },
{ name = "stackscope", specifier = ">=0.2.2,<0.3" },