Compare commits

..

No commits in common. "8fd7d1cec49b36c2b84b37818e494b20ad720a5a" and "1762b3eb64ec48b48dbe55286ffe6400e9563ffe" have entirely different histories.

33 changed files with 917 additions and 2115 deletions

View File

@ -120,7 +120,6 @@ async def main(
break_parent_ipc_after: int|bool = False, break_parent_ipc_after: int|bool = False,
break_child_ipc_after: int|bool = False, break_child_ipc_after: int|bool = False,
pre_close: bool = False, pre_close: bool = False,
tpt_proto: str = 'tcp',
) -> None: ) -> None:
@ -132,7 +131,6 @@ async def main(
# a hang since it never engages due to broken IPC # a hang since it never engages due to broken IPC
debug_mode=debug_mode, debug_mode=debug_mode,
loglevel=loglevel, loglevel=loglevel,
enable_transports=[tpt_proto],
) as an, ) as an,
): ):
@ -147,8 +145,7 @@ async def main(
_testing.expect_ctxc( _testing.expect_ctxc(
yay=( yay=(
break_parent_ipc_after break_parent_ipc_after
or or break_child_ipc_after
break_child_ipc_after
), ),
# TODO: we CAN'T remove this right? # TODO: we CAN'T remove this right?
# since we need the ctxc to bubble up from either # since we need the ctxc to bubble up from either

View File

@ -46,7 +46,6 @@ dependencies = [
# typed IPC msging # typed IPC msging
"msgspec>=0.19.0", "msgspec>=0.19.0",
"cffi>=1.17.1", "cffi>=1.17.1",
"bidict>=0.23.1",
] ]
# ------ project ------ # ------ project ------
@ -64,7 +63,6 @@ dev = [
"pyperclip>=1.9.0", "pyperclip>=1.9.0",
"prompt-toolkit>=3.0.50", "prompt-toolkit>=3.0.50",
"xonsh>=0.19.2", "xonsh>=0.19.2",
"psutil>=7.0.0",
] ]
# TODO, add these with sane versions; were originally in # TODO, add these with sane versions; were originally in
# `requirements-docs.txt`.. # `requirements-docs.txt`..

View File

@ -1,8 +1,6 @@
""" """
Top level of the testing suites! ``tractor`` testing!!
""" """
from __future__ import annotations
import sys import sys
import subprocess import subprocess
import os import os
@ -32,11 +30,7 @@ else:
_KILL_SIGNAL = signal.SIGKILL _KILL_SIGNAL = signal.SIGKILL
_INT_SIGNAL = signal.SIGINT _INT_SIGNAL = signal.SIGINT
_INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value _INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value
_PROC_SPAWN_WAIT = ( _PROC_SPAWN_WAIT = 0.6 if sys.version_info < (3, 7) else 0.4
0.6
if sys.version_info < (3, 7)
else 0.4
)
no_windows = pytest.mark.skipif( no_windows = pytest.mark.skipif(
@ -45,9 +39,7 @@ no_windows = pytest.mark.skipif(
) )
def pytest_addoption( def pytest_addoption(parser):
parser: pytest.Parser,
):
parser.addoption( parser.addoption(
"--ll", "--ll",
action="store", action="store",
@ -64,8 +56,7 @@ def pytest_addoption(
) )
parser.addoption( parser.addoption(
"--tpdb", "--tpdb", "--debug-mode",
"--debug-mode",
action="store_true", action="store_true",
dest='tractor_debug_mode', dest='tractor_debug_mode',
# default=False, # default=False,
@ -76,17 +67,6 @@ def pytest_addoption(
), ),
) )
# provide which IPC transport protocols opting-in test suites
# should accumulatively run against.
parser.addoption(
"--tpt-proto",
nargs='+', # accumulate-multiple-args
action="store",
dest='tpt_protos',
default=['tcp'],
help="Transport protocol to use under the `tractor.ipc.Channel`",
)
def pytest_configure(config): def pytest_configure(config):
backend = config.option.spawn_backend backend = config.option.spawn_backend
@ -94,7 +74,7 @@ def pytest_configure(config):
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def debug_mode(request) -> bool: def debug_mode(request):
debug_mode: bool = request.config.option.tractor_debug_mode debug_mode: bool = request.config.option.tractor_debug_mode
# if debug_mode: # if debug_mode:
# breakpoint() # breakpoint()
@ -115,35 +95,11 @@ def spawn_backend(request) -> str:
return request.config.option.spawn_backend return request.config.option.spawn_backend
@pytest.fixture(scope='session') # @pytest.fixture(scope='function', autouse=True)
def tpt_protos(request) -> list[str]: # def debug_enabled(request) -> str:
# from tractor import _state
# allow quoting on CLI # if _state._runtime_vars['_debug_mode']:
proto_keys: list[str] = [ # breakpoint()
proto_key.replace('"', '').replace("'", "")
for proto_key in request.config.option.tpt_protos
]
# ?TODO, eventually support multiple protos per test-sesh?
if len(proto_keys) > 1:
pytest.fail(
'We only support one `--tpt-proto <key>` atm!\n'
)
# XXX ensure we support the protocol by name via lookup!
for proto_key in proto_keys:
addr_type = tractor._addr._address_types[proto_key]
assert addr_type.proto_key == proto_key
yield proto_keys
@pytest.fixture(scope='session')
def tpt_proto(
tpt_protos: list[str],
) -> str:
yield tpt_protos[0]
_ci_env: bool = os.environ.get('CI', False) _ci_env: bool = os.environ.get('CI', False)
@ -151,7 +107,7 @@ _ci_env: bool = os.environ.get('CI', False)
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def ci_env() -> bool: def ci_env() -> bool:
''' '''
Detect CI environment. Detect CI envoirment.
''' '''
return _ci_env return _ci_env
@ -159,45 +115,30 @@ def ci_env() -> bool:
# TODO: also move this to `._testing` for now? # TODO: also move this to `._testing` for now?
# -[ ] possibly generalize and re-use for multi-tree spawning # -[ ] possibly generalize and re-use for multi-tree spawning
# along with the new stuff for multi-addrs? # along with the new stuff for multi-addrs in distribute_dis
# branch?
# #
# choose random port at import time # choose randomly at import time
_rando_port: str = random.randint(1000, 9999) _reg_addr: tuple[str, int] = (
'127.0.0.1',
random.randint(1000, 9999),
)
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def reg_addr( def reg_addr() -> tuple[str, int]:
tpt_proto: str,
) -> tuple[str, int|str]:
# globally override the runtime to the per-test-session-dynamic # globally override the runtime to the per-test-session-dynamic
# addr so that all tests never conflict with any other actor # addr so that all tests never conflict with any other actor
# tree using the default. # tree using the default.
from tractor import ( from tractor import _root
_addr, _root._default_lo_addrs = [_reg_addr]
)
addr_type = _addr._address_types[tpt_proto]
def_reg_addr: tuple[str, int] = _addr._default_lo_addrs[tpt_proto]
testrun_reg_addr: tuple[str, int] return _reg_addr
match tpt_proto:
case 'tcp':
testrun_reg_addr = (
addr_type.def_bindspace,
_rando_port,
)
# NOTE, file-name uniqueness (no-collisions) will be based on
# the runtime-directory and root (pytest-proc's) pid.
case 'uds':
testrun_reg_addr = addr_type.get_random().unwrap()
assert def_reg_addr != testrun_reg_addr
return testrun_reg_addr
def pytest_generate_tests(metafunc): def pytest_generate_tests(metafunc):
spawn_backend: str = metafunc.config.option.spawn_backend spawn_backend = metafunc.config.option.spawn_backend
if not spawn_backend: if not spawn_backend:
# XXX some weird windows bug with `pytest`? # XXX some weird windows bug with `pytest`?
@ -210,53 +151,45 @@ def pytest_generate_tests(metafunc):
'trio', 'trio',
) )
# NOTE: used-to-be-used-to dyanmically parametrize tests for when # NOTE: used to be used to dyanmically parametrize tests for when
# you just passed --spawn-backend=`mp` on the cli, but now we expect # you just passed --spawn-backend=`mp` on the cli, but now we expect
# that cli input to be manually specified, BUT, maybe we'll do # that cli input to be manually specified, BUT, maybe we'll do
# something like this again in the future? # something like this again in the future?
if 'start_method' in metafunc.fixturenames: if 'start_method' in metafunc.fixturenames:
metafunc.parametrize( metafunc.parametrize("start_method", [spawn_backend], scope='module')
"start_method",
[spawn_backend],
scope='module',
)
# TODO, parametrize any `tpt_proto: str` declaring tests!
# proto_tpts: list[str] = metafunc.config.option.proto_tpts # TODO: a way to let test scripts (like from `examples/`)
# if 'tpt_proto' in metafunc.fixturenames: # guarantee they won't registry addr collide!
# metafunc.parametrize( # @pytest.fixture
# 'tpt_proto', # def open_test_runtime(
# proto_tpts, # TODO, double check this list usage! # reg_addr: tuple,
# scope='module', # ) -> AsyncContextManager:
# return partial(
# tractor.open_nursery,
# registry_addrs=[reg_addr],
# ) # )
def sig_prog( def sig_prog(proc, sig):
proc: subprocess.Popen,
sig: int,
canc_timeout: float = 0.1,
) -> int:
"Kill the actor-process with ``sig``." "Kill the actor-process with ``sig``."
proc.send_signal(sig) proc.send_signal(sig)
time.sleep(canc_timeout) time.sleep(0.1)
if not proc.poll(): if not proc.poll():
# TODO: why sometimes does SIGINT not work on teardown? # TODO: why sometimes does SIGINT not work on teardown?
# seems to happen only when trace logging enabled? # seems to happen only when trace logging enabled?
proc.send_signal(_KILL_SIGNAL) proc.send_signal(_KILL_SIGNAL)
ret: int = proc.wait() ret = proc.wait()
assert ret assert ret
# TODO: factor into @cm and move to `._testing`? # TODO: factor into @cm and move to `._testing`?
@pytest.fixture @pytest.fixture
def daemon( def daemon(
debug_mode: bool,
loglevel: str, loglevel: str,
testdir, testdir,
reg_addr: tuple[str, int], reg_addr: tuple[str, int],
tpt_proto: str, ):
) -> subprocess.Popen:
''' '''
Run a daemon root actor as a separate actor-process tree and Run a daemon root actor as a separate actor-process tree and
"remote registrar" for discovery-protocol related tests. "remote registrar" for discovery-protocol related tests.
@ -268,99 +201,27 @@ def daemon(
code: str = ( code: str = (
"import tractor; " "import tractor; "
"tractor.run_daemon([], " "tractor.run_daemon([], registry_addrs={reg_addrs}, loglevel={ll})"
"registry_addrs={reg_addrs}, "
"debug_mode={debug_mode}, "
"loglevel={ll})"
).format( ).format(
reg_addrs=str([reg_addr]), reg_addrs=str([reg_addr]),
ll="'{}'".format(loglevel) if loglevel else None, ll="'{}'".format(loglevel) if loglevel else None,
debug_mode=debug_mode,
) )
cmd: list[str] = [ cmd: list[str] = [
sys.executable, sys.executable,
'-c', code, '-c', code,
] ]
# breakpoint()
kwargs = {} kwargs = {}
if platform.system() == 'Windows': if platform.system() == 'Windows':
# without this, tests hang on windows forever # without this, tests hang on windows forever
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
proc: subprocess.Popen = testdir.popen( proc = testdir.popen(
cmd, cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**kwargs, **kwargs,
) )
# UDS sockets are **really** fast to bind()/listen()/connect()
# so it's often required that we delay a bit more starting
# the first actor-tree..
if tpt_proto == 'uds':
global _PROC_SPAWN_WAIT
_PROC_SPAWN_WAIT = 0.6
time.sleep(_PROC_SPAWN_WAIT)
assert not proc.returncode assert not proc.returncode
time.sleep(_PROC_SPAWN_WAIT)
yield proc yield proc
sig_prog(proc, _INT_SIGNAL) sig_prog(proc, _INT_SIGNAL)
# XXX! yeah.. just be reaaal careful with this bc sometimes it
# can lock up on the `_io.BufferedReader` and hang..
stderr: str = proc.stderr.read().decode()
if stderr:
print(
f'Daemon actor tree produced STDERR:\n'
f'{proc.args}\n'
f'\n'
f'{stderr}\n'
)
if proc.returncode != -2:
raise RuntimeError(
'Daemon actor tree failed !?\n'
f'{proc.args}\n'
)
# @pytest.fixture(autouse=True)
# def shared_last_failed(pytestconfig):
# val = pytestconfig.cache.get("example/value", None)
# breakpoint()
# if val is None:
# pytestconfig.cache.set("example/value", val)
# return val
# TODO: a way to let test scripts (like from `examples/`)
# guarantee they won't `registry_addrs` collide!
# -[ ] maybe use some kinda standard `def main()` arg-spec that
# we can introspect from a fixture that is called from the test
# body?
# -[ ] test and figure out typing for below prototype! Bp
#
# @pytest.fixture
# def set_script_runtime_args(
# reg_addr: tuple,
# ) -> Callable[[...], None]:
# def import_n_partial_in_args_n_triorun(
# script: Path, # under examples?
# **runtime_args,
# ) -> Callable[[], Any]: # a `partial`-ed equiv of `trio.run()`
# # NOTE, below is taken from
# # `.test_advanced_faults.test_ipc_channel_break_during_stream`
# mod: ModuleType = import_path(
# examples_dir() / 'advanced_faults'
# / 'ipc_failure_during_stream.py',
# root=examples_dir(),
# consider_namespace_packages=False,
# )
# return partial(
# trio.run,
# partial(
# mod.main,
# **runtime_args,
# )
# )
# return import_n_partial_in_args_n_triorun

View File

@ -10,9 +10,6 @@ import pytest
from _pytest.pathlib import import_path from _pytest.pathlib import import_path
import trio import trio
import tractor import tractor
from tractor import (
TransportClosed,
)
from tractor._testing import ( from tractor._testing import (
examples_dir, examples_dir,
break_ipc, break_ipc,
@ -77,7 +74,6 @@ def test_ipc_channel_break_during_stream(
spawn_backend: str, spawn_backend: str,
ipc_break: dict|None, ipc_break: dict|None,
pre_aclose_msgstream: bool, pre_aclose_msgstream: bool,
tpt_proto: str,
): ):
''' '''
Ensure we can have an IPC channel break its connection during Ensure we can have an IPC channel break its connection during
@ -95,7 +91,7 @@ def test_ipc_channel_break_during_stream(
# non-`trio` spawners should never hit the hang condition that # non-`trio` spawners should never hit the hang condition that
# requires the user to do ctl-c to cancel the actor tree. # requires the user to do ctl-c to cancel the actor tree.
# expect_final_exc = trio.ClosedResourceError # expect_final_exc = trio.ClosedResourceError
expect_final_exc = TransportClosed expect_final_exc = tractor.TransportClosed
mod: ModuleType = import_path( mod: ModuleType = import_path(
examples_dir() / 'advanced_faults' examples_dir() / 'advanced_faults'
@ -108,8 +104,6 @@ def test_ipc_channel_break_during_stream(
# period" wherein the user eventually hits ctl-c to kill the # period" wherein the user eventually hits ctl-c to kill the
# root-actor tree. # root-actor tree.
expect_final_exc: BaseException = KeyboardInterrupt expect_final_exc: BaseException = KeyboardInterrupt
expect_final_cause: BaseException|None = None
if ( if (
# only expect EoC if trans is broken on the child side, # only expect EoC if trans is broken on the child side,
ipc_break['break_child_ipc_after'] is not False ipc_break['break_child_ipc_after'] is not False
@ -144,9 +138,6 @@ def test_ipc_channel_break_during_stream(
# a user sending ctl-c by raising a KBI. # a user sending ctl-c by raising a KBI.
if pre_aclose_msgstream: if pre_aclose_msgstream:
expect_final_exc = KeyboardInterrupt expect_final_exc = KeyboardInterrupt
if tpt_proto == 'uds':
expect_final_exc = TransportClosed
expect_final_cause = trio.BrokenResourceError
# XXX OLD XXX # XXX OLD XXX
# if child calls `MsgStream.aclose()` then expect EoC. # if child calls `MsgStream.aclose()` then expect EoC.
@ -166,10 +157,6 @@ def test_ipc_channel_break_during_stream(
if pre_aclose_msgstream: if pre_aclose_msgstream:
expect_final_exc = KeyboardInterrupt expect_final_exc = KeyboardInterrupt
if tpt_proto == 'uds':
expect_final_exc = TransportClosed
expect_final_cause = trio.BrokenResourceError
# NOTE when the parent IPC side dies (even if the child does as well # NOTE when the parent IPC side dies (even if the child does as well
# but the child fails BEFORE the parent) we always expect the # but the child fails BEFORE the parent) we always expect the
# IPC layer to raise a closed-resource, NEVER do we expect # IPC layer to raise a closed-resource, NEVER do we expect
@ -182,8 +169,8 @@ def test_ipc_channel_break_during_stream(
and and
ipc_break['break_child_ipc_after'] is False ipc_break['break_child_ipc_after'] is False
): ):
# expect_final_exc = trio.ClosedResourceError
expect_final_exc = tractor.TransportClosed expect_final_exc = tractor.TransportClosed
expect_final_cause = trio.ClosedResourceError
# BOTH but, PARENT breaks FIRST # BOTH but, PARENT breaks FIRST
elif ( elif (
@ -194,8 +181,8 @@ def test_ipc_channel_break_during_stream(
ipc_break['break_parent_ipc_after'] ipc_break['break_parent_ipc_after']
) )
): ):
# expect_final_exc = trio.ClosedResourceError
expect_final_exc = tractor.TransportClosed expect_final_exc = tractor.TransportClosed
expect_final_cause = trio.ClosedResourceError
with pytest.raises( with pytest.raises(
expected_exception=( expected_exception=(
@ -211,7 +198,6 @@ def test_ipc_channel_break_during_stream(
start_method=spawn_backend, start_method=spawn_backend,
loglevel=loglevel, loglevel=loglevel,
pre_close=pre_aclose_msgstream, pre_close=pre_aclose_msgstream,
tpt_proto=tpt_proto,
**ipc_break, **ipc_break,
) )
) )
@ -234,15 +220,10 @@ def test_ipc_channel_break_during_stream(
) )
cause: Exception = tc.__cause__ cause: Exception = tc.__cause__
assert ( assert (
# type(cause) is trio.ClosedResourceError type(cause) is trio.ClosedResourceError
type(cause) is expect_final_cause and
cause.args[0] == 'another task closed this fd'
# TODO, should we expect a certain exc-message (per
# tpt) as well??
# and
# cause.args[0] == 'another task closed this fd'
) )
raise raise
# get raw instance from pytest wrapper # get raw instance from pytest wrapper

View File

@ -7,9 +7,7 @@ import platform
from functools import partial from functools import partial
import itertools import itertools
import psutil
import pytest import pytest
import subprocess
import tractor import tractor
from tractor._testing import tractor_test from tractor._testing import tractor_test
import trio import trio
@ -154,23 +152,13 @@ async def unpack_reg(actor_or_portal):
async def spawn_and_check_registry( async def spawn_and_check_registry(
reg_addr: tuple, reg_addr: tuple,
use_signal: bool, use_signal: bool,
debug_mode: bool = False,
remote_arbiter: bool = False, remote_arbiter: bool = False,
with_streaming: bool = False, with_streaming: bool = False,
maybe_daemon: tuple[
subprocess.Popen,
psutil.Process,
]|None = None,
) -> None: ) -> None:
if maybe_daemon:
popen, proc = maybe_daemon
# breakpoint()
async with tractor.open_root_actor( async with tractor.open_root_actor(
registry_addrs=[reg_addr], registry_addrs=[reg_addr],
debug_mode=debug_mode,
): ):
async with tractor.get_registry(reg_addr) as portal: async with tractor.get_registry(reg_addr) as portal:
# runtime needs to be up to call this # runtime needs to be up to call this
@ -188,11 +176,11 @@ async def spawn_and_check_registry(
extra = 2 # local root actor + remote arbiter extra = 2 # local root actor + remote arbiter
# ensure current actor is registered # ensure current actor is registered
registry: dict = await get_reg() registry = await get_reg()
assert actor.uid in registry assert actor.uid in registry
try: try:
async with tractor.open_nursery() as an: async with tractor.open_nursery() as n:
async with trio.open_nursery( async with trio.open_nursery(
strict_exception_groups=False, strict_exception_groups=False,
) as trion: ) as trion:
@ -201,17 +189,17 @@ async def spawn_and_check_registry(
for i in range(3): for i in range(3):
name = f'a{i}' name = f'a{i}'
if with_streaming: if with_streaming:
portals[name] = await an.start_actor( portals[name] = await n.start_actor(
name=name, enable_modules=[__name__]) name=name, enable_modules=[__name__])
else: # no streaming else: # no streaming
portals[name] = await an.run_in_actor( portals[name] = await n.run_in_actor(
trio.sleep_forever, name=name) trio.sleep_forever, name=name)
# wait on last actor to come up # wait on last actor to come up
async with tractor.wait_for_actor(name): async with tractor.wait_for_actor(name):
registry = await get_reg() registry = await get_reg()
for uid in an._children: for uid in n._children:
assert uid in registry assert uid in registry
assert len(portals) + extra == len(registry) assert len(portals) + extra == len(registry)
@ -244,7 +232,6 @@ async def spawn_and_check_registry(
@pytest.mark.parametrize('use_signal', [False, True]) @pytest.mark.parametrize('use_signal', [False, True])
@pytest.mark.parametrize('with_streaming', [False, True]) @pytest.mark.parametrize('with_streaming', [False, True])
def test_subactors_unregister_on_cancel( def test_subactors_unregister_on_cancel(
debug_mode: bool,
start_method, start_method,
use_signal, use_signal,
reg_addr, reg_addr,
@ -261,7 +248,6 @@ def test_subactors_unregister_on_cancel(
spawn_and_check_registry, spawn_and_check_registry,
reg_addr, reg_addr,
use_signal, use_signal,
debug_mode=debug_mode,
remote_arbiter=False, remote_arbiter=False,
with_streaming=with_streaming, with_streaming=with_streaming,
), ),
@ -271,8 +257,7 @@ def test_subactors_unregister_on_cancel(
@pytest.mark.parametrize('use_signal', [False, True]) @pytest.mark.parametrize('use_signal', [False, True])
@pytest.mark.parametrize('with_streaming', [False, True]) @pytest.mark.parametrize('with_streaming', [False, True])
def test_subactors_unregister_on_cancel_remote_daemon( def test_subactors_unregister_on_cancel_remote_daemon(
daemon: subprocess.Popen, daemon,
debug_mode: bool,
start_method, start_method,
use_signal, use_signal,
reg_addr, reg_addr,
@ -288,13 +273,8 @@ def test_subactors_unregister_on_cancel_remote_daemon(
spawn_and_check_registry, spawn_and_check_registry,
reg_addr, reg_addr,
use_signal, use_signal,
debug_mode=debug_mode,
remote_arbiter=True, remote_arbiter=True,
with_streaming=with_streaming, with_streaming=with_streaming,
maybe_daemon=(
daemon,
psutil.Process(daemon.pid)
),
), ),
) )
@ -393,7 +373,7 @@ def test_close_channel_explicit(
@pytest.mark.parametrize('use_signal', [False, True]) @pytest.mark.parametrize('use_signal', [False, True])
def test_close_channel_explicit_remote_arbiter( def test_close_channel_explicit_remote_arbiter(
daemon: subprocess.Popen, daemon,
start_method, start_method,
use_signal, use_signal,
reg_addr, reg_addr,

View File

@ -11,9 +11,6 @@ from tractor.ipc import (
) )
from tractor._testing.samples import generate_sample_messages from tractor._testing.samples import generate_sample_messages
# in case you don't want to melt your cores, uncomment dis!
pytestmark = pytest.mark.skip
@tractor.context @tractor.context
async def child_read_shm( async def child_read_shm(

View File

@ -1,85 +0,0 @@
'''
Runtime boot/init sanity.
'''
import pytest
import trio
import tractor
from tractor._exceptions import RuntimeFailure
@tractor.context
async def open_new_root_in_sub(
ctx: tractor.Context,
) -> None:
async with tractor.open_root_actor():
pass
@pytest.mark.parametrize(
'open_root_in',
['root', 'sub'],
ids='open_2nd_root_in={}'.format,
)
def test_only_one_root_actor(
open_root_in: str,
reg_addr: tuple,
debug_mode: bool
):
'''
Verify we specially fail whenever more then one root actor
is attempted to be opened within an already opened tree.
'''
async def main():
async with tractor.open_nursery() as an:
if open_root_in == 'root':
async with tractor.open_root_actor(
registry_addrs=[reg_addr],
):
pass
ptl: tractor.Portal = await an.start_actor(
name='bad_rooty_boi',
enable_modules=[__name__],
)
async with ptl.open_context(
open_new_root_in_sub,
) as (ctx, first):
pass
if open_root_in == 'root':
with pytest.raises(
RuntimeFailure
) as excinfo:
trio.run(main)
else:
with pytest.raises(
tractor.RemoteActorError,
) as excinfo:
trio.run(main)
assert excinfo.value.boxed_type is RuntimeFailure
def test_implicit_root_via_first_nursery(
reg_addr: tuple,
debug_mode: bool
):
'''
The first `ActorNursery` open should implicitly call
`_root.open_root_actor()`.
'''
async def main():
async with tractor.open_nursery() as an:
assert an._implicit_runtime_started
assert tractor.current_actor().aid.name == 'root'
trio.run(main)

View File

@ -2,7 +2,6 @@
Spawning basics Spawning basics
""" """
from functools import partial
from typing import ( from typing import (
Any, Any,
) )
@ -13,95 +12,74 @@ import tractor
from tractor._testing import tractor_test from tractor._testing import tractor_test
data_to_pass_down = { data_to_pass_down = {'doggy': 10, 'kitty': 4}
'doggy': 10,
'kitty': 4,
}
async def spawn( async def spawn(
should_be_root: bool, is_arbiter: bool,
data: dict, data: dict,
reg_addr: tuple[str, int], reg_addr: tuple[str, int],
debug_mode: bool = False,
): ):
namespaces = [__name__]
await trio.sleep(0.1) await trio.sleep(0.1)
actor = tractor.current_actor(err_on_no_runtime=False)
if should_be_root: async with tractor.open_root_actor(
assert actor is None # no runtime yet
async with (
tractor.open_root_actor(
arbiter_addr=reg_addr, arbiter_addr=reg_addr,
),
tractor.open_nursery() as an,
): ):
# now runtime exists actor = tractor.current_actor()
actor: tractor.Actor = tractor.current_actor() assert actor.is_arbiter == is_arbiter
assert actor.is_arbiter == should_be_root data = data_to_pass_down
# spawns subproc here if actor.is_arbiter:
portal: tractor.Portal = await an.run_in_actor( async with tractor.open_nursery() as nursery:
fn=spawn,
# spawning args # forks here
portal = await nursery.run_in_actor(
spawn,
is_arbiter=False,
name='sub-actor', name='sub-actor',
enable_modules=[__name__], data=data,
# passed to a subactor-recursive RPC invoke
# of this same `spawn()` fn.
should_be_root=False,
data=data_to_pass_down,
reg_addr=reg_addr, reg_addr=reg_addr,
enable_modules=namespaces,
) )
assert len(an._children) == 1 assert len(nursery._children) == 1
assert portal.channel.uid in tractor.current_actor()._peers assert portal.channel.uid in tractor.current_actor()._peers
# be sure we can still get the result
# get result from child subactor
result = await portal.result() result = await portal.result()
assert result == 10 assert result == 10
return result return result
else: else:
assert actor.is_arbiter == should_be_root
return 10 return 10
def test_run_in_actor_same_func_in_child( def test_local_arbiter_subactor_global_state(
reg_addr: tuple, reg_addr,
debug_mode: bool,
): ):
result = trio.run( result = trio.run(
partial(
spawn, spawn,
should_be_root=True, True,
data=data_to_pass_down, data_to_pass_down,
reg_addr=reg_addr, reg_addr,
debug_mode=debug_mode,
)
) )
assert result == 10 assert result == 10
async def movie_theatre_question(): async def movie_theatre_question():
''' """A question asked in a dark theatre, in a tangent
A question asked in a dark theatre, in a tangent
(errr, I mean different) process. (errr, I mean different) process.
"""
'''
return 'have you ever seen a portal?' return 'have you ever seen a portal?'
@tractor_test @tractor_test
async def test_movie_theatre_convo(start_method): async def test_movie_theatre_convo(start_method):
''' """The main ``tractor`` routine.
The main ``tractor`` routine. """
async with tractor.open_nursery(debug_mode=True) as n:
''' portal = await n.start_actor(
async with tractor.open_nursery(debug_mode=True) as an:
portal = await an.start_actor(
'frank', 'frank',
# enable the actor to run funcs from this current module # enable the actor to run funcs from this current module
enable_modules=[__name__], enable_modules=[__name__],
@ -140,8 +118,8 @@ async def test_most_beautiful_word(
with trio.fail_after(1): with trio.fail_after(1):
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=debug_mode, debug_mode=debug_mode,
) as an: ) as n:
portal = await an.run_in_actor( portal = await n.run_in_actor(
cellar_door, cellar_door,
return_value=return_value, return_value=return_value,
name='some_linguist', name='some_linguist',

View File

@ -14,151 +14,56 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations from __future__ import annotations
from pathlib import Path
import os import os
# import tempfile import tempfile
from uuid import uuid4 from uuid import uuid4
from typing import ( from typing import (
Protocol, Protocol,
ClassVar, ClassVar,
# TypeVar, TypeVar,
# Union, Union,
Type, Type
TYPE_CHECKING,
) )
from bidict import bidict import trio
# import trio from trio import socket
from trio import (
socket,
SocketListener,
open_tcp_listeners,
)
from .log import get_logger
from ._state import (
get_rt_dir,
current_actor,
is_root_process,
_def_tpt_proto,
)
if TYPE_CHECKING:
from ._runtime import Actor
log = get_logger(__name__)
# TODO, maybe breakout the netns key to a struct? NamespaceType = TypeVar('NamespaceType')
# class NetNs(Struct)[str, int]: AddressType = TypeVar('AddressType')
# ... StreamType = TypeVar('StreamType')
ListenerType = TypeVar('ListenerType')
# TODO, can't we just use a type alias
# for this? namely just some `tuple[str, int, str, str]`?
#
# -[ ] would also just be simpler to keep this as SockAddr[tuple]
# or something, implying it's just a simple pair of values which can
# presumably be mapped to all transports?
# -[ ] `pydoc socket.socket.getsockname()` delivers a 4-tuple for
# ipv6 `(hostaddr, port, flowinfo, scope_id)`.. so how should we
# handle that?
# -[ ] as a further alternative to this wrap()/unwrap() approach we
# could just implement `enc/dec_hook()`s for the `Address`-types
# and just deal with our internal objs directly and always and
# leave it to the codec layer to figure out marshalling?
# |_ would mean only one spot to do the `.unwrap()` (which we may
# end up needing to call from the hook()s anyway?)
# -[x] rename to `UnwrappedAddress[Descriptor]` ??
# seems like the right name as per,
# https://www.geeksforgeeks.org/introduction-to-address-descriptor/
#
UnwrappedAddress = (
# tcp/udp/uds
tuple[
str, # host/domain(tcp), filesys-dir(uds)
int|str, # port/path(uds)
]
# ?TODO? should we also include another 2 fields from
# our `Aid` msg such that we include the runtime `Actor.uid`
# of `.name` and `.uuid`?
# - would ensure uniqueness across entire net?
# - allows for easier runtime-level filtering of "actors by
# service name"
)
# TODO, maybe rename to `SocketAddress`? class Address(Protocol[
class Address(Protocol): NamespaceType,
proto_key: ClassVar[str] AddressType,
unwrapped_type: ClassVar[UnwrappedAddress] StreamType,
ListenerType
]):
name_key: ClassVar[str]
address_type: ClassVar[Type[AddressType]]
# TODO, i feel like an `.is_bound()` is a better thing to
# support?
# Lke, what use does this have besides a noop and if it's not
# valid why aren't we erroring on creation/use?
@property @property
def is_valid(self) -> bool: def is_valid(self) -> bool:
... ...
# TODO, maybe `.netns` is a better name?
@property @property
def namespace(self) -> tuple[str, int]|None: def namespace(self) -> NamespaceType|None:
'''
The if-available, OS-specific "network namespace" key.
'''
...
@property
def bindspace(self) -> str:
'''
Deliver the socket address' "bindable space" from
a `socket.socket.bind()` and thus from the perspective of
specific transport protocol domain.
I.e. for most (layer-4) network-socket protocols this is
normally the ipv4/6 address, for UDS this is normally
a filesystem (sub-directory).
For (distributed) network protocols this is normally the routing
layer's domain/(ip-)address, though it might also include a "network namespace"
key different then the default.
For local-host-only transports this is either an explicit
namespace (with types defined by the OS: netns, Cgroup, IPC,
pid, etc. on linux) or failing that the sub-directory in the
filesys in which socket/shm files are located *under*.
'''
... ...
@classmethod @classmethod
def from_addr(cls, addr: UnwrappedAddress) -> Address: def from_addr(cls, addr: AddressType) -> Address:
... ...
def unwrap(self) -> UnwrappedAddress: def unwrap(self) -> AddressType:
'''
Deliver the underying minimum field set in
a primitive python data type-structure.
'''
... ...
@classmethod @classmethod
def get_random( def get_random(cls, namespace: NamespaceType | None = None) -> Address:
cls,
current_actor: Actor,
bindspace: str|None = None,
) -> Address:
... ...
# TODO, this should be something like a `.get_def_registar_addr()`
# or similar since,
# - it should be a **host singleton** (not root/tree singleton)
# - we **only need this value** when one isn't provided to the
# runtime at boot and we want to implicitly provide a host-wide
# registrar.
# - each rooted-actor-tree should likely have its own
# micro-registry (likely the root being it), also see
@classmethod @classmethod
def get_root(cls) -> Address: def get_root(cls) -> Address:
... ...
@ -169,20 +74,25 @@ class Address(Protocol):
def __eq__(self, other) -> bool: def __eq__(self, other) -> bool:
... ...
async def open_listener( async def open_stream(self, **kwargs) -> StreamType:
self, ...
**kwargs,
) -> SocketListener: async def open_listener(self, **kwargs) -> ListenerType:
... ...
async def close_listener(self): async def close_listener(self):
... ...
class TCPAddress(Address): class TCPAddress(Address[
proto_key: str = 'tcp' str,
unwrapped_type: type = tuple[str, int] tuple[str, int],
def_bindspace: str = '127.0.0.1' trio.SocketStream,
trio.SocketListener
]):
name_key: str = 'tcp'
address_type: type = tuple[str, int]
def __init__( def __init__(
self, self,
@ -194,63 +104,35 @@ class TCPAddress(Address):
or or
not isinstance(port, int) not isinstance(port, int)
): ):
raise TypeError( raise TypeError(f'Expected host {host} to be str and port {port} to be int')
f'Expected host {host!r} to be str and port {port!r} to be int' self._host = host
) self._port = port
self._host: str = host
self._port: int = port
@property @property
def is_valid(self) -> bool: def is_valid(self) -> bool:
return self._port != 0 return self._port != 0
@property @property
def bindspace(self) -> str: def namespace(self) -> str:
return self._host
@property
def domain(self) -> str:
return self._host return self._host
@classmethod @classmethod
def from_addr( def from_addr(cls, addr: tuple[str, int]) -> TCPAddress:
cls,
addr: tuple[str, int]
) -> TCPAddress:
match addr:
case (str(), int()):
return TCPAddress(addr[0], addr[1]) return TCPAddress(addr[0], addr[1])
case _:
raise ValueError(
f'Invalid unwrapped address for {cls}\n'
f'{addr}\n'
)
def unwrap(self) -> tuple[str, int]: def unwrap(self) -> tuple[str, int]:
return ( return self._host, self._port
self._host,
self._port,
)
@classmethod @classmethod
def get_random( def get_random(cls, namespace: str = '127.0.0.1') -> TCPAddress:
cls, return TCPAddress(namespace, 0)
bindspace: str = def_bindspace,
) -> TCPAddress:
return TCPAddress(bindspace, 0)
@classmethod @classmethod
def get_root(cls) -> Address: def get_root(cls) -> Address:
return TCPAddress( return TCPAddress('127.0.0.1', 1616)
'127.0.0.1',
1616,
)
def __repr__(self) -> str: def __repr__(self) -> str:
return ( return f'{type(self)} @ {self.unwrap()}'
f'{type(self).__name__}[{self.unwrap()}]'
)
def __eq__(self, other) -> bool: def __eq__(self, other) -> bool:
if not isinstance(other, TCPAddress): if not isinstance(other, TCPAddress):
@ -264,11 +146,17 @@ class TCPAddress(Address):
self._port == other._port self._port == other._port
) )
async def open_listener( async def open_stream(self, **kwargs) -> trio.SocketStream:
self, stream = await trio.open_tcp_stream(
**kwargs, self._host,
) -> SocketListener: self._port,
listeners: list[SocketListener] = await open_tcp_listeners( **kwargs
)
self._host, self._port = stream.socket.getsockname()[:2]
return stream
async def open_listener(self, **kwargs) -> trio.SocketListener:
listeners = await trio.open_tcp_listeners(
host=self._host, host=self._host,
port=self._port, port=self._port,
**kwargs **kwargs
@ -282,156 +170,47 @@ class TCPAddress(Address):
... ...
def unwrap_sockpath( class UDSAddress(Address[
sockpath: Path, None,
) -> tuple[Path, Path]: str,
return ( trio.SocketStream,
sockpath.parent, trio.SocketListener
sockpath.name, ]):
)
name_key: str = 'uds'
class UDSAddress(Address): address_type: type = str
# TODO, maybe we should use better field and value
# -[x] really this is a `.protocol_key` not a "name" of anything.
# -[ ] consider a 'unix' proto-key instead?
# -[ ] need to check what other mult-transport frameworks do
# like zmq, nng, uri-spec et al!
proto_key: str = 'uds'
unwrapped_type: type = tuple[str, int]
def_bindspace: Path = get_rt_dir()
def __init__( def __init__(
self, self,
filedir: Path|str|None, filepath: str
# TODO, i think i want `.filename` here?
filename: str|Path,
# XXX, in the sense you can also pass
# a "non-real-world-process-id" such as is handy to represent
# our host-local default "port-like" key for the very first
# root actor to create a registry address.
maybe_pid: int|None = None,
): ):
fdir = self._filedir = Path( self._filepath = filepath
filedir
or
self.def_bindspace
).absolute()
fpath = self._filename = Path(filename)
fp: Path = fdir / fpath
assert (
fp.is_absolute()
and
fp == self.sockpath
)
# to track which "side" is the peer process by reading socket
# credentials-info.
self._pid: int = maybe_pid
@property
def sockpath(self) -> Path:
return self._filedir / self._filename
@property @property
def is_valid(self) -> bool: def is_valid(self) -> bool:
''' return True
We block socket files not allocated under the runtime subdir.
'''
return self.bindspace in self.sockpath.parents
@property @property
def bindspace(self) -> Path: def namespace(self) -> None:
''' return
We replicate the "ip-set-of-hosts" part of a UDS socket as
just the sub-directory in which we allocate socket files.
'''
return self._filedir or self.def_bindspace
@classmethod @classmethod
def from_addr( def from_addr(cls, filepath: str) -> UDSAddress:
cls, return UDSAddress(filepath)
addr: (
tuple[Path|str, Path|str]|Path|str
),
) -> UDSAddress:
match addr:
case tuple()|list():
filedir = Path(addr[0])
filename = Path(addr[1])
# sockpath: Path = Path(addr[0])
# filedir, filename = unwrap_sockpath(sockpath)
# pid: int = addr[1]
return UDSAddress(
filedir=filedir,
filename=filename,
# maybe_pid=pid,
)
# NOTE, in case we ever decide to just `.unwrap()`
# to a `Path|str`?
case str()|Path():
sockpath: Path = Path(addr)
return UDSAddress(*unwrap_sockpath(sockpath))
case _:
# import pdbp; pdbp.set_trace()
raise TypeError(
f'Bad unwrapped-address for {cls} !\n'
f'{addr!r}\n'
)
def unwrap(self) -> tuple[str, int]: def unwrap(self) -> str:
# XXX NOTE, since this gets passed DIRECTLY to return self._filepath
# `.ipc._uds.open_unix_socket_w_passcred()`
return (
str(self._filedir),
str(self._filename),
)
@classmethod @classmethod
def get_random( def get_random(cls, namespace: None = None) -> UDSAddress:
cls, return UDSAddress(f'{tempfile.gettempdir()}/{uuid4()}.sock')
bindspace: Path|None = None, # default netns
) -> UDSAddress:
filedir: Path = bindspace or cls.def_bindspace
pid: int = os.getpid()
actor: Actor|None = current_actor(
err_on_no_runtime=False,
)
if actor:
sockname: str = '::'.join(actor.uid) + f'@{pid}'
else:
prefix: str = '<unknown-actor>'
if is_root_process():
prefix: str = 'root'
sockname: str = f'{prefix}@{pid}'
sockpath: Path = Path(f'{sockname}.sock')
return UDSAddress(
filedir=filedir,
filename=sockpath,
maybe_pid=pid,
)
@classmethod @classmethod
def get_root(cls) -> Address: def get_root(cls) -> Address:
def_uds_filename: Path = 'registry@1616.sock' return UDSAddress('tractor.sock')
return UDSAddress(
filedir=None,
filename=def_uds_filename,
# maybe_pid=1616,
)
def __repr__(self) -> str: def __repr__(self) -> str:
return ( return f'{type(self)} @ {self._filepath}'
f'{type(self).__name__}'
f'['
f'({self._filedir}, {self._filename})'
f']'
)
def __eq__(self, other) -> bool: def __eq__(self, other) -> bool:
if not isinstance(other, UDSAddress): if not isinstance(other, UDSAddress):
@ -439,144 +218,92 @@ class UDSAddress(Address):
f'Can not compare {type(other)} with {type(self)}' f'Can not compare {type(other)} with {type(self)}'
) )
return self.sockpath == other.sockpath return self._filepath == other._filepath
# async def open_listener(self, **kwargs) -> SocketListener: async def open_stream(self, **kwargs) -> trio.SocketStream:
async def open_listener( stream = await trio.open_unix_socket(
self, self._filepath,
**kwargs, **kwargs
) -> SocketListener:
sock = self._sock = socket.socket(
socket.AF_UNIX,
socket.SOCK_STREAM
)
log.info(
f'Attempting to bind UDS socket\n'
f'>[\n'
f'|_{self}\n'
) )
return stream
bindpath: Path = self.sockpath async def open_listener(self, **kwargs) -> trio.SocketListener:
await sock.bind(str(bindpath)) self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.listen(1) await self._sock.bind(self._filepath)
log.info( self._sock.listen(1)
f'Listening on UDS socket\n' return trio.SocketListener(self._sock)
f'[>\n'
f' |_{self}\n'
)
return SocketListener(self._sock)
def close_listener(self): async def close_listener(self):
self._sock.close() self._sock.close()
os.unlink(self.sockpath) os.unlink(self._filepath)
_address_types: bidict[str, Type[Address]] = { preferred_transport = 'uds'
'tcp': TCPAddress,
'uds': UDSAddress
_address_types = (
TCPAddress,
UDSAddress
)
_default_addrs: dict[str, Type[Address]] = {
cls.name_key: cls
for cls in _address_types
} }
# TODO! really these are discovery sys default addrs ONLY useful for AddressTypes = Union[
# when none is provided to a root actor on first boot. tuple([
cls.address_type
for cls in _address_types
])
]
_default_lo_addrs: dict[ _default_lo_addrs: dict[
str, str,
UnwrappedAddress AddressTypes
] = { ] = {
'tcp': TCPAddress.get_root().unwrap(), cls.name_key: cls.get_root().unwrap()
'uds': UDSAddress.get_root().unwrap(), for cls in _address_types
} }
def get_address_cls(name: str) -> Type[Address]: def get_address_cls(name: str) -> Type[Address]:
return _address_types[name] return _default_addrs[name]
def is_wrapped_addr(addr: any) -> bool: def is_wrapped_addr(addr: any) -> bool:
return type(addr) in _address_types.values() return type(addr) in _address_types
def mk_uuid() -> str: def wrap_address(addr: AddressTypes) -> Address:
'''
Encapsulate creation of a uuid4 as `str` as used
for creating `Actor.uid: tuple[str, str]` and/or
`.msg.types.Aid`.
'''
return str(uuid4())
def wrap_address(
addr: UnwrappedAddress
) -> Address:
'''
Wrap an `UnwrappedAddress` as an `Address`-type based
on matching builtin python data-structures which we adhoc
use for each.
XXX NOTE, careful care must be placed to ensure
`UnwrappedAddress` cases are **definitely unique** otherwise the
wrong transport backend may be loaded and will break many
low-level things in our runtime in a not-fun-to-debug way!
XD
'''
if is_wrapped_addr(addr): if is_wrapped_addr(addr):
return addr return addr
cls: Type|None = None cls = None
# if 'sock' in addr[0]:
# import pdbp; pdbp.set_trace()
match addr: match addr:
case str():
# classic network socket-address as tuple/list
case (
(str(), int())
|
[str(), int()]
):
cls = TCPAddress
case (
# (str()|Path(), str()|Path()),
# ^TODO? uhh why doesn't this work!?
(_, filename)
) if type(filename) is str:
cls = UDSAddress cls = UDSAddress
# likely an unset UDS or TCP reg address as defaulted in case tuple() | list():
# `_state._runtime_vars['_root_mailbox']` cls = TCPAddress
#
# TODO? figure out when/if we even need this? case None:
case ( cls = get_address_cls(preferred_transport)
None addr = cls.get_root().unwrap()
|
[None, None]
):
cls: Type[Address] = get_address_cls(_def_tpt_proto)
addr: UnwrappedAddress = cls.get_root().unwrap()
case _: case _:
# import pdbp; pdbp.set_trace()
raise TypeError( raise TypeError(
f'Can not wrap unwrapped-address ??\n' f'Can not wrap addr {addr} of type {type(addr)}'
f'type(addr): {type(addr)!r}\n'
f'addr: {addr!r}\n'
) )
return cls.from_addr(addr) return cls.from_addr(addr)
def default_lo_addrs( def default_lo_addrs(transports: list[str]) -> list[AddressTypes]:
transports: list[str],
) -> list[Type[Address]]:
'''
Return the default, host-singleton, registry address
for an input transport key set.
'''
return [ return [
_default_lo_addrs[transport] _default_lo_addrs[transport]
for transport in transports for transport in transports

View File

@ -50,8 +50,8 @@ if __name__ == "__main__":
args = parser.parse_args() args = parser.parse_args()
subactor = Actor( subactor = Actor(
name=args.uid[0], args.uid[0],
uuid=args.uid[1], uid=args.uid[1],
loglevel=args.loglevel, loglevel=args.loglevel,
spawn_method="trio" spawn_method="trio"
) )

View File

@ -366,7 +366,7 @@ class Context:
# f' ---\n' # f' ---\n'
f' |_ipc: {self.dst_maddr}\n' f' |_ipc: {self.dst_maddr}\n'
# f' dst_maddr{ds}{self.dst_maddr}\n' # f' dst_maddr{ds}{self.dst_maddr}\n'
f" uid{ds}'{self.chan.aid}'\n" f" uid{ds}'{self.chan.uid}'\n"
f" cid{ds}'{self.cid}'\n" f" cid{ds}'{self.cid}'\n"
# f' ---\n' # f' ---\n'
f'\n' f'\n'
@ -945,9 +945,9 @@ class Context:
reminfo: str = ( reminfo: str = (
# ' =>\n' # ' =>\n'
# f'Context.cancel() => {self.chan.uid}\n' # f'Context.cancel() => {self.chan.uid}\n'
f'\n'
f'c)=> {self.chan.uid}\n' f'c)=> {self.chan.uid}\n'
f' |_[{self.dst_maddr}\n' # f'{self.chan.uid}\n'
f' |_ @{self.dst_maddr}\n'
f' >> {self.repr_rpc}\n' f' >> {self.repr_rpc}\n'
# f' >> {self._nsf}() -> {codec}[dict]:\n\n' # f' >> {self._nsf}() -> {codec}[dict]:\n\n'
# TODO: pull msg-type from spec re #320 # TODO: pull msg-type from spec re #320

View File

@ -31,8 +31,9 @@ from tractor.log import get_logger
from .trionics import gather_contexts from .trionics import gather_contexts
from .ipc import _connect_chan, Channel from .ipc import _connect_chan, Channel
from ._addr import ( from ._addr import (
UnwrappedAddress, AddressTypes,
Address, Address,
preferred_transport,
wrap_address wrap_address
) )
from ._portal import ( from ._portal import (
@ -43,7 +44,6 @@ from ._portal import (
from ._state import ( from ._state import (
current_actor, current_actor,
_runtime_vars, _runtime_vars,
_def_tpt_proto,
) )
if TYPE_CHECKING: if TYPE_CHECKING:
@ -54,9 +54,7 @@ log = get_logger(__name__)
@acm @acm
async def get_registry( async def get_registry(addr: AddressTypes | None = None) -> AsyncGenerator[
addr: UnwrappedAddress|None = None,
) -> AsyncGenerator[
Portal | LocalPortal | None, Portal | LocalPortal | None,
None, None,
]: ]:
@ -73,9 +71,7 @@ async def get_registry(
# (likely a re-entrant call from the arbiter actor) # (likely a re-entrant call from the arbiter actor)
yield LocalPortal( yield LocalPortal(
actor, actor,
Channel(transport=None) await Channel.from_addr(addr)
# ^XXX, we DO NOT actually provide nor connect an
# underlying transport since this is merely an API shim.
) )
else: else:
# TODO: try to look pre-existing connection from # TODO: try to look pre-existing connection from
@ -139,10 +135,10 @@ def get_peer_by_name(
@acm @acm
async def query_actor( async def query_actor(
name: str, name: str,
regaddr: UnwrappedAddress|None = None, regaddr: AddressTypes|None = None,
) -> AsyncGenerator[ ) -> AsyncGenerator[
UnwrappedAddress|None, AddressTypes|None,
None, None,
]: ]:
''' '''
@ -172,7 +168,7 @@ async def query_actor(
async with get_registry(regaddr) as reg_portal: async with get_registry(regaddr) as reg_portal:
# TODO: return portals to all available actors - for now # TODO: return portals to all available actors - for now
# just the last one that registered # just the last one that registered
addr: UnwrappedAddress = await reg_portal.run_from_ns( addr: AddressTypes = await reg_portal.run_from_ns(
'self', 'self',
'find_actor', 'find_actor',
name=name, name=name,
@ -182,7 +178,7 @@ async def query_actor(
@acm @acm
async def maybe_open_portal( async def maybe_open_portal(
addr: UnwrappedAddress, addr: AddressTypes,
name: str, name: str,
): ):
async with query_actor( async with query_actor(
@ -202,8 +198,8 @@ async def maybe_open_portal(
@acm @acm
async def find_actor( async def find_actor(
name: str, name: str,
registry_addrs: list[UnwrappedAddress]|None = None, registry_addrs: list[AddressTypes]|None = None,
enable_transports: list[str] = [_def_tpt_proto], enable_transports: list[str] = [preferred_transport],
only_first: bool = True, only_first: bool = True,
raise_on_none: bool = False, raise_on_none: bool = False,
@ -238,7 +234,7 @@ async def find_actor(
) )
maybe_portals: list[ maybe_portals: list[
AsyncContextManager[UnwrappedAddress] AsyncContextManager[AddressTypes]
] = list( ] = list(
maybe_open_portal( maybe_open_portal(
addr=addr, addr=addr,
@ -280,7 +276,7 @@ async def find_actor(
@acm @acm
async def wait_for_actor( async def wait_for_actor(
name: str, name: str,
registry_addr: UnwrappedAddress | None = None, registry_addr: AddressTypes | None = None,
) -> AsyncGenerator[Portal, None]: ) -> AsyncGenerator[Portal, None]:
''' '''
@ -297,7 +293,7 @@ async def wait_for_actor(
yield peer_portal yield peer_portal
return return
regaddr: UnwrappedAddress = ( regaddr: AddressTypes = (
registry_addr registry_addr
or or
actor.reg_addrs[0] actor.reg_addrs[0]
@ -314,7 +310,7 @@ async def wait_for_actor(
# get latest registered addr by default? # get latest registered addr by default?
# TODO: offer multi-portal yields in multi-homed case? # TODO: offer multi-portal yields in multi-homed case?
addr: UnwrappedAddress = addrs[-1] addr: AddressTypes = addrs[-1]
async with _connect_chan(addr) as chan: async with _connect_chan(addr) as chan:
async with open_portal(chan) as portal: async with open_portal(chan) as portal:

View File

@ -37,7 +37,7 @@ from .log import (
from . import _state from . import _state
from .devx import _debug from .devx import _debug
from .to_asyncio import run_as_asyncio_guest from .to_asyncio import run_as_asyncio_guest
from ._addr import UnwrappedAddress from ._addr import AddressTypes
from ._runtime import ( from ._runtime import (
async_main, async_main,
Actor, Actor,
@ -53,10 +53,10 @@ log = get_logger(__name__)
def _mp_main( def _mp_main(
actor: Actor, actor: Actor,
accept_addrs: list[UnwrappedAddress], accept_addrs: list[AddressTypes],
forkserver_info: tuple[Any, Any, Any, Any, Any], forkserver_info: tuple[Any, Any, Any, Any, Any],
start_method: SpawnMethodKey, start_method: SpawnMethodKey,
parent_addr: UnwrappedAddress | None = None, parent_addr: AddressTypes | None = None,
infect_asyncio: bool = False, infect_asyncio: bool = False,
) -> None: ) -> None:
@ -207,7 +207,7 @@ def nest_from_op(
def _trio_main( def _trio_main(
actor: Actor, actor: Actor,
*, *,
parent_addr: UnwrappedAddress|None = None, parent_addr: AddressTypes | None = None,
infect_asyncio: bool = False, infect_asyncio: bool = False,
) -> None: ) -> None:

View File

@ -23,6 +23,7 @@ import builtins
import importlib import importlib
from pprint import pformat from pprint import pformat
from pdb import bdb from pdb import bdb
import sys
from types import ( from types import (
TracebackType, TracebackType,
) )
@ -71,22 +72,8 @@ log = get_logger('tractor')
_this_mod = importlib.import_module(__name__) _this_mod = importlib.import_module(__name__)
class RuntimeFailure(RuntimeError): class ActorFailure(Exception):
''' "General actor failure"
General `Actor`-runtime failure due to,
- a bad runtime-env,
- falied spawning (bad input to process),
- API usage.
'''
class ActorFailure(RuntimeFailure):
'''
`Actor` failed to boot before/after spawn
'''
class InternalError(RuntimeError): class InternalError(RuntimeError):
@ -139,12 +126,6 @@ class TrioTaskExited(Exception):
''' '''
class DebugRequestError(RuntimeError):
'''
Failed to request stdio lock from root actor!
'''
# NOTE: more or less should be close to these: # NOTE: more or less should be close to these:
# 'boxed_type', # 'boxed_type',
# 'src_type', # 'src_type',
@ -210,8 +191,6 @@ def get_err_type(type_name: str) -> BaseException|None:
): ):
return type_ref return type_ref
return None
def pack_from_raise( def pack_from_raise(
local_err: ( local_err: (
@ -542,6 +521,7 @@ class RemoteActorError(Exception):
if val: if val:
_repr += f'{key}={val_str}{end_char}' _repr += f'{key}={val_str}{end_char}'
return _repr return _repr
def reprol(self) -> str: def reprol(self) -> str:
@ -620,9 +600,56 @@ class RemoteActorError(Exception):
the type name is already implicitly shown by python). the type name is already implicitly shown by python).
''' '''
header: str = ''
body: str = ''
message: str = ''
# XXX when the currently raised exception is this instance,
# we do not ever use the "type header" style repr.
is_being_raised: bool = False
if (
(exc := sys.exception())
and
exc is self
):
is_being_raised: bool = True
with_type_header: bool = (
with_type_header
and
not is_being_raised
)
# <RemoteActorError( .. )> style
if with_type_header:
header: str = f'<{type(self).__name__}('
if message := self._message:
# split off the first line so, if needed, it isn't
# indented the same like the "boxed content" which
# since there is no `.tb_str` is just the `.message`.
lines: list[str] = message.splitlines()
first: str = lines[0]
message: str = message.removeprefix(first)
# with a type-style header we,
# - have no special message "first line" extraction/handling
# - place the message a space in from the header:
# `MsgTypeError( <message> ..`
# ^-here
# - indent the `.message` inside the type body.
if with_type_header:
first = f' {first} )>'
message: str = textwrap.indent(
message,
prefix=' '*2,
)
message: str = first + message
# IFF there is an embedded traceback-str we always # IFF there is an embedded traceback-str we always
# draw the ascii-box around it. # draw the ascii-box around it.
body: str = ''
if tb_str := self.tb_str: if tb_str := self.tb_str:
fields: str = self._mk_fields_str( fields: str = self._mk_fields_str(
_body_fields _body_fields
@ -643,15 +670,21 @@ class RemoteActorError(Exception):
boxer_header=self.relay_uid, boxer_header=self.relay_uid,
) )
# !TODO, it'd be nice to import these top level without tail = ''
# cycles! if (
from tractor.devx.pformat import ( with_type_header
pformat_exc, and not message
) ):
return pformat_exc( tail: str = '>'
exc=self,
with_type_header=with_type_header, return (
body=body, header
+
message
+
f'{body}'
+
tail
) )
__repr__ = pformat __repr__ = pformat
@ -929,7 +962,7 @@ class StreamOverrun(
''' '''
class TransportClosed(Exception): class TransportClosed(trio.BrokenResourceError):
''' '''
IPC transport (protocol) connection was closed or broke and IPC transport (protocol) connection was closed or broke and
indicates that the wrapping communication `Channel` can no longer indicates that the wrapping communication `Channel` can no longer
@ -940,21 +973,16 @@ class TransportClosed(Exception):
self, self,
message: str, message: str,
loglevel: str = 'transport', loglevel: str = 'transport',
src_exc: Exception|None = None, cause: BaseException|None = None,
raise_on_report: bool = False, raise_on_report: bool = False,
) -> None: ) -> None:
self.message: str = message self.message: str = message
self._loglevel: str = loglevel self._loglevel = loglevel
super().__init__(message) super().__init__(message)
self.src_exc = src_exc if cause is not None:
if ( self.__cause__ = cause
src_exc is not None
and
not self.__cause__
):
self.__cause__ = src_exc
# flag to toggle whether the msg loop should raise # flag to toggle whether the msg loop should raise
# the exc in its `TransportClosed` handler block. # the exc in its `TransportClosed` handler block.
@ -981,36 +1009,13 @@ class TransportClosed(Exception):
f' {cause}\n' # exc repr f' {cause}\n' # exc repr
) )
getattr( getattr(log, self._loglevel)(message)
log,
self._loglevel
)(message)
# some errors we want to blow up from # some errors we want to blow up from
# inside the RPC msg loop # inside the RPC msg loop
if self._raise_on_report: if self._raise_on_report:
raise self from cause raise self from cause
def pformat(self) -> str:
from tractor.devx.pformat import (
pformat_exc,
)
src_err: Exception|None = self.src_exc or '<unknown>'
src_msg: tuple[str] = src_err.args
src_exc_repr: str = (
f'{type(src_err).__name__}[ {src_msg} ]'
)
return pformat_exc(
exc=self,
# message=self.message, # implicit!
body=(
f' |_src_exc: {src_exc_repr}\n'
),
)
# delegate to `str`-ified pformat
__repr__ = pformat
class NoResult(RuntimeError): class NoResult(RuntimeError):
"No final result is expected for this actor" "No final result is expected for this actor"

View File

@ -107,10 +107,6 @@ class Portal:
# point. # point.
self._expect_result_ctx: Context|None = None self._expect_result_ctx: Context|None = None
self._streams: set[MsgStream] = set() self._streams: set[MsgStream] = set()
# TODO, this should be PRIVATE (and never used publicly)! since it's just
# a cached ref to the local runtime instead of calling
# `current_actor()` everywhere.. XD
self.actor: Actor = current_actor() self.actor: Actor = current_actor()
@property @property
@ -175,7 +171,7 @@ class Portal:
# not expecting a "main" result # not expecting a "main" result
if self._expect_result_ctx is None: if self._expect_result_ctx is None:
log.warning( log.warning(
f"Portal for {self.channel.aid} not expecting a final" f"Portal for {self.channel.uid} not expecting a final"
" result?\nresult() should only be called if subactor" " result?\nresult() should only be called if subactor"
" was spawned with `ActorNursery.run_in_actor()`") " was spawned with `ActorNursery.run_in_actor()`")
return NoResult return NoResult
@ -222,7 +218,7 @@ class Portal:
# IPC calls # IPC calls
if self._streams: if self._streams:
log.cancel( log.cancel(
f"Cancelling all streams with {self.channel.aid}") f"Cancelling all streams with {self.channel.uid}")
for stream in self._streams.copy(): for stream in self._streams.copy():
try: try:
await stream.aclose() await stream.aclose()
@ -267,7 +263,7 @@ class Portal:
return False return False
reminfo: str = ( reminfo: str = (
f'c)=> {self.channel.aid}\n' f'c)=> {self.channel.uid}\n'
f' |_{chan}\n' f' |_{chan}\n'
) )
log.cancel( log.cancel(
@ -310,7 +306,7 @@ class Portal:
): ):
log.debug( log.debug(
'IPC chan for actor already closed or broken?\n\n' 'IPC chan for actor already closed or broken?\n\n'
f'{self.channel.aid}\n' f'{self.channel.uid}\n'
f' |_{self.channel}\n' f' |_{self.channel}\n'
) )
return False return False
@ -508,12 +504,8 @@ class LocalPortal:
return it's result. return it's result.
''' '''
obj = ( obj = self.actor if ns == 'self' else importlib.import_module(ns)
self.actor func = getattr(obj, func_name)
if ns == 'self'
else importlib.import_module(ns)
)
func: Callable = getattr(obj, func_name)
return await func(**kwargs) return await func(**kwargs)
@ -551,10 +543,8 @@ async def open_portal(
await channel.connect() await channel.connect()
was_connected = True was_connected = True
if channel.aid is None: if channel.uid is None:
await channel._do_handshake( await actor._do_handshake(channel)
aid=actor.aid,
)
msg_loop_cs: trio.CancelScope|None = None msg_loop_cs: trio.CancelScope|None = None
if start_msg_loop: if start_msg_loop:

View File

@ -18,9 +18,7 @@
Root actor runtime ignition(s). Root actor runtime ignition(s).
''' '''
from contextlib import ( from contextlib import asynccontextmanager as acm
asynccontextmanager as acm,
)
from functools import partial from functools import partial
import importlib import importlib
import inspect import inspect
@ -28,10 +26,7 @@ import logging
import os import os
import signal import signal
import sys import sys
from typing import ( from typing import Callable
Any,
Callable,
)
import warnings import warnings
@ -52,105 +47,28 @@ from .ipc import (
_connect_chan, _connect_chan,
) )
from ._addr import ( from ._addr import (
Address, AddressTypes,
UnwrappedAddress,
default_lo_addrs,
mk_uuid,
wrap_address, wrap_address,
preferred_transport,
default_lo_addrs
) )
from ._exceptions import ( from ._exceptions import is_multi_cancelled
RuntimeFailure,
is_multi_cancelled,
)
logger = log.get_logger('tractor') logger = log.get_logger('tractor')
# TODO: stick this in a `@acm` defined in `devx._debug`?
# -[ ] also maybe consider making this a `wrapt`-deco to
# save an indent level?
#
@acm
async def maybe_block_bp(
debug_mode: bool,
maybe_enable_greenback: bool,
) -> bool:
# Override the global debugger hook to make it play nice with
# ``trio``, see much discussion in:
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
builtin_bp_handler: Callable = sys.breakpointhook
orig_bp_path: str|None = os.environ.get(
'PYTHONBREAKPOINT',
None,
)
bp_blocked: bool
if (
debug_mode
and maybe_enable_greenback
and (
maybe_mod := await _debug.maybe_init_greenback(
raise_not_found=False,
)
)
):
logger.info(
f'Found `greenback` installed @ {maybe_mod}\n'
'Enabling `tractor.pause_from_sync()` support!\n'
)
os.environ['PYTHONBREAKPOINT'] = (
'tractor.devx._debug._sync_pause_from_builtin'
)
_state._runtime_vars['use_greenback'] = True
bp_blocked = False
else:
# TODO: disable `breakpoint()` by default (without
# `greenback`) since it will break any multi-actor
# usage by a clobbered TTY's stdstreams!
def block_bps(*args, **kwargs):
raise RuntimeError(
'Trying to use `breakpoint()` eh?\n\n'
'Welp, `tractor` blocks `breakpoint()` built-in calls by default!\n'
'If you need to use it please install `greenback` and set '
'`debug_mode=True` when opening the runtime '
'(either via `.open_nursery()` or `open_root_actor()`)\n'
)
sys.breakpointhook = block_bps
# lol ok,
# https://docs.python.org/3/library/sys.html#sys.breakpointhook
os.environ['PYTHONBREAKPOINT'] = "0"
bp_blocked = True
try:
yield bp_blocked
finally:
# restore any prior built-in `breakpoint()` hook state
if builtin_bp_handler is not None:
sys.breakpointhook = builtin_bp_handler
if orig_bp_path is not None:
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
else:
# clear env back to having no entry
os.environ.pop('PYTHONBREAKPOINT', None)
@acm @acm
async def open_root_actor( async def open_root_actor(
*, *,
# defaults are above # defaults are above
registry_addrs: list[UnwrappedAddress]|None = None, registry_addrs: list[AddressTypes]|None = None,
# defaults are above # defaults are above
arbiter_addr: tuple[UnwrappedAddress]|None = None, arbiter_addr: tuple[AddressTypes]|None = None,
enable_transports: list[ enable_transports: list[str] = [preferred_transport],
_state.TransportProtocolKey,
] = [_state._def_tpt_proto],
name: str|None = 'root', name: str|None = 'root',
@ -192,30 +110,55 @@ async def open_root_actor(
Runtime init entry point for ``tractor``. Runtime init entry point for ``tractor``.
''' '''
# XXX NEVER allow nested actor-trees!
if already_actor := _state.current_actor(err_on_no_runtime=False):
rtvs: dict[str, Any] = _state._runtime_vars
root_mailbox: list[str, int] = rtvs['_root_mailbox']
registry_addrs: list[list[str, int]] = rtvs['_registry_addrs']
raise RuntimeFailure(
f'A current actor already exists !?\n'
f'({already_actor}\n'
f'\n'
f'You can NOT open a second root actor from within '
f'an existing tree and the current root of this '
f'already exists !!\n'
f'\n'
f'_root_mailbox: {root_mailbox!r}\n'
f'_registry_addrs: {registry_addrs!r}\n'
)
async with maybe_block_bp(
debug_mode=debug_mode,
maybe_enable_greenback=maybe_enable_greenback,
):
_debug.hide_runtime_frames() _debug.hide_runtime_frames()
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
# TODO: stick this in a `@cm` defined in `devx._debug`?
#
# Override the global debugger hook to make it play nice with
# ``trio``, see much discussion in:
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
builtin_bp_handler: Callable = sys.breakpointhook
orig_bp_path: str|None = os.environ.get(
'PYTHONBREAKPOINT',
None,
)
if (
debug_mode
and maybe_enable_greenback
and (
maybe_mod := await _debug.maybe_init_greenback(
raise_not_found=False,
)
)
):
logger.info(
f'Found `greenback` installed @ {maybe_mod}\n'
'Enabling `tractor.pause_from_sync()` support!\n'
)
os.environ['PYTHONBREAKPOINT'] = (
'tractor.devx._debug._sync_pause_from_builtin'
)
_state._runtime_vars['use_greenback'] = True
else:
# TODO: disable `breakpoint()` by default (without
# `greenback`) since it will break any multi-actor
# usage by a clobbered TTY's stdstreams!
def block_bps(*args, **kwargs):
raise RuntimeError(
'Trying to use `breakpoint()` eh?\n\n'
'Welp, `tractor` blocks `breakpoint()` built-in calls by default!\n'
'If you need to use it please install `greenback` and set '
'`debug_mode=True` when opening the runtime '
'(either via `.open_nursery()` or `open_root_actor()`)\n'
)
sys.breakpointhook = block_bps
# lol ok,
# https://docs.python.org/3/library/sys.html#sys.breakpointhook
os.environ['PYTHONBREAKPOINT'] = "0"
# attempt to retreive ``trio``'s sigint handler and stash it # attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state. # on our debugger lock state.
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT) _debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
@ -242,7 +185,6 @@ async def open_root_actor(
if start_method is not None: if start_method is not None:
_spawn.try_set_start_method(start_method) _spawn.try_set_start_method(start_method)
# TODO! remove this ASAP!
if arbiter_addr is not None: if arbiter_addr is not None:
warnings.warn( warnings.warn(
'`arbiter_addr` is now deprecated\n' '`arbiter_addr` is now deprecated\n'
@ -253,9 +195,7 @@ async def open_root_actor(
registry_addrs = [arbiter_addr] registry_addrs = [arbiter_addr]
if not registry_addrs: if not registry_addrs:
registry_addrs: list[UnwrappedAddress] = default_lo_addrs( registry_addrs: list[AddressTypes] = default_lo_addrs(enable_transports)
enable_transports
)
assert registry_addrs assert registry_addrs
@ -305,10 +245,10 @@ async def open_root_actor(
enable_stack_on_sig() enable_stack_on_sig()
# closed into below ping task-func # closed into below ping task-func
ponged_addrs: list[UnwrappedAddress] = [] ponged_addrs: list[AddressTypes] = []
async def ping_tpt_socket( async def ping_tpt_socket(
addr: UnwrappedAddress, addr: AddressTypes,
timeout: float = 1, timeout: float = 1,
) -> None: ) -> None:
''' '''
@ -344,7 +284,7 @@ async def open_root_actor(
addr, addr,
) )
trans_bind_addrs: list[UnwrappedAddress] = [] trans_bind_addrs: list[AddressTypes] = []
# Create a new local root-actor instance which IS NOT THE # Create a new local root-actor instance which IS NOT THE
# REGISTRAR # REGISTRAR
@ -362,7 +302,6 @@ async def open_root_actor(
actor = Actor( actor = Actor(
name=name or 'anonymous', name=name or 'anonymous',
uuid=mk_uuid(),
registry_addrs=ponged_addrs, registry_addrs=ponged_addrs,
loglevel=loglevel, loglevel=loglevel,
enable_modules=enable_modules, enable_modules=enable_modules,
@ -370,9 +309,10 @@ async def open_root_actor(
# DO NOT use the registry_addrs as the transport server # DO NOT use the registry_addrs as the transport server
# addrs for this new non-registar, root-actor. # addrs for this new non-registar, root-actor.
for addr in ponged_addrs: for addr in ponged_addrs:
waddr: Address = wrap_address(addr) waddr = wrap_address(addr)
print(waddr)
trans_bind_addrs.append( trans_bind_addrs.append(
waddr.get_random(bindspace=waddr.bindspace) waddr.get_random(namespace=waddr.namespace)
) )
# Start this local actor as the "registrar", aka a regular # Start this local actor as the "registrar", aka a regular
@ -396,8 +336,7 @@ async def open_root_actor(
# https://github.com/goodboy/tractor/issues/296 # https://github.com/goodboy/tractor/issues/296
actor = Arbiter( actor = Arbiter(
name=name or 'registrar', name or 'registrar',
uuid=mk_uuid(),
registry_addrs=registry_addrs, registry_addrs=registry_addrs,
loglevel=loglevel, loglevel=loglevel,
enable_modules=enable_modules, enable_modules=enable_modules,
@ -475,11 +414,7 @@ async def open_root_actor(
err, err,
) )
): ):
logger.exception( logger.exception('Root actor crashed\n')
'Root actor crashed\n'
f'>x)\n'
f' |_{actor}\n'
)
# ALWAYS re-raise any error bubbled up from the # ALWAYS re-raise any error bubbled up from the
# runtime! # runtime!
@ -496,19 +431,30 @@ async def open_root_actor(
# tempn.start_soon(an.exited.wait) # tempn.start_soon(an.exited.wait)
logger.info( logger.info(
f'Closing down root actor\n' 'Closing down root actor'
f'>)\n'
f'|_{actor}\n'
) )
await actor.cancel(None) # self cancel await actor.cancel(None) # self cancel
finally: finally:
_state._current_actor = None _state._current_actor = None
_state._last_actor_terminated = actor _state._last_actor_terminated = actor
logger.runtime(
f'Root actor terminated\n' # restore built-in `breakpoint()` hook state
f')>\n' if (
f' |_{actor}\n' debug_mode
) and
maybe_enable_greenback
):
if builtin_bp_handler is not None:
sys.breakpointhook = builtin_bp_handler
if orig_bp_path is not None:
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
else:
# clear env back to having no entry
os.environ.pop('PYTHONBREAKPOINT', None)
logger.runtime("Root actor terminated")
def run_daemon( def run_daemon(
@ -516,7 +462,7 @@ def run_daemon(
# runtime kwargs # runtime kwargs
name: str | None = 'root', name: str | None = 'root',
registry_addrs: list[UnwrappedAddress]|None = None, registry_addrs: list[AddressTypes]|None = None,
start_method: str | None = None, start_method: str | None = None,
debug_mode: bool = False, debug_mode: bool = False,

View File

@ -1156,7 +1156,7 @@ async def process_messages(
trio.Event(), trio.Event(),
) )
# XXX RUNTIME-SCOPED! remote (likely internal) error # runtime-scoped remote (internal) error
# (^- bc no `Error.cid` -^) # (^- bc no `Error.cid` -^)
# #
# NOTE: this is the non-rpc error case, that # NOTE: this is the non-rpc error case, that
@ -1219,10 +1219,8 @@ async def process_messages(
# -[ ] figure out how this will break with other transports? # -[ ] figure out how this will break with other transports?
tc.report_n_maybe_raise( tc.report_n_maybe_raise(
message=( message=(
f'peer IPC channel closed abruptly?\n' f'peer IPC channel closed abruptly?\n\n'
f'\n' f'<=x {chan}\n'
f'<=x[\n'
f' {chan}\n'
f' |_{chan.raddr}\n\n' f' |_{chan.raddr}\n\n'
) )
+ +

View File

@ -52,7 +52,6 @@ import sys
from typing import ( from typing import (
Any, Any,
Callable, Callable,
Type,
TYPE_CHECKING, TYPE_CHECKING,
) )
import uuid import uuid
@ -76,11 +75,11 @@ from tractor.msg import (
) )
from .ipc import Channel from .ipc import Channel
from ._addr import ( from ._addr import (
UnwrappedAddress, AddressTypes,
Address, Address,
default_lo_addrs,
get_address_cls,
wrap_address, wrap_address,
preferred_transport,
default_lo_addrs
) )
from ._context import ( from ._context import (
mk_context, mk_context,
@ -183,15 +182,15 @@ class Actor:
def __init__( def __init__(
self, self,
name: str, name: str,
uuid: str,
*, *,
enable_modules: list[str] = [], enable_modules: list[str] = [],
uid: str|None = None,
loglevel: str|None = None, loglevel: str|None = None,
registry_addrs: list[UnwrappedAddress]|None = None, registry_addrs: list[AddressTypes]|None = None,
spawn_method: str|None = None, spawn_method: str|None = None,
# TODO: remove! # TODO: remove!
arbiter_addr: UnwrappedAddress|None = None, arbiter_addr: AddressTypes|None = None,
) -> None: ) -> None:
''' '''
@ -199,14 +198,12 @@ class Actor:
phase (aka before a new process is executed). phase (aka before a new process is executed).
''' '''
self._aid = msgtypes.Aid( self.name = name
name=name, self.uid = (
uuid=uuid, name,
pid=os.getpid(), uid or str(uuid.uuid4())
) )
self._task: trio.Task|None = None
# state
self._cancel_complete = trio.Event() self._cancel_complete = trio.Event()
self._cancel_called_by_remote: tuple[str, tuple]|None = None self._cancel_called_by_remote: tuple[str, tuple]|None = None
self._cancel_called: bool = False self._cancel_called: bool = False
@ -233,7 +230,7 @@ class Actor:
DeprecationWarning, DeprecationWarning,
stacklevel=2, stacklevel=2,
) )
registry_addrs: list[UnwrappedAddress] = [arbiter_addr] registry_addrs: list[AddressTypes] = [arbiter_addr]
# marked by the process spawning backend at startup # marked by the process spawning backend at startup
# will be None for the parent most process started manually # will be None for the parent most process started manually
@ -280,97 +277,13 @@ class Actor:
# when provided, init the registry addresses property from # when provided, init the registry addresses property from
# input via the validator. # input via the validator.
self._reg_addrs: list[UnwrappedAddress] = [] self._reg_addrs: list[AddressTypes] = []
if registry_addrs: if registry_addrs:
self.reg_addrs: list[UnwrappedAddress] = registry_addrs self.reg_addrs: list[AddressTypes] = registry_addrs
_state._runtime_vars['_registry_addrs'] = registry_addrs _state._runtime_vars['_registry_addrs'] = registry_addrs
@property @property
def aid(self) -> msgtypes.Aid: def reg_addrs(self) -> list[AddressTypes]:
'''
This process-singleton-actor's "unique actor ID" in struct form.
See the `tractor.msg.Aid` struct for details.
'''
return self._aid
@property
def name(self) -> str:
return self._aid.name
@property
def uid(self) -> tuple[str, str]:
'''
This process-singleton's "unique (cross-host) ID".
Delivered from the `.Aid.name/.uuid` fields as a `tuple` pair
and should be multi-host unique despite a large distributed
process plane.
'''
msg: str = (
f'`{type(self).__name__}.uid` is now deprecated.\n'
'Use the new `.aid: tractor.msg.Aid` (struct) instead '
'which also provides additional named (optional) fields '
'beyond just the `.name` and `.uuid`.'
)
warnings.warn(
msg,
DeprecationWarning,
stacklevel=2,
)
return (
self._aid.name,
self._aid.uuid,
)
@property
def pid(self) -> int:
return self._aid.pid
def pformat(self) -> str:
ds: str = '='
parent_uid: tuple|None = None
if rent_chan := self._parent_chan:
parent_uid = rent_chan.uid
peers: list[tuple] = list(self._peer_connected)
listen_addrs: str = pformat(self._listen_addrs)
fmtstr: str = (
f' |_id: {self.aid!r}\n'
# f" aid{ds}{self.aid!r}\n"
f" parent{ds}{parent_uid}\n"
f'\n'
f' |_ipc: {len(peers)!r} connected peers\n'
f" peers{ds}{peers!r}\n"
f" _listen_addrs{ds}'{listen_addrs}'\n"
f" _listeners{ds}'{self._listeners}'\n"
f'\n'
f' |_rpc: {len(self._rpc_tasks)} tasks\n'
f" ctxs{ds}{len(self._contexts)}\n"
f'\n'
f' |_runtime: ._task{ds}{self._task!r}\n'
f' _spawn_method{ds}{self._spawn_method}\n'
f' _actoruid2nursery{ds}{self._actoruid2nursery}\n'
f' _forkserver_info{ds}{self._forkserver_info}\n'
f'\n'
f' |_state: "TODO: .repr_state()"\n'
f' _cancel_complete{ds}{self._cancel_complete}\n'
f' _cancel_called_by_remote{ds}{self._cancel_called_by_remote}\n'
f' _cancel_called{ds}{self._cancel_called}\n'
)
return (
'<Actor(\n'
+
fmtstr
+
')>\n'
)
__repr__ = pformat
@property
def reg_addrs(self) -> list[UnwrappedAddress]:
''' '''
List of (socket) addresses for all known (and contactable) List of (socket) addresses for all known (and contactable)
registry actors. registry actors.
@ -381,7 +294,7 @@ class Actor:
@reg_addrs.setter @reg_addrs.setter
def reg_addrs( def reg_addrs(
self, self,
addrs: list[UnwrappedAddress], addrs: list[AddressTypes],
) -> None: ) -> None:
if not addrs: if not addrs:
log.warning( log.warning(
@ -507,23 +420,14 @@ class Actor:
# send/receive initial handshake response # send/receive initial handshake response
try: try:
peer_aid: msgtypes.Aid = await chan._do_handshake( uid: tuple|None = await self._do_handshake(chan)
aid=self.aid,
)
except ( except (
TransportClosed, # we need this for ``msgspec`` for some reason?
# ^XXX NOTE, the above wraps `trio` exc types raised # for now, it's been put in the stream backend.
# during various `SocketStream.send/receive_xx()` calls
# under different fault conditions such as,
#
# trio.BrokenResourceError, # trio.BrokenResourceError,
# trio.ClosedResourceError, # trio.ClosedResourceError,
#
# Inside our `.ipc._transport` layer we absorb and TransportClosed,
# re-raise our own `TransportClosed` exc such that this
# higher level runtime code can only worry one
# "kinda-error" that we expect to tolerate during
# discovery-sys related pings, queires, DoS etc.
): ):
# XXX: This may propagate up from `Channel._aiter_recv()` # XXX: This may propagate up from `Channel._aiter_recv()`
# and `MsgpackStream._inter_packets()` on a read from the # and `MsgpackStream._inter_packets()` on a read from the
@ -538,12 +442,6 @@ class Actor:
) )
return return
uid: tuple[str, str] = (
peer_aid.name,
peer_aid.uuid,
)
# TODO, can we make this downstream peer tracking use the
# `peer_aid` instead?
familiar: str = 'new-peer' familiar: str = 'new-peer'
if _pre_chan := self._peers.get(uid): if _pre_chan := self._peers.get(uid):
familiar: str = 'pre-existing-peer' familiar: str = 'pre-existing-peer'
@ -1125,12 +1023,11 @@ class Actor:
async def _from_parent( async def _from_parent(
self, self,
parent_addr: UnwrappedAddress|None, parent_addr: AddressTypes|None,
) -> tuple[ ) -> tuple[
Channel, Channel,
list[UnwrappedAddress]|None, list[AddressTypes]|None,
list[str]|None, # preferred tpts
]: ]:
''' '''
Bootstrap this local actor's runtime config from its parent by Bootstrap this local actor's runtime config from its parent by
@ -1142,25 +1039,20 @@ class Actor:
# Connect back to the parent actor and conduct initial # Connect back to the parent actor and conduct initial
# handshake. From this point on if we error, we # handshake. From this point on if we error, we
# attempt to ship the exception back to the parent. # attempt to ship the exception back to the parent.
chan = await Channel.from_addr( chan = await Channel.from_addr(wrap_address(parent_addr))
addr=wrap_address(parent_addr)
)
assert isinstance(chan, Channel)
# TODO: move this into a `Channel.handshake()`?
# Initial handshake: swap names. # Initial handshake: swap names.
await chan._do_handshake(aid=self.aid) await self._do_handshake(chan)
accept_addrs: list[UnwrappedAddress]|None = None accept_addrs: list[AddressTypes]|None = None
if self._spawn_method == "trio": if self._spawn_method == "trio":
# Receive post-spawn runtime state from our parent. # Receive post-spawn runtime state from our parent.
spawnspec: msgtypes.SpawnSpec = await chan.recv() spawnspec: msgtypes.SpawnSpec = await chan.recv()
match spawnspec:
case MsgTypeError():
raise spawnspec
case msgtypes.SpawnSpec():
self._spawn_spec = spawnspec self._spawn_spec = spawnspec
log.runtime( log.runtime(
'Received runtime spec from parent:\n\n' 'Received runtime spec from parent:\n\n'
@ -1170,29 +1062,7 @@ class Actor:
# if "trace"/"util" mode is enabled? # if "trace"/"util" mode is enabled?
f'{pretty_struct.pformat(spawnspec)}\n' f'{pretty_struct.pformat(spawnspec)}\n'
) )
accept_addrs: list[AddressTypes] = spawnspec.bind_addrs
case _:
raise InternalError(
f'Received invalid non-`SpawnSpec` payload !?\n'
f'{spawnspec}\n'
)
# ^^TODO XXX!! when the `SpawnSpec` fails to decode
# the above will raise a `MsgTypeError` which if we
# do NOT ALSO RAISE it will tried to be pprinted in
# the log.runtime() below..
#
# SO we gotta look at how other `chan.recv()` calls
# are wrapped and do the same for this spec receive!
# -[ ] see `._rpc` likely has the answer?
#
# XXX NOTE, can't be called here in subactor
# bc we haven't yet received the
# `SpawnSpec._runtime_vars: dict` which would
# declare whether `debug_mode` is set!
# breakpoint()
# import pdbp; pdbp.set_trace()
accept_addrs: list[UnwrappedAddress] = spawnspec.bind_addrs
# TODO: another `Struct` for rtvs.. # TODO: another `Struct` for rtvs..
rvs: dict[str, Any] = spawnspec._runtime_vars rvs: dict[str, Any] = spawnspec._runtime_vars
@ -1284,19 +1154,11 @@ class Actor:
return ( return (
chan, chan,
accept_addrs, accept_addrs,
None,
# ^TODO, preferred tpts list from rent!
# -[ ] need to extend the `SpawnSpec` tho!
) )
# failed to connect back? except OSError: # failed to connect
except (
OSError,
ConnectionError,
):
log.warning( log.warning(
f'Failed to connect to spawning parent actor!?\n' f'Failed to connect to spawning parent actor!?\n'
f'\n'
f'x=> {parent_addr}\n' f'x=> {parent_addr}\n'
f'|_{self}\n\n' f'|_{self}\n\n'
) )
@ -1307,13 +1169,12 @@ class Actor:
self, self,
handler_nursery: Nursery, handler_nursery: Nursery,
*, *,
listen_addrs: list[UnwrappedAddress]|None = None, listen_addrs: list[AddressTypes]|None = None,
task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
''' '''
Start the IPC transport server, begin listening/accepting new Start the IPC transport server, begin listening for new connections.
`trio.SocketStream` connections.
This will cause an actor to continue living (and thus This will cause an actor to continue living (and thus
blocking at the process/OS-thread level) until blocking at the process/OS-thread level) until
@ -1321,9 +1182,7 @@ class Actor:
''' '''
if listen_addrs is None: if listen_addrs is None:
listen_addrs = default_lo_addrs([ listen_addrs = default_lo_addrs([preferred_transport])
_state._def_tpt_proto
])
else: else:
listen_addrs: list[Address] = [ listen_addrs: list[Address] = [
@ -1333,24 +1192,10 @@ class Actor:
self._server_down = trio.Event() self._server_down = trio.Event()
try: try:
async with trio.open_nursery() as server_n: async with trio.open_nursery() as server_n:
listeners: list[trio.abc.Listener] = [
listeners: list[trio.abc.Listener] = [] await addr.open_listener()
for addr in listen_addrs: for addr in listen_addrs
try: ]
listener: trio.abc.Listener = await addr.open_listener()
except OSError as oserr:
if (
'[Errno 98] Address already in use'
in
oserr.args#[0]
):
log.exception(
f'Address already in use?\n'
f'{addr}\n'
)
raise
listeners.append(listener)
await server_n.start( await server_n.start(
partial( partial(
trio.serve_listeners, trio.serve_listeners,
@ -1363,10 +1208,8 @@ class Actor:
handler_nursery=handler_nursery handler_nursery=handler_nursery
) )
) )
# TODO, wow make this message better! XD log.runtime(
log.info(
'Started server(s)\n' 'Started server(s)\n'
+
'\n'.join([f'|_{addr}' for addr in listen_addrs]) '\n'.join([f'|_{addr}' for addr in listen_addrs])
) )
self._listen_addrs.extend(listen_addrs) self._listen_addrs.extend(listen_addrs)
@ -1375,10 +1218,8 @@ class Actor:
task_status.started(server_n) task_status.started(server_n)
finally: finally:
addr: Address
for addr in listen_addrs: for addr in listen_addrs:
addr.close_listener() await addr.close_listener()
# signal the server is down since nursery above terminated # signal the server is down since nursery above terminated
self._server_down.set() self._server_down.set()
@ -1485,13 +1326,8 @@ class Actor:
if self._server_down is not None: if self._server_down is not None:
await self._server_down.wait() await self._server_down.wait()
else: else:
tpt_protos: list[str] = []
addr: Address
for addr in self._listen_addrs:
tpt_protos.append(addr.proto_key)
log.warning( log.warning(
'Transport server(s) may have been cancelled before started?\n' 'Transport[TCP] server was cancelled start?'
f'protos: {tpt_protos!r}\n'
) )
# cancel all rpc tasks permanently # cancel all rpc tasks permanently
@ -1742,7 +1578,7 @@ class Actor:
return False return False
@property @property
def accept_addrs(self) -> list[UnwrappedAddress]: def accept_addrs(self) -> list[AddressTypes]:
''' '''
All addresses to which the transport-channel server binds All addresses to which the transport-channel server binds
and listens for new connections. and listens for new connections.
@ -1751,7 +1587,7 @@ class Actor:
return [a.unwrap() for a in self._listen_addrs] return [a.unwrap() for a in self._listen_addrs]
@property @property
def accept_addr(self) -> UnwrappedAddress: def accept_addr(self) -> AddressTypes:
''' '''
Primary address to which the IPC transport server is Primary address to which the IPC transport server is
bound and listening for new connections. bound and listening for new connections.
@ -1778,6 +1614,43 @@ class Actor:
''' '''
return self._peers[uid] return self._peers[uid]
# TODO: move to `Channel.handshake(uid)`
async def _do_handshake(
self,
chan: Channel
) -> msgtypes.Aid:
'''
Exchange `(name, UUIDs)` identifiers as the first
communication step with any (peer) remote `Actor`.
These are essentially the "mailbox addresses" found in
"actor model" parlance.
'''
name, uuid = self.uid
await chan.send(
msgtypes.Aid(
name=name,
uuid=uuid,
)
)
aid: msgtypes.Aid = await chan.recv()
chan.aid = aid
uid: tuple[str, str] = (
# str(value[0]),
# str(value[1])
aid.name,
aid.uuid,
)
if not isinstance(uid, tuple):
raise ValueError(f"{uid} is not a valid uid?!")
chan.uid = uid
return uid
def is_infected_aio(self) -> bool: def is_infected_aio(self) -> bool:
''' '''
If `True`, this actor is running `trio` in guest mode on If `True`, this actor is running `trio` in guest mode on
@ -1791,7 +1664,7 @@ class Actor:
async def async_main( async def async_main(
actor: Actor, actor: Actor,
accept_addrs: UnwrappedAddress|None = None, accept_addrs: AddressTypes|None = None,
# XXX: currently ``parent_addr`` is only needed for the # XXX: currently ``parent_addr`` is only needed for the
# ``multiprocessing`` backend (which pickles state sent to # ``multiprocessing`` backend (which pickles state sent to
@ -1800,7 +1673,7 @@ async def async_main(
# change this to a simple ``is_subactor: bool`` which will # change this to a simple ``is_subactor: bool`` which will
# be False when running as root actor and True when as # be False when running as root actor and True when as
# a subactor. # a subactor.
parent_addr: UnwrappedAddress|None = None, parent_addr: AddressTypes|None = None,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
@ -1815,8 +1688,6 @@ async def async_main(
the actor's "runtime" and all thus all ongoing RPC tasks. the actor's "runtime" and all thus all ongoing RPC tasks.
''' '''
actor._task: trio.Task = trio.lowlevel.current_task()
# attempt to retreive ``trio``'s sigint handler and stash it # attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger state. # on our debugger state.
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT) _debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
@ -1826,15 +1697,13 @@ async def async_main(
# establish primary connection with immediate parent # establish primary connection with immediate parent
actor._parent_chan: Channel|None = None actor._parent_chan: Channel|None = None
if parent_addr is not None: if parent_addr is not None:
( (
actor._parent_chan, actor._parent_chan,
set_accept_addr_says_rent, set_accept_addr_says_rent,
maybe_preferred_transports_says_rent,
) = await actor._from_parent(parent_addr) ) = await actor._from_parent(parent_addr)
accept_addrs: list[UnwrappedAddress] = []
# either it's passed in because we're not a child or # either it's passed in because we're not a child or
# because we're running in mp mode # because we're running in mp mode
if ( if (
@ -1843,18 +1712,6 @@ async def async_main(
set_accept_addr_says_rent is not None set_accept_addr_says_rent is not None
): ):
accept_addrs = set_accept_addr_says_rent accept_addrs = set_accept_addr_says_rent
else:
enable_transports: list[str] = (
maybe_preferred_transports_says_rent
or
[_state._def_tpt_proto]
)
for transport_key in enable_transports:
transport_cls: Type[Address] = get_address_cls(
transport_key
)
addr: Address = transport_cls.get_random()
accept_addrs.append(addr.unwrap())
# The "root" nursery ensures the channel with the immediate # The "root" nursery ensures the channel with the immediate
# parent is kept alive as a resilient service until # parent is kept alive as a resilient service until
@ -1922,7 +1779,7 @@ async def async_main(
raise raise
accept_addrs: list[UnwrappedAddress] = actor.accept_addrs accept_addrs: list[AddressTypes] = actor.accept_addrs
# NOTE: only set the loopback addr for the # NOTE: only set the loopback addr for the
# process-tree-global "root" mailbox since # process-tree-global "root" mailbox since
@ -2135,15 +1992,15 @@ async def async_main(
log.info(teardown_report) log.info(teardown_report)
# TODO: rename to `Registry` and move to `.discovery._registry`! # TODO: rename to `Registry` and move to `._discovery`!
class Arbiter(Actor): class Arbiter(Actor):
''' '''
A special registrar (and for now..) `Actor` who can contact all A special registrar actor who can contact all other actors
other actors within its immediate process tree and possibly keeps within its immediate process tree and possibly keeps a registry
a registry of others meant to be discoverable in a distributed of others meant to be discoverable in a distributed
application. Normally the registrar is also the "root actor" and application. Normally the registrar is also the "root actor"
thus always has access to the top-most-level actor (process) and thus always has access to the top-most-level actor
nursery. (process) nursery.
By default, the registrar is always initialized when and if no By default, the registrar is always initialized when and if no
other registrar socket addrs have been specified to runtime other registrar socket addrs have been specified to runtime
@ -2163,12 +2020,6 @@ class Arbiter(Actor):
''' '''
is_arbiter = True is_arbiter = True
# TODO, implement this as a read on there existing a `._state` of
# some sort setup by whenever we impl this all as
# a `.discovery._registry.open_registry()` API
def is_registry(self) -> bool:
return self.is_arbiter
def __init__( def __init__(
self, self,
*args, *args,
@ -2177,7 +2028,7 @@ class Arbiter(Actor):
self._registry: dict[ self._registry: dict[
tuple[str, str], tuple[str, str],
UnwrappedAddress, AddressTypes,
] = {} ] = {}
self._waiters: dict[ self._waiters: dict[
str, str,
@ -2193,7 +2044,7 @@ class Arbiter(Actor):
self, self,
name: str, name: str,
) -> UnwrappedAddress|None: ) -> AddressTypes|None:
for uid, addr in self._registry.items(): for uid, addr in self._registry.items():
if name in uid: if name in uid:
@ -2204,7 +2055,7 @@ class Arbiter(Actor):
async def get_registry( async def get_registry(
self self
) -> dict[str, UnwrappedAddress]: ) -> dict[str, AddressTypes]:
''' '''
Return current name registry. Return current name registry.
@ -2224,7 +2075,7 @@ class Arbiter(Actor):
self, self,
name: str, name: str,
) -> list[UnwrappedAddress]: ) -> list[AddressTypes]:
''' '''
Wait for a particular actor to register. Wait for a particular actor to register.
@ -2232,8 +2083,8 @@ class Arbiter(Actor):
registered. registered.
''' '''
addrs: list[UnwrappedAddress] = [] addrs: list[AddressTypes] = []
addr: UnwrappedAddress addr: AddressTypes
mailbox_info: str = 'Actor registry contact infos:\n' mailbox_info: str = 'Actor registry contact infos:\n'
for uid, addr in self._registry.items(): for uid, addr in self._registry.items():
@ -2259,7 +2110,7 @@ class Arbiter(Actor):
async def register_actor( async def register_actor(
self, self,
uid: tuple[str, str], uid: tuple[str, str],
addr: UnwrappedAddress addr: AddressTypes
) -> None: ) -> None:
uid = name, hash = (str(uid[0]), str(uid[1])) uid = name, hash = (str(uid[0]), str(uid[1]))
waddr: Address = wrap_address(addr) waddr: Address = wrap_address(addr)

View File

@ -46,13 +46,12 @@ from tractor._state import (
_runtime_vars, _runtime_vars,
) )
from tractor.log import get_logger from tractor.log import get_logger
from tractor._addr import UnwrappedAddress from tractor._addr import AddressTypes
from tractor._portal import Portal from tractor._portal import Portal
from tractor._runtime import Actor from tractor._runtime import Actor
from tractor._entry import _mp_main from tractor._entry import _mp_main
from tractor._exceptions import ActorFailure from tractor._exceptions import ActorFailure
from tractor.msg.types import ( from tractor.msg.types import (
Aid,
SpawnSpec, SpawnSpec,
) )
@ -165,7 +164,7 @@ async def exhaust_portal(
# TODO: merge with above? # TODO: merge with above?
log.warning( log.warning(
'Cancelled portal result waiter task:\n' 'Cancelled portal result waiter task:\n'
f'uid: {portal.channel.aid}\n' f'uid: {portal.channel.uid}\n'
f'error: {err}\n' f'error: {err}\n'
) )
return err return err
@ -173,7 +172,7 @@ async def exhaust_portal(
else: else:
log.debug( log.debug(
f'Returning final result from portal:\n' f'Returning final result from portal:\n'
f'uid: {portal.channel.aid}\n' f'uid: {portal.channel.uid}\n'
f'result: {final}\n' f'result: {final}\n'
) )
return final return final
@ -326,12 +325,12 @@ async def soft_kill(
see `.hard_kill()`). see `.hard_kill()`).
''' '''
peer_aid: Aid = portal.channel.aid uid: tuple[str, str] = portal.channel.uid
try: try:
log.cancel( log.cancel(
f'Soft killing sub-actor via portal request\n' f'Soft killing sub-actor via portal request\n'
f'\n' f'\n'
f'(c=> {peer_aid}\n' f'(c=> {portal.chan.uid}\n'
f' |_{proc}\n' f' |_{proc}\n'
) )
# wait on sub-proc to signal termination # wait on sub-proc to signal termination
@ -380,7 +379,7 @@ async def soft_kill(
if proc.poll() is None: # type: ignore if proc.poll() is None: # type: ignore
log.warning( log.warning(
'Subactor still alive after cancel request?\n\n' 'Subactor still alive after cancel request?\n\n'
f'uid: {peer_aid}\n' f'uid: {uid}\n'
f'|_{proc}\n' f'|_{proc}\n'
) )
n.cancel_scope.cancel() n.cancel_scope.cancel()
@ -394,8 +393,8 @@ async def new_proc(
errors: dict[tuple[str, str], Exception], errors: dict[tuple[str, str], Exception],
# passed through to actor main # passed through to actor main
bind_addrs: list[UnwrappedAddress], bind_addrs: list[AddressTypes],
parent_addr: UnwrappedAddress, parent_addr: AddressTypes,
_runtime_vars: dict[str, Any], # serialized and sent to _child _runtime_vars: dict[str, Any], # serialized and sent to _child
*, *,
@ -433,8 +432,8 @@ async def trio_proc(
errors: dict[tuple[str, str], Exception], errors: dict[tuple[str, str], Exception],
# passed through to actor main # passed through to actor main
bind_addrs: list[UnwrappedAddress], bind_addrs: list[AddressTypes],
parent_addr: UnwrappedAddress, parent_addr: AddressTypes,
_runtime_vars: dict[str, Any], # serialized and sent to _child _runtime_vars: dict[str, Any], # serialized and sent to _child
*, *,
infect_asyncio: bool = False, infect_asyncio: bool = False,
@ -461,9 +460,6 @@ async def trio_proc(
# the OS; it otherwise can be passed via the parent channel if # the OS; it otherwise can be passed via the parent channel if
# we prefer in the future (for privacy). # we prefer in the future (for privacy).
"--uid", "--uid",
# TODO, how to pass this over "wire" encodings like
# cmdline args?
# -[ ] maybe we can add an `Aid.min_tuple()` ?
str(subactor.uid), str(subactor.uid),
# Address the child must connect to on startup # Address the child must connect to on startup
"--parent_addr", "--parent_addr",
@ -643,8 +639,8 @@ async def mp_proc(
subactor: Actor, subactor: Actor,
errors: dict[tuple[str, str], Exception], errors: dict[tuple[str, str], Exception],
# passed through to actor main # passed through to actor main
bind_addrs: list[UnwrappedAddress], bind_addrs: list[AddressTypes],
parent_addr: UnwrappedAddress, parent_addr: AddressTypes,
_runtime_vars: dict[str, Any], # serialized and sent to _child _runtime_vars: dict[str, Any], # serialized and sent to _child
*, *,
infect_asyncio: bool = False, infect_asyncio: bool = False,
@ -729,8 +725,7 @@ async def mp_proc(
# channel should have handshake completed by the # channel should have handshake completed by the
# local actor by the time we get a ref to it # local actor by the time we get a ref to it
event, chan = await actor_nursery._actor.wait_for_peer( event, chan = await actor_nursery._actor.wait_for_peer(
subactor.uid, subactor.uid)
)
# XXX: monkey patch poll API to match the ``subprocess`` API.. # XXX: monkey patch poll API to match the ``subprocess`` API..
# not sure why they don't expose this but kk. # not sure why they don't expose this but kk.

View File

@ -14,19 +14,16 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
''' """
Per actor-process runtime state mgmt APIs. Per process state
''' """
from __future__ import annotations from __future__ import annotations
from contextvars import ( from contextvars import (
ContextVar, ContextVar,
) )
import os
from pathlib import Path
from typing import ( from typing import (
Any, Any,
Literal,
TYPE_CHECKING, TYPE_CHECKING,
) )
@ -146,30 +143,3 @@ def current_ipc_ctx(
f'|_{current_task()}\n' f'|_{current_task()}\n'
) )
return ctx return ctx
# std ODE (mutable) app state location
_rtdir: Path = Path(os.environ['XDG_RUNTIME_DIR'])
def get_rt_dir(
subdir: str = 'tractor'
) -> Path:
'''
Return the user "runtime dir" where most userspace apps stick
their IPC and cache related system util-files; we take hold
of a `'XDG_RUNTIME_DIR'/tractor/` subdir by default.
'''
rtdir: Path = _rtdir / subdir
if not rtdir.is_dir():
rtdir.mkdir()
return rtdir
# default IPC transport protocol settings
TransportProtocolKey = Literal[
'tcp',
'uds',
]
_def_tpt_proto: TransportProtocolKey = 'tcp'

View File

@ -437,23 +437,22 @@ class MsgStream(trio.abc.Channel):
message: str = ( message: str = (
f'Stream self-closed by {this_side!r}-side before EoC from {peer_side!r}\n' f'Stream self-closed by {this_side!r}-side before EoC from {peer_side!r}\n'
# } bc a stream is a "scope"/msging-phase inside an IPC # } bc a stream is a "scope"/msging-phase inside an IPC
f'c}}>\n' f'x}}>\n'
f' |_{self}\n' f' |_{self}\n'
) )
log.cancel(message)
self._eoc = trio.EndOfChannel(message)
if ( if (
(rx_chan := self._rx_chan) (rx_chan := self._rx_chan)
and and
(stats := rx_chan.statistics()).tasks_waiting_receive (stats := rx_chan.statistics()).tasks_waiting_receive
): ):
message += ( log.cancel(
f'AND there is still reader tasks,\n' f'Msg-stream is closing but there is still reader tasks,\n'
f'\n'
f'{stats}\n' f'{stats}\n'
) )
log.cancel(message)
self._eoc = trio.EndOfChannel(message)
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX? # ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
# => NO, DEFINITELY NOT! <= # => NO, DEFINITELY NOT! <=
# if we're a bi-dir `MsgStream` BECAUSE this same # if we're a bi-dir `MsgStream` BECAUSE this same
@ -596,17 +595,8 @@ class MsgStream(trio.abc.Channel):
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
BrokenPipeError, BrokenPipeError,
) as _trans_err: ) as trans_err:
trans_err = _trans_err if hide_tb:
if (
hide_tb
and
self._ctx.chan._exc is trans_err
# ^XXX, IOW, only if the channel is marked errored
# for the same reason as whatever its underlying
# transport raised, do we keep the full low-level tb
# suppressed from the user.
):
raise type(trans_err)( raise type(trans_err)(
*trans_err.args *trans_err.args
) from trans_err ) from trans_err
@ -812,12 +802,13 @@ async def open_stream_from_ctx(
# sanity, can remove? # sanity, can remove?
assert eoc is stream._eoc assert eoc is stream._eoc
log.runtime( log.warning(
'Stream was terminated by EoC\n\n' 'Stream was terminated by EoC\n\n'
# NOTE: won't show the error <Type> but # NOTE: won't show the error <Type> but
# does show txt followed by IPC msg. # does show txt followed by IPC msg.
f'{str(eoc)}\n' f'{str(eoc)}\n'
) )
finally: finally:
if ctx._portal: if ctx._portal:
try: try:

View File

@ -22,9 +22,7 @@ from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
import inspect import inspect
from pprint import pformat from pprint import pformat
from typing import ( from typing import TYPE_CHECKING
TYPE_CHECKING,
)
import typing import typing
import warnings import warnings
@ -33,8 +31,9 @@ import trio
from .devx._debug import maybe_wait_for_debugger from .devx._debug import maybe_wait_for_debugger
from ._addr import ( from ._addr import (
UnwrappedAddress, AddressTypes,
mk_uuid, preferred_transport,
get_address_cls
) )
from ._state import current_actor, is_main_process from ._state import current_actor, is_main_process
from .log import get_logger, get_loglevel from .log import get_logger, get_loglevel
@ -44,9 +43,7 @@ from ._exceptions import (
is_multi_cancelled, is_multi_cancelled,
ContextCancelled, ContextCancelled,
) )
from ._root import ( from ._root import open_root_actor
open_root_actor,
)
from . import _state from . import _state
from . import _spawn from . import _spawn
@ -137,9 +134,9 @@ class ActorNursery:
*, *,
bind_addrs: list[UnwrappedAddress]|None = None, bind_addrs: list[AddressTypes]|None = None,
rpc_module_paths: list[str]|None = None, rpc_module_paths: list[str]|None = None,
enable_transports: list[str] = [_state._def_tpt_proto], enable_transports: list[str] = [preferred_transport],
enable_modules: list[str]|None = None, enable_modules: list[str]|None = None,
loglevel: str|None = None, # set log level per subactor loglevel: str|None = None, # set log level per subactor
debug_mode: bool|None = None, debug_mode: bool|None = None,
@ -164,6 +161,12 @@ class ActorNursery:
or get_loglevel() or get_loglevel()
) )
if not bind_addrs:
bind_addrs: list[AddressTypes] = [
get_address_cls(transport).get_random().unwrap()
for transport in enable_transports
]
# configure and pass runtime state # configure and pass runtime state
_rtv = _state._runtime_vars.copy() _rtv = _state._runtime_vars.copy()
_rtv['_is_root'] = False _rtv['_is_root'] = False
@ -186,9 +189,7 @@ class ActorNursery:
enable_modules.extend(rpc_module_paths) enable_modules.extend(rpc_module_paths)
subactor = Actor( subactor = Actor(
name=name, name,
uuid=mk_uuid(),
# modules allowed to invoked funcs from # modules allowed to invoked funcs from
enable_modules=enable_modules, enable_modules=enable_modules,
loglevel=loglevel, loglevel=loglevel,
@ -196,7 +197,7 @@ class ActorNursery:
# verbatim relay this actor's registrar addresses # verbatim relay this actor's registrar addresses
registry_addrs=current_actor().reg_addrs, registry_addrs=current_actor().reg_addrs,
) )
parent_addr: UnwrappedAddress = self._actor.accept_addr parent_addr = self._actor.accept_addr
assert parent_addr assert parent_addr
# start a task to spawn a process # start a task to spawn a process
@ -234,7 +235,7 @@ class ActorNursery:
*, *,
name: str | None = None, name: str | None = None,
bind_addrs: UnwrappedAddress|None = None, bind_addrs: AddressTypes|None = None,
rpc_module_paths: list[str] | None = None, rpc_module_paths: list[str] | None = None,
enable_modules: list[str] | None = None, enable_modules: list[str] | None = None,
loglevel: str | None = None, # set log level per subactor loglevel: str | None = None, # set log level per subactor

View File

@ -73,7 +73,6 @@ from tractor.log import get_logger
from tractor._context import Context from tractor._context import Context
from tractor import _state from tractor import _state
from tractor._exceptions import ( from tractor._exceptions import (
DebugRequestError,
InternalError, InternalError,
NoRuntime, NoRuntime,
is_multi_cancelled, is_multi_cancelled,
@ -1741,6 +1740,13 @@ def sigint_shield(
_pause_msg: str = 'Opening a pdb REPL in paused actor' _pause_msg: str = 'Opening a pdb REPL in paused actor'
class DebugRequestError(RuntimeError):
'''
Failed to request stdio lock from root actor!
'''
_repl_fail_msg: str|None = ( _repl_fail_msg: str|None = (
'Failed to REPl via `_pause()` ' 'Failed to REPl via `_pause()` '
) )

View File

@ -19,7 +19,6 @@ Pretty formatters for use throughout the code base.
Mostly handy for logging and exception message content. Mostly handy for logging and exception message content.
''' '''
import sys
import textwrap import textwrap
import traceback import traceback
@ -116,85 +115,6 @@ def pformat_boxed_tb(
) )
def pformat_exc(
exc: Exception,
header: str = '',
message: str = '',
body: str = '',
with_type_header: bool = True,
) -> str:
# XXX when the currently raised exception is this instance,
# we do not ever use the "type header" style repr.
is_being_raised: bool = False
if (
(curr_exc := sys.exception())
and
curr_exc is exc
):
is_being_raised: bool = True
with_type_header: bool = (
with_type_header
and
not is_being_raised
)
# <RemoteActorError( .. )> style
if (
with_type_header
and
not header
):
header: str = f'<{type(exc).__name__}('
message: str = (
message
or
exc.message
)
if message:
# split off the first line so, if needed, it isn't
# indented the same like the "boxed content" which
# since there is no `.tb_str` is just the `.message`.
lines: list[str] = message.splitlines()
first: str = lines[0]
message: str = message.removeprefix(first)
# with a type-style header we,
# - have no special message "first line" extraction/handling
# - place the message a space in from the header:
# `MsgTypeError( <message> ..`
# ^-here
# - indent the `.message` inside the type body.
if with_type_header:
first = f' {first} )>'
message: str = textwrap.indent(
message,
prefix=' '*2,
)
message: str = first + message
tail: str = ''
if (
with_type_header
and
not message
):
tail: str = '>'
return (
header
+
message
+
f'{body}'
+
tail
)
def pformat_caller_frame( def pformat_caller_frame(
stack_limit: int = 1, stack_limit: int = 1,
box_tb: bool = True, box_tb: bool = True,

View File

@ -45,8 +45,6 @@ __all__ = ['pub']
log = get_logger('messaging') log = get_logger('messaging')
# TODO! this needs to reworked to use the modern
# `Context`/`MsgStream` APIs!!
async def fan_out_to_ctxs( async def fan_out_to_ctxs(
pub_async_gen_func: typing.Callable, # it's an async gen ... gd mypy pub_async_gen_func: typing.Callable, # it's an async gen ... gd mypy
topics2ctxs: dict[str, list], topics2ctxs: dict[str, list],

View File

@ -30,7 +30,6 @@ import typing
from typing import ( from typing import (
Any, Any,
) )
import warnings
import trio import trio
@ -40,20 +39,16 @@ from tractor.ipc._types import (
transport_from_stream, transport_from_stream,
) )
from tractor._addr import ( from tractor._addr import (
is_wrapped_addr,
wrap_address, wrap_address,
Address, Address,
UnwrappedAddress, AddressTypes
) )
from tractor.log import get_logger from tractor.log import get_logger
from tractor._exceptions import ( from tractor._exceptions import (
MsgTypeError, MsgTypeError,
pack_from_raise, pack_from_raise,
) )
from tractor.msg import ( from tractor.msg import MsgCodec
Aid,
MsgCodec,
)
log = get_logger(__name__) log = get_logger(__name__)
@ -89,12 +84,11 @@ class Channel:
# user in ``.from_stream()``. # user in ``.from_stream()``.
self._transport: MsgTransport|None = transport self._transport: MsgTransport|None = transport
# set after handshake - always info from peer end # set after handshake - always uid of far end
self.aid: Aid|None = None self.uid: tuple[str, str]|None = None
self._aiter_msgs = self._iter_msgs() self._aiter_msgs = self._iter_msgs()
self._exc: Exception|None = None self._exc: Exception|None = None # set if far end actor errors
# ^XXX! ONLY set if a remote actor sends an `Error`-msg
self._closed: bool = False self._closed: bool = False
# flag set by ``Portal.cancel_actor()`` indicating remote # flag set by ``Portal.cancel_actor()`` indicating remote
@ -102,29 +96,6 @@ class Channel:
# runtime. # runtime.
self._cancel_called: bool = False self._cancel_called: bool = False
@property
def uid(self) -> tuple[str, str]:
'''
Peer actor's unique id.
'''
msg: str = (
f'`{type(self).__name__}.uid` is now deprecated.\n'
'Use the new `.aid: tractor.msg.Aid` (struct) instead '
'which also provides additional named (optional) fields '
'beyond just the `.name` and `.uuid`.'
)
warnings.warn(
msg,
DeprecationWarning,
stacklevel=2,
)
peer_aid: Aid = self.aid
return (
peer_aid.name,
peer_aid.uuid,
)
@property @property
def stream(self) -> trio.abc.Stream | None: def stream(self) -> trio.abc.Stream | None:
return self._transport.stream if self._transport else None return self._transport.stream if self._transport else None
@ -153,26 +124,17 @@ class Channel:
@classmethod @classmethod
async def from_addr( async def from_addr(
cls, cls,
addr: UnwrappedAddress, addr: AddressTypes,
**kwargs **kwargs
) -> Channel: ) -> Channel:
if not is_wrapped_addr(addr):
addr: Address = wrap_address(addr) addr: Address = wrap_address(addr)
transport_cls = transport_from_addr(addr) transport_cls = transport_from_addr(addr)
transport = await transport_cls.connect_to( transport = await transport_cls.connect_to(addr, **kwargs)
addr,
**kwargs, log.transport(
f'Opened channel[{type(transport)}]: {transport.laddr} -> {transport.raddr}'
) )
assert transport.raddr == addr return Channel(transport=transport)
chan = Channel(transport=transport)
log.runtime(
f'Connected channel IPC transport\n'
f'[>\n'
f' |_{chan}\n'
)
return chan
@cm @cm
def apply_codec( def apply_codec(
@ -192,48 +154,16 @@ class Channel:
self._transport.codec = orig self._transport.codec = orig
# TODO: do a .src/.dst: str for maddrs? # TODO: do a .src/.dst: str for maddrs?
def pformat(self) -> str: def __repr__(self) -> str:
if not self._transport: if not self._transport:
return '<Channel with inactive transport?>' return '<Channel with inactive transport?>'
tpt: MsgTransport = self._transport return repr(
tpt_name: str = type(tpt).__name__ self._transport
tpt_status: str = ( ).replace( # type: ignore
'connected' if self.connected() "socket.socket",
else 'closed' "Channel",
) )
return (
f'<Channel(\n'
f' |_status: {tpt_status!r}\n'
f' _closed={self._closed}\n'
f' _cancel_called={self._cancel_called}\n'
f'\n'
f' |_peer: {self.aid}\n'
f'\n'
f' |_msgstream: {tpt_name}\n'
f' proto={tpt.laddr.proto_key!r}\n'
f' layer={tpt.layer_key!r}\n'
f' laddr={tpt.laddr}\n'
f' raddr={tpt.raddr}\n'
f' codec={tpt.codec_key!r}\n'
f' stream={tpt.stream}\n'
f' maddr={tpt.maddr!r}\n'
f' drained={tpt.drained}\n'
f' _send_lock={tpt._send_lock.statistics()}\n'
f')>\n'
)
# NOTE: making this return a value that can be passed to
# `eval()` is entirely **optional** FYI!
# https://docs.python.org/3/library/functions.html#repr
# https://docs.python.org/3/reference/datamodel.html#object.__repr__
#
# Currently we target **readability** from a (console)
# logging perspective over `eval()`-ability since we do NOT
# target serializing non-struct instances!
# def __repr__(self) -> str:
__str__ = pformat
__repr__ = pformat
@property @property
def laddr(self) -> Address|None: def laddr(self) -> Address|None:
@ -305,7 +235,7 @@ class Channel:
async def aclose(self) -> None: async def aclose(self) -> None:
log.transport( log.transport(
f'Closing channel to {self.aid} ' f'Closing channel to {self.uid} '
f'{self.laddr} -> {self.raddr}' f'{self.laddr} -> {self.raddr}'
) )
assert self._transport assert self._transport
@ -405,33 +335,10 @@ class Channel:
def connected(self) -> bool: def connected(self) -> bool:
return self._transport.connected() if self._transport else False return self._transport.connected() if self._transport else False
async def _do_handshake(
self,
aid: Aid,
) -> Aid:
'''
Exchange `(name, UUIDs)` identifiers as the first
communication step with any (peer) remote `Actor`.
These are essentially the "mailbox addresses" found in
"actor model" parlance.
'''
await self.send(aid)
peer_aid: Aid = await self.recv()
log.runtime(
f'Received hanshake with peer actor,\n'
f'{peer_aid}\n'
)
# NOTE, we always are referencing the remote peer!
self.aid = peer_aid
return peer_aid
@acm @acm
async def _connect_chan( async def _connect_chan(
addr: UnwrappedAddress addr: AddressTypes
) -> typing.AsyncGenerator[Channel, None]: ) -> typing.AsyncGenerator[Channel, None]:
''' '''
Create and connect a channel with disconnect on context manager Create and connect a channel with disconnect on context manager

View File

@ -50,10 +50,7 @@ if _USE_POSIX:
try: try:
import numpy as np import numpy as np
from numpy.lib import recfunctions as rfn from numpy.lib import recfunctions as rfn
# TODO ruff complains with, import nptyping
# warning| F401: `nptyping` imported but unused; consider using
# `importlib.util.find_spec` to test for availability
import nptyping # noqa
except ImportError: except ImportError:
pass pass

View File

@ -42,16 +42,25 @@ class MsgpackTCPStream(MsgpackTransport):
address_type = TCPAddress address_type = TCPAddress
layer_key: int = 4 layer_key: int = 4
# def __init__(
# self,
# stream: trio.SocketStream,
# prefix_size: int = 4,
# codec: CodecType = None,
# ) -> None:
# super().__init__(
# stream,
# prefix_size=prefix_size,
# codec=codec
# )
@property @property
def maddr(self) -> str: def maddr(self) -> str:
host, port = self.raddr.unwrap() host, port = self.raddr.unwrap()
return ( return (
# TODO, use `ipaddress` from stdlib to handle
# first detecting which of `ipv4/6` before
# choosing the routing prefix part.
f'/ipv4/{host}' f'/ipv4/{host}'
f'/{self.address_type.name_key}/{port}'
f'/{self.address_type.proto_key}/{port}'
# f'/{self.chan.uid[0]}' # f'/{self.chan.uid[0]}'
# f'/{self.cid}' # f'/{self.cid}'
@ -85,15 +94,12 @@ class MsgpackTCPStream(MsgpackTransport):
cls, cls,
stream: trio.SocketStream stream: trio.SocketStream
) -> tuple[ ) -> tuple[
TCPAddress, tuple[str, int],
TCPAddress, tuple[str, int]
]: ]:
# TODO, what types are these?
lsockname = stream.socket.getsockname() lsockname = stream.socket.getsockname()
l_sockaddr: tuple[str, int] = tuple(lsockname[:2])
rsockname = stream.socket.getpeername() rsockname = stream.socket.getpeername()
r_sockaddr: tuple[str, int] = tuple(rsockname[:2])
return ( return (
TCPAddress.from_addr(l_sockaddr), TCPAddress.from_addr(tuple(lsockname[:2])),
TCPAddress.from_addr(r_sockaddr), TCPAddress.from_addr(tuple(rsockname[:2])),
) )

View File

@ -99,7 +99,7 @@ class MsgTransport(Protocol[MsgType]):
@classmethod @classmethod
def key(cls) -> MsgTransportKey: def key(cls) -> MsgTransportKey:
return cls.codec_key, cls.address_type.proto_key return cls.codec_key, cls.address_type.name_key
@property @property
def laddr(self) -> Address: def laddr(self) -> Address:
@ -136,16 +136,6 @@ class MsgTransport(Protocol[MsgType]):
''' '''
... ...
# TODO, such that all `.raddr`s for each `SocketStream` are
# delivered?
# -[ ] move `.open_listener()` here and internally track the
# listener set, per address?
# def get_peers(
# self,
# ) -> list[Address]:
# ...
class MsgpackTransport(MsgTransport): class MsgpackTransport(MsgTransport):
@ -167,10 +157,7 @@ class MsgpackTransport(MsgTransport):
) -> None: ) -> None:
self.stream = stream self.stream = stream
( self._laddr, self._raddr = self.get_stream_addrs(stream)
self._laddr,
self._raddr,
) = self.get_stream_addrs(stream)
# create read loop instance # create read loop instance
self._aiter_pkts = self._iter_packets() self._aiter_pkts = self._iter_packets()
@ -208,7 +195,6 @@ class MsgpackTransport(MsgTransport):
''' '''
decodes_failed: int = 0 decodes_failed: int = 0
tpt_name: str = f'{type(self).__name__!r}'
while True: while True:
try: try:
header: bytes = await self.recv_stream.receive_exactly(4) header: bytes = await self.recv_stream.receive_exactly(4)
@ -253,9 +239,10 @@ class MsgpackTransport(MsgTransport):
raise TransportClosed( raise TransportClosed(
message=( message=(
f'{tpt_name} already closed by peer\n' f'IPC transport already closed by peer\n'
f'x)> {type(trans_err)}\n'
f' |_{self}\n'
), ),
src_exc=trans_err,
loglevel=loglevel, loglevel=loglevel,
) from trans_err ) from trans_err
@ -267,17 +254,18 @@ class MsgpackTransport(MsgTransport):
# #
# NOTE: as such we always re-raise this error from the # NOTE: as such we always re-raise this error from the
# RPC msg loop! # RPC msg loop!
except trio.ClosedResourceError as cre: except trio.ClosedResourceError as closure_err:
closure_err = cre
raise TransportClosed( raise TransportClosed(
message=( message=(
f'{tpt_name} was already closed locally ?\n' f'IPC transport already manually closed locally?\n'
f'x)> {type(closure_err)} \n'
f' |_{self}\n'
), ),
src_exc=closure_err,
loglevel='error', loglevel='error',
raise_on_report=( raise_on_report=(
'another task closed this fd' in closure_err.args closure_err.args[0] == 'another task closed this fd'
or
closure_err.args[0] in ['another task closed this fd']
), ),
) from closure_err ) from closure_err
@ -285,9 +273,12 @@ class MsgpackTransport(MsgTransport):
if header == b'': if header == b'':
raise TransportClosed( raise TransportClosed(
message=( message=(
f'{tpt_name} already gracefully closed\n' f'IPC transport already gracefully closed\n'
f')>\n'
f'|_{self}\n'
), ),
loglevel='transport', loglevel='transport',
# cause=??? # handy or no?
) )
size: int size: int
@ -418,39 +409,7 @@ class MsgpackTransport(MsgTransport):
# supposedly the fastest says, # supposedly the fastest says,
# https://stackoverflow.com/a/54027962 # https://stackoverflow.com/a/54027962
size: bytes = struct.pack("<I", len(bytes_data)) size: bytes = struct.pack("<I", len(bytes_data))
try:
return await self.stream.send_all(size + bytes_data) return await self.stream.send_all(size + bytes_data)
except (
trio.BrokenResourceError,
) as trans_err:
loglevel = 'transport'
match trans_err:
case trio.BrokenResourceError() if (
'[Errno 32] Broken pipe' in trans_err.args[0]
# ^XXX, specifc to UDS transport and its,
# well, "speediness".. XD
# |_ likely todo with races related to how fast
# the socket is setup/torn-down on linux
# as it pertains to rando pings from the
# `.discovery` subsys and protos.
):
raise TransportClosed(
message=(
f'IPC transport already closed by peer\n'
f'x)> {type(trans_err)}\n'
f' |_{self}\n'
),
loglevel=loglevel,
) from trans_err
# unless the disconnect condition falls under "a
# normal operation breakage" we usualy console warn
# about it.
case _:
log.exception(
'Transport layer failed for {self.transport!r} ?\n'
)
raise trans_err
# ?TODO? does it help ever to dynamically show this # ?TODO? does it help ever to dynamically show this
# frame? # frame?
@ -489,16 +448,3 @@ class MsgpackTransport(MsgTransport):
@property @property
def raddr(self) -> Address: def raddr(self) -> Address:
return self._raddr return self._raddr
def pformat(self) -> str:
return (
f'<{type(self).__name__}(\n'
f' |_task: {self._task}\n'
f'\n'
f' |_peers: 2\n'
f' laddr: {self._laddr}\n'
f' raddr: {self._raddr}\n'
f')>\n'
)
__repr__ = __str__ = pformat

View File

@ -76,7 +76,7 @@ def transport_from_stream(
''' '''
transport = None transport = None
if isinstance(stream, trio.SocketStream): if isinstance(stream, trio.SocketStream):
sock: socket.socket = stream.socket sock = stream.socket
match sock.family: match sock.family:
case socket.AF_INET | socket.AF_INET6: case socket.AF_INET | socket.AF_INET6:
transport = 'tcp' transport = 'tcp'

View File

@ -18,111 +18,46 @@ Unix Domain Socket implementation of tractor.ipc._transport.MsgTransport protoco
''' '''
from __future__ import annotations from __future__ import annotations
from pathlib import Path
import os
from socket import (
# socket,
AF_UNIX,
SOCK_STREAM,
SO_PASSCRED,
SO_PEERCRED,
SOL_SOCKET,
)
import struct
import trio import trio
from trio._highlevel_open_unix_stream import (
close_on_error,
has_unix,
)
from tractor.msg import MsgCodec from tractor.msg import MsgCodec
from tractor.log import get_logger from tractor.log import get_logger
from tractor._addr import ( from tractor._addr import UDSAddress
UDSAddress,
unwrap_sockpath,
)
from tractor.ipc._transport import MsgpackTransport from tractor.ipc._transport import MsgpackTransport
log = get_logger(__name__) log = get_logger(__name__)
async def open_unix_socket_w_passcred(
filename: str|bytes|os.PathLike[str]|os.PathLike[bytes],
) -> trio.SocketStream:
'''
Literally the exact same as `trio.open_unix_socket()` except we set the additiona
`socket.SO_PASSCRED` option to ensure the server side (the process calling `accept()`)
can extract the connecting peer's credentials, namely OS specific process
related IDs.
See this SO for "why" the extra opts,
- https://stackoverflow.com/a/7982749
'''
if not has_unix:
raise RuntimeError("Unix sockets are not supported on this platform")
# much more simplified logic vs tcp sockets - one socket type and only one
# possible location to connect to
sock = trio.socket.socket(AF_UNIX, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_PASSCRED, 1)
with close_on_error(sock):
await sock.connect(os.fspath(filename))
return trio.SocketStream(sock)
def get_peer_info(sock: trio.socket.socket) -> tuple[
int, # pid
int, # uid
int, # guid
]:
'''
Deliver the connecting peer's "credentials"-info as defined in
a very Linux specific way..
For more deats see,
- `man accept`,
- `man unix`,
this great online guide to all things sockets,
- https://beej.us/guide/bgnet/html/split-wide/man-pages.html#setsockoptman
AND this **wonderful SO answer**
- https://stackoverflow.com/a/7982749
'''
creds: bytes = sock.getsockopt(
SOL_SOCKET,
SO_PEERCRED,
struct.calcsize('3i')
)
# i.e a tuple of the fields,
# pid: int, "process"
# uid: int, "user"
# gid: int, "group"
return struct.unpack('3i', creds)
class MsgpackUDSStream(MsgpackTransport): class MsgpackUDSStream(MsgpackTransport):
''' '''
A `trio.SocketStream` around a Unix-Domain-Socket transport A ``trio.SocketStream`` delivering ``msgpack`` formatted data
delivering `msgpack` encoded msgs using the `msgspec` codec lib. using the ``msgspec`` codec lib.
''' '''
address_type = UDSAddress address_type = UDSAddress
layer_key: int = 4 layer_key: int = 7
# def __init__(
# self,
# stream: trio.SocketStream,
# prefix_size: int = 4,
# codec: CodecType = None,
# ) -> None:
# super().__init__(
# stream,
# prefix_size=prefix_size,
# codec=codec
# )
@property @property
def maddr(self) -> str: def maddr(self) -> str:
if not self.raddr: filepath = self.raddr.unwrap()
return '<unknown-peer>'
filepath: Path = Path(self.raddr.unwrap()[0])
return ( return (
f'/{self.address_type.proto_key}/{filepath}' f'/ipv4/localhost'
f'/{self.address_type.name_key}/{filepath}'
# f'/{self.chan.uid[0]}' # f'/{self.chan.uid[0]}'
# f'/{self.cid}' # f'/{self.cid}'
@ -141,76 +76,22 @@ class MsgpackUDSStream(MsgpackTransport):
codec: MsgCodec|None = None, codec: MsgCodec|None = None,
**kwargs **kwargs
) -> MsgpackUDSStream: ) -> MsgpackUDSStream:
stream = await trio.open_unix_socket(
addr.unwrap(),
sockpath: Path = addr.sockpath
#
# ^XXX NOTE, we don't provide any out-of-band `.pid` info
# (like, over the socket as extra msgs) since the (augmented)
# `.setsockopt()` call tells the OS provide it; the client
# pid can then be read on server/listen() side via
# `get_peer_info()` above.
try:
stream = await open_unix_socket_w_passcred(
str(sockpath),
**kwargs **kwargs
) )
except ( return MsgpackUDSStream(
FileNotFoundError,
) as fdne:
raise ConnectionError(
f'Bad UDS socket-filepath-as-address ??\n'
f'{addr}\n'
f' |_sockpath: {sockpath}\n'
) from fdne
stream = MsgpackUDSStream(
stream, stream,
prefix_size=prefix_size, prefix_size=prefix_size,
codec=codec codec=codec
) )
stream._raddr = addr
return stream
@classmethod @classmethod
def get_stream_addrs( def get_stream_addrs(
cls, cls,
stream: trio.SocketStream stream: trio.SocketStream
) -> tuple[ ) -> tuple[UDSAddress, UDSAddress]:
Path, return (
int, UDSAddress.from_addr(stream.socket.getsockname()),
]: UDSAddress.from_addr(stream.socket.getsockname()),
sock: trio.socket.socket = stream.socket
# NOTE XXX, it's unclear why one or the other ends up being
# `bytes` versus the socket-file-path, i presume it's
# something to do with who is the server (called `.listen()`)?
# maybe could be better implemented using another info-query
# on the socket like,
# https://beej.us/guide/bgnet/html/split-wide/system-calls-or-bust.html#gethostnamewho-am-i
sockname: str|bytes = sock.getsockname()
# https://beej.us/guide/bgnet/html/split-wide/system-calls-or-bust.html#getpeernamewho-are-you
peername: str|bytes = sock.getpeername()
match (peername, sockname):
case (str(), bytes()):
sock_path: Path = Path(peername)
case (bytes(), str()):
sock_path: Path = Path(sockname)
(
peer_pid,
_,
_,
) = get_peer_info(sock)
filedir, filename = unwrap_sockpath(sock_path)
laddr = UDSAddress(
filedir=filedir,
filename=filename,
maybe_pid=os.getpid(),
) )
raddr = UDSAddress(
filedir=filedir,
filename=filename,
maybe_pid=peer_pid
)
return (laddr, raddr)

View File

@ -31,7 +31,6 @@ from typing import (
Type, Type,
TypeVar, TypeVar,
TypeAlias, TypeAlias,
# TYPE_CHECKING,
Union, Union,
) )
@ -48,7 +47,7 @@ from tractor.msg import (
pretty_struct, pretty_struct,
) )
from tractor.log import get_logger from tractor.log import get_logger
from tractor._addr import UnwrappedAddress from tractor._addr import AddressTypes
log = get_logger('tractor.msgspec') log = get_logger('tractor.msgspec')
@ -143,16 +142,9 @@ class Aid(
''' '''
name: str name: str
uuid: str uuid: str
pid: int|None = None # TODO: use built-in support for UUIDs?
# -[ ] `uuid.UUID` which has multi-protocol support
# TODO? can/should we extend this field set?
# -[ ] use built-in support for UUIDs? `uuid.UUID` which has
# multi-protocol support
# https://jcristharif.com/msgspec/supported-types.html#uuid # https://jcristharif.com/msgspec/supported-types.html#uuid
#
# -[ ] as per the `.ipc._uds` / `._addr` comments, maybe we
# should also include at least `.pid` (equiv to port for tcp)
# and/or host-part always?
class SpawnSpec( class SpawnSpec(
@ -176,8 +168,8 @@ class SpawnSpec(
# TODO: not just sockaddr pairs? # TODO: not just sockaddr pairs?
# -[ ] abstract into a `TransportAddr` type? # -[ ] abstract into a `TransportAddr` type?
reg_addrs: list[UnwrappedAddress] reg_addrs: list[AddressTypes]
bind_addrs: list[UnwrappedAddress]|None bind_addrs: list[AddressTypes]
# TODO: caps based RPC support in the payload? # TODO: caps based RPC support in the payload?

28
uv.lock
View File

@ -11,15 +11,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/89/aa/ab0f7891a01eeb2d2e338ae8fecbe57fcebea1a24dbb64d45801bfab481d/attrs-24.3.0-py3-none-any.whl", hash = "sha256:ac96cd038792094f438ad1f6ff80837353805ac950cd2aa0e0625ef19850c308", size = 63397 }, { url = "https://files.pythonhosted.org/packages/89/aa/ab0f7891a01eeb2d2e338ae8fecbe57fcebea1a24dbb64d45801bfab481d/attrs-24.3.0-py3-none-any.whl", hash = "sha256:ac96cd038792094f438ad1f6ff80837353805ac950cd2aa0e0625ef19850c308", size = 63397 },
] ]
[[package]]
name = "bidict"
version = "0.23.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/9a/6e/026678aa5a830e07cd9498a05d3e7e650a4f56a42f267a53d22bcda1bdc9/bidict-0.23.1.tar.gz", hash = "sha256:03069d763bc387bbd20e7d49914e75fc4132a41937fa3405417e1a5a2d006d71", size = 29093 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/99/37/e8730c3587a65eb5645d4aba2d27aae48e8003614d6aaf15dda67f702f1f/bidict-0.23.1-py3-none-any.whl", hash = "sha256:5dae8d4d79b552a71cbabc7deb25dfe8ce710b17ff41711e13010ead2abfc3e5", size = 32764 },
]
[[package]] [[package]]
name = "cffi" name = "cffi"
version = "1.17.1" version = "1.17.1"
@ -257,21 +248,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/e4/ea/d836f008d33151c7a1f62caf3d8dd782e4d15f6a43897f64480c2b8de2ad/prompt_toolkit-3.0.50-py3-none-any.whl", hash = "sha256:9b6427eb19e479d98acff65196a307c555eb567989e6d88ebbb1b509d9779198", size = 387816 }, { url = "https://files.pythonhosted.org/packages/e4/ea/d836f008d33151c7a1f62caf3d8dd782e4d15f6a43897f64480c2b8de2ad/prompt_toolkit-3.0.50-py3-none-any.whl", hash = "sha256:9b6427eb19e479d98acff65196a307c555eb567989e6d88ebbb1b509d9779198", size = 387816 },
] ]
[[package]]
name = "psutil"
version = "7.0.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/2a/80/336820c1ad9286a4ded7e845b2eccfcb27851ab8ac6abece774a6ff4d3de/psutil-7.0.0.tar.gz", hash = "sha256:7be9c3eba38beccb6495ea33afd982a44074b78f28c434a1f51cc07fd315c456", size = 497003 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ed/e6/2d26234410f8b8abdbf891c9da62bee396583f713fb9f3325a4760875d22/psutil-7.0.0-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:101d71dc322e3cffd7cea0650b09b3d08b8e7c4109dd6809fe452dfd00e58b25", size = 238051 },
{ url = "https://files.pythonhosted.org/packages/04/8b/30f930733afe425e3cbfc0e1468a30a18942350c1a8816acfade80c005c4/psutil-7.0.0-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:39db632f6bb862eeccf56660871433e111b6ea58f2caea825571951d4b6aa3da", size = 239535 },
{ url = "https://files.pythonhosted.org/packages/2a/ed/d362e84620dd22876b55389248e522338ed1bf134a5edd3b8231d7207f6d/psutil-7.0.0-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1fcee592b4c6f146991ca55919ea3d1f8926497a713ed7faaf8225e174581e91", size = 275004 },
{ url = "https://files.pythonhosted.org/packages/bf/b9/b0eb3f3cbcb734d930fdf839431606844a825b23eaf9a6ab371edac8162c/psutil-7.0.0-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4b1388a4f6875d7e2aff5c4ca1cc16c545ed41dd8bb596cefea80111db353a34", size = 277986 },
{ url = "https://files.pythonhosted.org/packages/eb/a2/709e0fe2f093556c17fbafda93ac032257242cabcc7ff3369e2cb76a97aa/psutil-7.0.0-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a5f098451abc2828f7dc6b58d44b532b22f2088f4999a937557b603ce72b1993", size = 279544 },
{ url = "https://files.pythonhosted.org/packages/50/e6/eecf58810b9d12e6427369784efe814a1eec0f492084ce8eb8f4d89d6d61/psutil-7.0.0-cp37-abi3-win32.whl", hash = "sha256:ba3fcef7523064a6c9da440fc4d6bd07da93ac726b5733c29027d7dc95b39d99", size = 241053 },
{ url = "https://files.pythonhosted.org/packages/50/1b/6921afe68c74868b4c9fa424dad3be35b095e16687989ebbb50ce4fceb7c/psutil-7.0.0-cp37-abi3-win_amd64.whl", hash = "sha256:4cf3d4eb1aa9b348dec30105c55cd9b7d4629285735a102beb4441e38db90553", size = 244885 },
]
[[package]] [[package]]
name = "ptyprocess" name = "ptyprocess"
version = "0.7.0" version = "0.7.0"
@ -373,7 +349,6 @@ name = "tractor"
version = "0.1.0a6.dev0" version = "0.1.0a6.dev0"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "bidict" },
{ name = "cffi" }, { name = "cffi" },
{ name = "colorlog" }, { name = "colorlog" },
{ name = "msgspec" }, { name = "msgspec" },
@ -388,7 +363,6 @@ dev = [
{ name = "greenback" }, { name = "greenback" },
{ name = "pexpect" }, { name = "pexpect" },
{ name = "prompt-toolkit" }, { name = "prompt-toolkit" },
{ name = "psutil" },
{ name = "pyperclip" }, { name = "pyperclip" },
{ name = "pytest" }, { name = "pytest" },
{ name = "stackscope" }, { name = "stackscope" },
@ -397,7 +371,6 @@ dev = [
[package.metadata] [package.metadata]
requires-dist = [ requires-dist = [
{ name = "bidict", specifier = ">=0.23.1" },
{ name = "cffi", specifier = ">=1.17.1" }, { name = "cffi", specifier = ">=1.17.1" },
{ name = "colorlog", specifier = ">=6.8.2,<7" }, { name = "colorlog", specifier = ">=6.8.2,<7" },
{ name = "msgspec", specifier = ">=0.19.0" }, { name = "msgspec", specifier = ">=0.19.0" },
@ -412,7 +385,6 @@ dev = [
{ name = "greenback", specifier = ">=1.2.1,<2" }, { name = "greenback", specifier = ">=1.2.1,<2" },
{ name = "pexpect", specifier = ">=4.9.0,<5" }, { name = "pexpect", specifier = ">=4.9.0,<5" },
{ name = "prompt-toolkit", specifier = ">=3.0.50" }, { name = "prompt-toolkit", specifier = ">=3.0.50" },
{ name = "psutil", specifier = ">=7.0.0" },
{ name = "pyperclip", specifier = ">=1.9.0" }, { name = "pyperclip", specifier = ">=1.9.0" },
{ name = "pytest", specifier = ">=8.3.5" }, { name = "pytest", specifier = ">=8.3.5" },
{ name = "stackscope", specifier = ">=0.2.2,<0.3" }, { name = "stackscope", specifier = ">=0.2.2,<0.3" },