Finally switch to using address protocol in all runtime

Guillermo Rodriguez 2025-03-23 00:14:04 -03:00
parent 7400f89753
commit 34a2f0c1f3
No known key found for this signature in database
GPG Key ID: 002CC5F1E6BDA53E
23 changed files with 590 additions and 304 deletions

View File

@ -9,7 +9,7 @@ async def main(service_name):
async with tractor.open_nursery() as an:
await an.start_actor(service_name)
async with tractor.get_registry('127.0.0.1', 1616) as portal:
async with tractor.get_registry(('127.0.0.1', 1616)) as portal:
print(f"Arbiter is listening on {portal.channel}")
async with tractor.wait_for_actor(service_name) as sockaddr:

View File

@ -26,7 +26,7 @@ async def test_reg_then_unreg(reg_addr):
portal = await n.start_actor('actor', enable_modules=[__name__])
uid = portal.channel.uid
async with tractor.get_registry(*reg_addr) as aportal:
async with tractor.get_registry(reg_addr) as aportal:
# this local actor should be the arbiter
assert actor is aportal.actor
@ -160,7 +160,7 @@ async def spawn_and_check_registry(
async with tractor.open_root_actor(
registry_addrs=[reg_addr],
):
async with tractor.get_registry(*reg_addr) as portal:
async with tractor.get_registry(reg_addr) as portal:
# runtime needs to be up to call this
actor = tractor.current_actor()
@ -300,7 +300,7 @@ async def close_chans_before_nursery(
async with tractor.open_root_actor(
registry_addrs=[reg_addr],
):
async with tractor.get_registry(*reg_addr) as aportal:
async with tractor.get_registry(reg_addr) as aportal:
try:
get_reg = partial(unpack_reg, aportal)

View File

@ -871,7 +871,7 @@ async def serve_subactors(
)
await ipc.send((
peer.chan.uid,
peer.chan.raddr,
peer.chan.raddr.unwrap(),
))
print('Spawner exiting spawn serve loop!')

View File

@ -38,7 +38,7 @@ async def test_self_is_registered_localportal(reg_addr):
"Verify waiting on the arbiter to register itself using a local portal."
actor = tractor.current_actor()
assert actor.is_arbiter
async with tractor.get_registry(*reg_addr) as portal:
async with tractor.get_registry(reg_addr) as portal:
assert isinstance(portal, tractor._portal.LocalPortal)
with trio.fail_after(0.2):

View File

@ -32,7 +32,7 @@ def test_abort_on_sigint(daemon):
@tractor_test
async def test_cancel_remote_arbiter(daemon, reg_addr):
assert not tractor.current_actor().is_arbiter
async with tractor.get_registry(*reg_addr) as portal:
async with tractor.get_registry(reg_addr) as portal:
await portal.cancel_actor()
time.sleep(0.1)
@ -41,7 +41,7 @@ async def test_cancel_remote_arbiter(daemon, reg_addr):
# no arbiter socket should exist
with pytest.raises(OSError):
async with tractor.get_registry(*reg_addr) as portal:
async with tractor.get_registry(reg_addr) as portal:
pass

View File

@ -77,7 +77,7 @@ async def movie_theatre_question():
async def test_movie_theatre_convo(start_method):
"""The main ``tractor`` routine.
"""
async with tractor.open_nursery() as n:
async with tractor.open_nursery(debug_mode=True) as n:
portal = await n.start_actor(
'frank',

301
tractor/_addr.py 100644
View File

@ -0,0 +1,301 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
import tempfile
from uuid import uuid4
from typing import (
Protocol,
ClassVar,
TypeVar,
Union,
Type
)
import trio
from trio import socket
NamespaceType = TypeVar('NamespaceType')
AddressType = TypeVar('AddressType')
StreamType = TypeVar('StreamType')
ListenerType = TypeVar('ListenerType')
class Address(Protocol[
NamespaceType,
AddressType,
StreamType,
ListenerType
]):
name_key: ClassVar[str]
address_type: ClassVar[Type[AddressType]]
@property
def is_valid(self) -> bool:
...
@property
def namespace(self) -> NamespaceType|None:
...
@classmethod
def from_addr(cls, addr: AddressType) -> Address:
...
def unwrap(self) -> AddressType:
...
@classmethod
def get_random(cls, namespace: NamespaceType | None = None) -> Address:
...
@classmethod
def get_root(cls) -> Address:
...
def __repr__(self) -> str:
...
def __eq__(self, other) -> bool:
...
async def open_stream(self, **kwargs) -> StreamType:
...
async def open_listener(self, **kwargs) -> ListenerType:
...
class TCPAddress(Address[
str,
tuple[str, int],
trio.SocketStream,
trio.SocketListener
]):
name_key: str = 'tcp'
address_type: type = tuple[str, int]
def __init__(
self,
host: str,
port: int
):
if (
not isinstance(host, str)
or
not isinstance(port, int)
):
raise TypeError(f'Expected host {host} to be str and port {port} to be int')
self._host = host
self._port = port
@property
def is_valid(self) -> bool:
return self._port != 0
@property
def namespace(self) -> str:
return self._host
@classmethod
def from_addr(cls, addr: tuple[str, int]) -> TCPAddress:
return TCPAddress(addr[0], addr[1])
def unwrap(self) -> tuple[str, int]:
return self._host, self._port
@classmethod
def get_random(cls, namespace: str = '127.0.0.1') -> TCPAddress:
return TCPAddress(namespace, 0)
@classmethod
def get_root(cls) -> Address:
return TCPAddress('127.0.0.1', 1616)
def __repr__(self) -> str:
return f'{type(self)} @ {self.unwrap()}'
def __eq__(self, other) -> bool:
if not isinstance(other, TCPAddress):
raise TypeError(
f'Can not compare {type(other)} with {type(self)}'
)
return (
self._host == other._host
and
self._port == other._port
)
async def open_stream(self, **kwargs) -> trio.SocketStream:
stream = await trio.open_tcp_stream(
self._host,
self._port,
**kwargs
)
self._host, self._port = stream.socket.getsockname()[:2]
return stream
async def open_listener(self, **kwargs) -> trio.SocketListener:
listeners = await trio.open_tcp_listeners(
host=self._host,
port=self._port,
**kwargs
)
assert len(listeners) == 1
listener = listeners[0]
self._host, self._port = listener.socket.getsockname()[:2]
return listener
class UDSAddress(Address[
None,
str,
trio.SocketStream,
trio.SocketListener
]):
name_key: str = 'uds'
address_type: type = str
def __init__(
self,
filepath: str
):
self._filepath = filepath
@property
def is_valid(self) -> bool:
return True
@property
def namespace(self) -> None:
return
@classmethod
def from_addr(cls, filepath: str) -> UDSAddress:
return UDSAddress(filepath)
def unwrap(self) -> str:
return self._filepath
@classmethod
def get_random(cls, _ns: None = None) -> UDSAddress:
return UDSAddress(f'{tempfile.gettempdir()}/{uuid4().sock}')
@classmethod
def get_root(cls) -> Address:
return UDSAddress('tractor.sock')
def __repr__(self) -> str:
return f'{type(self)} @ {self._filepath}'
def __eq__(self, other) -> bool:
if not isinstance(other, UDSAddress):
raise TypeError(
f'Can not compare {type(other)} with {type(self)}'
)
return self._filepath == other._filepath
async def open_stream(self, **kwargs) -> trio.SocketStream:
stream = await trio.open_tcp_stream(
self._filepath,
**kwargs
)
self._binded = True
return stream
async def open_listener(self, **kwargs) -> trio.SocketListener:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(self._filepath)
sock.listen()
self._binded = True
return trio.SocketListener(sock)
preferred_transport = 'tcp'
_address_types = (
TCPAddress,
UDSAddress
)
_default_addrs: dict[str, Type[Address]] = {
cls.name_key: cls
for cls in _address_types
}
AddressTypes = Union[
tuple([
cls.address_type
for cls in _address_types
])
]
_default_lo_addrs: dict[
str,
AddressTypes
] = {
cls.name_key: cls.get_root().unwrap()
for cls in _address_types
}
def get_address_cls(name: str) -> Type[Address]:
return _default_addrs[name]
def is_wrapped_addr(addr: any) -> bool:
return type(addr) in _address_types
def wrap_address(addr: AddressTypes) -> Address:
if is_wrapped_addr(addr):
return addr
cls = None
match addr:
case str():
cls = UDSAddress
case tuple() | list():
cls = TCPAddress
case None:
cls = get_address_cls(preferred_transport)
addr = cls.get_root().unwrap()
case _:
raise TypeError(
f'Can not wrap addr {addr} of type {type(addr)}'
)
return cls.from_addr(addr)
def default_lo_addrs(transports: list[str]) -> list[AddressTypes]:
return [
_default_lo_addrs[transport]
for transport in transports
]

View File

@ -31,8 +31,7 @@ def parse_uid(arg):
return str(name), str(uuid) # ensures str encoding
def parse_ipaddr(arg):
host, port = literal_eval(arg)
return (str(host), int(port))
return literal_eval(arg)
if __name__ == "__main__":

View File

@ -859,19 +859,10 @@ class Context:
@property
def dst_maddr(self) -> str:
chan: Channel = self.chan
dst_addr, dst_port = chan.raddr
trans: MsgTransport = chan.transport
# cid: str = self.cid
# cid_head, cid_tail = cid[:6], cid[-6:]
return (
f'/ipv4/{dst_addr}'
f'/{trans.name_key}/{dst_port}'
# f'/{self.chan.uid[0]}'
# f'/{self.cid}'
# f'/cid={cid_head}..{cid_tail}'
# TODO: ? not use this ^ right ?
)
return trans.maddr
dmaddr = dst_maddr

View File

@ -30,6 +30,12 @@ from contextlib import asynccontextmanager as acm
from tractor.log import get_logger
from .trionics import gather_contexts
from .ipc import _connect_chan, Channel
from ._addr import (
AddressTypes,
Address,
preferred_transport,
wrap_address
)
from ._portal import (
Portal,
open_portal,
@ -48,11 +54,7 @@ log = get_logger(__name__)
@acm
async def get_registry(
host: str,
port: int,
) -> AsyncGenerator[
async def get_registry(addr: AddressTypes) -> AsyncGenerator[
Portal | LocalPortal | None,
None,
]:
@ -69,13 +71,13 @@ async def get_registry(
# (likely a re-entrant call from the arbiter actor)
yield LocalPortal(
actor,
Channel((host, port))
await Channel.from_addr(addr)
)
else:
# TODO: try to look pre-existing connection from
# `Actor._peers` and use it instead?
async with (
_connect_chan((host, port)) as chan,
_connect_chan(addr) as chan,
open_portal(chan) as regstr_ptl,
):
yield regstr_ptl
@ -89,11 +91,10 @@ async def get_root(
# TODO: rename mailbox to `_root_maddr` when we finally
# add and impl libp2p multi-addrs?
host, port = _runtime_vars['_root_mailbox']
assert host is not None
addr = _runtime_vars['_root_mailbox']
async with (
_connect_chan((host, port)) as chan,
_connect_chan(addr) as chan,
open_portal(chan, **kwargs) as portal,
):
yield portal
@ -134,10 +135,10 @@ def get_peer_by_name(
@acm
async def query_actor(
name: str,
regaddr: tuple[str, int]|None = None,
regaddr: AddressTypes|None = None,
) -> AsyncGenerator[
tuple[str, int]|None,
AddressTypes|None,
None,
]:
'''
@ -163,31 +164,31 @@ async def query_actor(
return
reg_portal: Portal
regaddr: tuple[str, int] = regaddr or actor.reg_addrs[0]
async with get_registry(*regaddr) as reg_portal:
regaddr: Address = wrap_address(regaddr) or actor.reg_addrs[0]
async with get_registry(regaddr) as reg_portal:
# TODO: return portals to all available actors - for now
# just the last one that registered
sockaddr: tuple[str, int] = await reg_portal.run_from_ns(
addr: AddressTypes = await reg_portal.run_from_ns(
'self',
'find_actor',
name=name,
)
yield sockaddr
yield addr
@acm
async def maybe_open_portal(
addr: tuple[str, int],
addr: AddressTypes,
name: str,
):
async with query_actor(
name=name,
regaddr=addr,
) as sockaddr:
) as addr:
pass
if sockaddr:
async with _connect_chan(sockaddr) as chan:
if addr:
async with _connect_chan(addr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
@ -197,7 +198,8 @@ async def maybe_open_portal(
@acm
async def find_actor(
name: str,
registry_addrs: list[tuple[str, int]]|None = None,
registry_addrs: list[AddressTypes]|None = None,
enable_transports: list[str] = [preferred_transport],
only_first: bool = True,
raise_on_none: bool = False,
@ -224,15 +226,15 @@ async def find_actor(
# XXX NOTE: make sure to dynamically read the value on
# every call since something may change it globally (eg.
# like in our discovery test suite)!
from . import _root
from ._addr import default_lo_addrs
registry_addrs = (
_runtime_vars['_registry_addrs']
or
_root._default_lo_addrs
default_lo_addrs(enable_transports)
)
maybe_portals: list[
AsyncContextManager[tuple[str, int]]
AsyncContextManager[AddressTypes]
] = list(
maybe_open_portal(
addr=addr,
@ -274,7 +276,7 @@ async def find_actor(
@acm
async def wait_for_actor(
name: str,
registry_addr: tuple[str, int] | None = None,
registry_addr: AddressTypes | None = None,
) -> AsyncGenerator[Portal, None]:
'''
@ -291,7 +293,7 @@ async def wait_for_actor(
yield peer_portal
return
regaddr: tuple[str, int] = (
regaddr: AddressTypes = (
registry_addr
or
actor.reg_addrs[0]
@ -299,8 +301,8 @@ async def wait_for_actor(
# TODO: use `.trionics.gather_contexts()` like
# above in `find_actor()` as well?
reg_portal: Portal
async with get_registry(*regaddr) as reg_portal:
sockaddrs = await reg_portal.run_from_ns(
async with get_registry(regaddr) as reg_portal:
addrs = await reg_portal.run_from_ns(
'self',
'wait_for_actor',
name=name,
@ -308,8 +310,8 @@ async def wait_for_actor(
# get latest registered addr by default?
# TODO: offer multi-portal yields in multi-homed case?
sockaddr: tuple[str, int] = sockaddrs[-1]
addr: AddressTypes = addrs[-1]
async with _connect_chan(sockaddr) as chan:
async with _connect_chan(addr) as chan:
async with open_portal(chan) as portal:
yield portal

View File

@ -37,6 +37,7 @@ from .log import (
from . import _state
from .devx import _debug
from .to_asyncio import run_as_asyncio_guest
from ._addr import AddressTypes
from ._runtime import (
async_main,
Actor,
@ -52,10 +53,10 @@ log = get_logger(__name__)
def _mp_main(
actor: Actor,
accept_addrs: list[tuple[str, int]],
accept_addrs: list[AddressTypes],
forkserver_info: tuple[Any, Any, Any, Any, Any],
start_method: SpawnMethodKey,
parent_addr: tuple[str, int] | None = None,
parent_addr: AddressTypes | None = None,
infect_asyncio: bool = False,
) -> None:
@ -206,7 +207,7 @@ def nest_from_op(
def _trio_main(
actor: Actor,
*,
parent_addr: tuple[str, int] | None = None,
parent_addr: AddressTypes | None = None,
infect_asyncio: bool = False,
) -> None:

View File

@ -43,21 +43,18 @@ from .devx import _debug
from . import _spawn
from . import _state
from . import log
from .ipc import _connect_chan
from .ipc import (
_connect_chan,
)
from ._addr import (
AddressTypes,
wrap_address,
preferred_transport,
default_lo_addrs
)
from ._exceptions import is_multi_cancelled
# set at startup and after forks
_default_host: str = '127.0.0.1'
_default_port: int = 1616
# default registry always on localhost
_default_lo_addrs: list[tuple[str, int]] = [(
_default_host,
_default_port,
)]
logger = log.get_logger('tractor')
@ -66,10 +63,12 @@ async def open_root_actor(
*,
# defaults are above
registry_addrs: list[tuple[str, int]]|None = None,
registry_addrs: list[AddressTypes]|None = None,
# defaults are above
arbiter_addr: tuple[str, int]|None = None,
arbiter_addr: tuple[AddressTypes]|None = None,
enable_transports: list[str] = [preferred_transport],
name: str|None = 'root',
@ -195,11 +194,9 @@ async def open_root_actor(
)
registry_addrs = [arbiter_addr]
registry_addrs: list[tuple[str, int]] = (
registry_addrs
or
_default_lo_addrs
)
if not registry_addrs:
registry_addrs: list[AddressTypes] = default_lo_addrs(enable_transports)
assert registry_addrs
loglevel = (
@ -248,10 +245,10 @@ async def open_root_actor(
enable_stack_on_sig()
# closed into below ping task-func
ponged_addrs: list[tuple[str, int]] = []
ponged_addrs: list[AddressTypes] = []
async def ping_tpt_socket(
addr: tuple[str, int],
addr: AddressTypes,
timeout: float = 1,
) -> None:
'''
@ -284,10 +281,10 @@ async def open_root_actor(
for addr in registry_addrs:
tn.start_soon(
ping_tpt_socket,
tuple(addr), # TODO: just drop this requirement?
addr,
)
trans_bind_addrs: list[tuple[str, int]] = []
trans_bind_addrs: list[AddressTypes] = []
# Create a new local root-actor instance which IS NOT THE
# REGISTRAR
@ -311,9 +308,12 @@ async def open_root_actor(
)
# DO NOT use the registry_addrs as the transport server
# addrs for this new non-registar, root-actor.
for host, port in ponged_addrs:
# NOTE: zero triggers dynamic OS port allocation
trans_bind_addrs.append((host, 0))
for addr in ponged_addrs:
waddr = wrap_address(addr)
print(waddr)
trans_bind_addrs.append(
waddr.get_random(namespace=waddr.namespace)
)
# Start this local actor as the "registrar", aka a regular
# actor who manages the local registry of "mailboxes" of
@ -322,7 +322,7 @@ async def open_root_actor(
# NOTE that if the current actor IS THE REGISTAR, the
# following init steps are taken:
# - the tranport layer server is bound to each (host, port)
# - the tranport layer server is bound to each addr
# pair defined in provided registry_addrs, or the default.
trans_bind_addrs = registry_addrs
@ -462,7 +462,7 @@ def run_daemon(
# runtime kwargs
name: str | None = 'root',
registry_addrs: list[tuple[str, int]] = _default_lo_addrs,
registry_addrs: list[AddressTypes]|None = None,
start_method: str | None = None,
debug_mode: bool = False,

View File

@ -74,6 +74,12 @@ from tractor.msg import (
types as msgtypes,
)
from .ipc import Channel
from ._addr import (
AddressTypes,
Address,
TCPAddress,
wrap_address,
)
from ._context import (
mk_context,
Context,
@ -179,11 +185,11 @@ class Actor:
enable_modules: list[str] = [],
uid: str|None = None,
loglevel: str|None = None,
registry_addrs: list[tuple[str, int]]|None = None,
registry_addrs: list[AddressTypes]|None = None,
spawn_method: str|None = None,
# TODO: remove!
arbiter_addr: tuple[str, int]|None = None,
arbiter_addr: AddressTypes|None = None,
) -> None:
'''
@ -223,7 +229,7 @@ class Actor:
DeprecationWarning,
stacklevel=2,
)
registry_addrs: list[tuple[str, int]] = [arbiter_addr]
registry_addrs: list[AddressTypes] = [arbiter_addr]
# marked by the process spawning backend at startup
# will be None for the parent most process started manually
@ -257,6 +263,7 @@ class Actor:
] = {}
self._listeners: list[trio.abc.Listener] = []
self._listen_addrs: list[Address] = []
self._parent_chan: Channel|None = None
self._forkserver_info: tuple|None = None
@ -269,13 +276,13 @@ class Actor:
# when provided, init the registry addresses property from
# input via the validator.
self._reg_addrs: list[tuple[str, int]] = []
self._reg_addrs: list[AddressTypes] = []
if registry_addrs:
self.reg_addrs: list[tuple[str, int]] = registry_addrs
self.reg_addrs: list[AddressTypes] = registry_addrs
_state._runtime_vars['_registry_addrs'] = registry_addrs
@property
def reg_addrs(self) -> list[tuple[str, int]]:
def reg_addrs(self) -> list[AddressTypes]:
'''
List of (socket) addresses for all known (and contactable)
registry actors.
@ -286,7 +293,7 @@ class Actor:
@reg_addrs.setter
def reg_addrs(
self,
addrs: list[tuple[str, int]],
addrs: list[AddressTypes],
) -> None:
if not addrs:
log.warning(
@ -295,15 +302,6 @@ class Actor:
)
return
# always sanity check the input list since it's critical
# that addrs are correct for discovery sys operation.
for addr in addrs:
if not isinstance(addr, tuple):
raise ValueError(
'Expected `Actor.reg_addrs: list[tuple[str, int]]`\n'
f'Got {addrs}'
)
self._reg_addrs = addrs
async def wait_for_peer(
@ -1024,11 +1022,11 @@ class Actor:
async def _from_parent(
self,
parent_addr: tuple[str, int]|None,
parent_addr: AddressTypes|None,
) -> tuple[
Channel,
list[tuple[str, int]]|None,
list[AddressTypes]|None,
]:
'''
Bootstrap this local actor's runtime config from its parent by
@ -1040,13 +1038,13 @@ class Actor:
# Connect back to the parent actor and conduct initial
# handshake. From this point on if we error, we
# attempt to ship the exception back to the parent.
chan = await Channel.from_destaddr(parent_addr)
chan = await Channel.from_addr(wrap_address(parent_addr))
# TODO: move this into a `Channel.handshake()`?
# Initial handshake: swap names.
await self._do_handshake(chan)
accept_addrs: list[tuple[str, int]]|None = None
accept_addrs: list[AddressTypes]|None = None
if self._spawn_method == "trio":
@ -1063,7 +1061,7 @@ class Actor:
# if "trace"/"util" mode is enabled?
f'{pretty_struct.pformat(spawnspec)}\n'
)
accept_addrs: list[tuple[str, int]] = spawnspec.bind_addrs
accept_addrs: list[AddressTypes] = spawnspec.bind_addrs
# TODO: another `Struct` for rtvs..
rvs: dict[str, Any] = spawnspec._runtime_vars
@ -1170,8 +1168,7 @@ class Actor:
self,
handler_nursery: Nursery,
*,
# (host, port) to bind for channel server
listen_sockaddrs: list[tuple[str, int]]|None = None,
listen_addrs: list[AddressTypes]|None = None,
task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
) -> None:
@ -1183,36 +1180,38 @@ class Actor:
`.cancel_server()` is called.
'''
if listen_sockaddrs is None:
listen_sockaddrs = [(None, 0)]
if listen_addrs is None:
listen_addrs = [TCPAddress.get_random()]
else:
listen_addrs: list[Address] = [
wrap_address(a) for a in listen_addrs
]
self._server_down = trio.Event()
try:
async with trio.open_nursery() as server_n:
for host, port in listen_sockaddrs:
listeners: list[trio.abc.Listener] = await server_n.start(
listeners: list[trio.abc.Listener] = [
await addr.open_listener()
for addr in listen_addrs
]
await server_n.start(
partial(
trio.serve_tcp,
trio.serve_listeners,
handler=self._stream_handler,
port=port,
host=host,
listeners=listeners,
# NOTE: configured such that new
# connections will stay alive even if
# this server is cancelled!
handler_nursery=handler_nursery,
handler_nursery=handler_nursery
)
)
sockets: list[trio.socket] = [
getattr(listener, 'socket', 'unknown socket')
for listener in listeners
]
log.runtime(
'Started TCP server(s)\n'
f'|_{sockets}\n'
'Started server(s)\n'
'\n'.join([f'|_{addr}' for addr in listen_addrs])
)
self._listen_addrs.extend(listen_addrs)
self._listeners.extend(listeners)
task_status.started(server_n)
@ -1576,26 +1575,21 @@ class Actor:
return False
@property
def accept_addrs(self) -> list[tuple[str, int]]:
def accept_addrs(self) -> list[AddressTypes]:
'''
All addresses to which the transport-channel server binds
and listens for new connections.
'''
# throws OSError on failure
return [
listener.socket.getsockname()
for listener in self._listeners
] # type: ignore
return [a.unwrap() for a in self._listen_addrs]
@property
def accept_addr(self) -> tuple[str, int]:
def accept_addr(self) -> AddressTypes:
'''
Primary address to which the IPC transport server is
bound and listening for new connections.
'''
# throws OSError on failure
return self.accept_addrs[0]
def get_parent(self) -> Portal:
@ -1667,7 +1661,7 @@ class Actor:
async def async_main(
actor: Actor,
accept_addrs: tuple[str, int]|None = None,
accept_addrs: AddressTypes|None = None,
# XXX: currently ``parent_addr`` is only needed for the
# ``multiprocessing`` backend (which pickles state sent to
@ -1676,7 +1670,7 @@ async def async_main(
# change this to a simple ``is_subactor: bool`` which will
# be False when running as root actor and True when as
# a subactor.
parent_addr: tuple[str, int]|None = None,
parent_addr: AddressTypes|None = None,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
@ -1766,7 +1760,7 @@ async def async_main(
partial(
actor._serve_forever,
service_nursery,
listen_sockaddrs=accept_addrs,
listen_addrs=accept_addrs,
)
)
except OSError as oserr:
@ -1782,7 +1776,7 @@ async def async_main(
raise
accept_addrs: list[tuple[str, int]] = actor.accept_addrs
accept_addrs: list[AddressTypes] = actor.accept_addrs
# NOTE: only set the loopback addr for the
# process-tree-global "root" mailbox since
@ -1790,9 +1784,8 @@ async def async_main(
# their root actor over that channel.
if _state._runtime_vars['_is_root']:
for addr in accept_addrs:
host, _ = addr
# TODO: generic 'lo' detector predicate
if '127.0.0.1' in host:
waddr = wrap_address(addr)
if waddr == waddr.get_root():
_state._runtime_vars['_root_mailbox'] = addr
# Register with the arbiter if we're told its addr
@ -1807,24 +1800,21 @@ async def async_main(
# only on unique actor uids?
for addr in actor.reg_addrs:
try:
assert isinstance(addr, tuple)
assert addr[1] # non-zero after bind
waddr = wrap_address(addr)
assert waddr.is_valid
except AssertionError:
await _debug.pause()
async with get_registry(*addr) as reg_portal:
async with get_registry(addr) as reg_portal:
for accept_addr in accept_addrs:
if not accept_addr[1]:
await _debug.pause()
assert accept_addr[1]
accept_addr = wrap_address(accept_addr)
assert accept_addr.is_valid
await reg_portal.run_from_ns(
'self',
'register_actor',
uid=actor.uid,
sockaddr=accept_addr,
addr=accept_addr.unwrap(),
)
is_registered: bool = True
@ -1951,12 +1941,13 @@ async def async_main(
):
failed: bool = False
for addr in actor.reg_addrs:
assert isinstance(addr, tuple)
waddr = wrap_address(addr)
assert waddr.is_valid
with trio.move_on_after(0.5) as cs:
cs.shield = True
try:
async with get_registry(
*addr,
addr,
) as reg_portal:
await reg_portal.run_from_ns(
'self',
@ -2034,7 +2025,7 @@ class Arbiter(Actor):
self._registry: dict[
tuple[str, str],
tuple[str, int],
AddressTypes,
] = {}
self._waiters: dict[
str,
@ -2050,18 +2041,18 @@ class Arbiter(Actor):
self,
name: str,
) -> tuple[str, int]|None:
) -> AddressTypes|None:
for uid, sockaddr in self._registry.items():
for uid, addr in self._registry.items():
if name in uid:
return sockaddr
return addr
return None
async def get_registry(
self
) -> dict[str, tuple[str, int]]:
) -> dict[str, AddressTypes]:
'''
Return current name registry.
@ -2081,7 +2072,7 @@ class Arbiter(Actor):
self,
name: str,
) -> list[tuple[str, int]]:
) -> list[AddressTypes]:
'''
Wait for a particular actor to register.
@ -2089,44 +2080,41 @@ class Arbiter(Actor):
registered.
'''
sockaddrs: list[tuple[str, int]] = []
sockaddr: tuple[str, int]
addrs: list[AddressTypes] = []
addr: AddressTypes
mailbox_info: str = 'Actor registry contact infos:\n'
for uid, sockaddr in self._registry.items():
for uid, addr in self._registry.items():
mailbox_info += (
f'|_uid: {uid}\n'
f'|_sockaddr: {sockaddr}\n\n'
f'|_addr: {addr}\n\n'
)
if name == uid[0]:
sockaddrs.append(sockaddr)
addrs.append(addr)
if not sockaddrs:
if not addrs:
waiter = trio.Event()
self._waiters.setdefault(name, []).append(waiter)
await waiter.wait()
for uid in self._waiters[name]:
if not isinstance(uid, trio.Event):
sockaddrs.append(self._registry[uid])
addrs.append(self._registry[uid])
log.runtime(mailbox_info)
return sockaddrs
return addrs
async def register_actor(
self,
uid: tuple[str, str],
sockaddr: tuple[str, int]
addr: AddressTypes
) -> None:
uid = name, hash = (str(uid[0]), str(uid[1]))
addr = (host, port) = (
str(sockaddr[0]),
int(sockaddr[1]),
)
if port == 0:
waddr: Address = wrap_address(addr)
if not waddr.is_valid:
# should never be 0-dynamic-os-alloc
await _debug.pause()
assert port # should never be 0-dynamic-os-alloc
self._registry[uid] = addr
# pop and signal all waiter events

View File

@ -46,6 +46,7 @@ from tractor._state import (
_runtime_vars,
)
from tractor.log import get_logger
from tractor._addr import AddressTypes
from tractor._portal import Portal
from tractor._runtime import Actor
from tractor._entry import _mp_main
@ -392,8 +393,8 @@ async def new_proc(
errors: dict[tuple[str, str], Exception],
# passed through to actor main
bind_addrs: list[tuple[str, int]],
parent_addr: tuple[str, int],
bind_addrs: list[AddressTypes],
parent_addr: AddressTypes,
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
@ -431,8 +432,8 @@ async def trio_proc(
errors: dict[tuple[str, str], Exception],
# passed through to actor main
bind_addrs: list[tuple[str, int]],
parent_addr: tuple[str, int],
bind_addrs: list[AddressTypes],
parent_addr: AddressTypes,
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
infect_asyncio: bool = False,
@ -520,15 +521,15 @@ async def trio_proc(
# send a "spawning specification" which configures the
# initial runtime state of the child.
await chan.send(
SpawnSpec(
sspec = SpawnSpec(
_parent_main_data=subactor._parent_main_data,
enable_modules=subactor.enable_modules,
reg_addrs=subactor.reg_addrs,
bind_addrs=bind_addrs,
_runtime_vars=_runtime_vars,
)
)
log.runtime(f'Sending spawn spec: {str(sspec)}')
await chan.send(sspec)
# track subactor in current nursery
curr_actor: Actor = current_actor()
@ -638,8 +639,8 @@ async def mp_proc(
subactor: Actor,
errors: dict[tuple[str, str], Exception],
# passed through to actor main
bind_addrs: list[tuple[str, int]],
parent_addr: tuple[str, int],
bind_addrs: list[AddressTypes],
parent_addr: AddressTypes,
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
infect_asyncio: bool = False,

View File

@ -28,7 +28,13 @@ import warnings
import trio
from .devx._debug import maybe_wait_for_debugger
from ._addr import (
AddressTypes,
preferred_transport,
get_address_cls
)
from ._state import current_actor, is_main_process
from .log import get_logger, get_loglevel
from ._runtime import Actor
@ -47,8 +53,6 @@ if TYPE_CHECKING:
log = get_logger(__name__)
_default_bind_addr: tuple[str, int] = ('127.0.0.1', 0)
class ActorNursery:
'''
@ -130,8 +134,9 @@ class ActorNursery:
*,
bind_addrs: list[tuple[str, int]] = [_default_bind_addr],
bind_addrs: list[AddressTypes]|None = None,
rpc_module_paths: list[str]|None = None,
enable_transports: list[str] = [preferred_transport],
enable_modules: list[str]|None = None,
loglevel: str|None = None, # set log level per subactor
debug_mode: bool|None = None,
@ -156,6 +161,12 @@ class ActorNursery:
or get_loglevel()
)
if not bind_addrs:
bind_addrs: list[AddressTypes] = [
get_address_cls(transport).get_random().unwrap()
for transport in enable_transports
]
# configure and pass runtime state
_rtv = _state._runtime_vars.copy()
_rtv['_is_root'] = False
@ -224,7 +235,7 @@ class ActorNursery:
*,
name: str | None = None,
bind_addrs: tuple[str, int] = [_default_bind_addr],
bind_addrs: AddressTypes|None = None,
rpc_module_paths: list[str] | None = None,
enable_modules: list[str] | None = None,
loglevel: str | None = None, # set log level per subactor

View File

@ -17,7 +17,6 @@ import platform
from ._transport import (
MsgTransportKey as MsgTransportKey,
AddressType as AddressType,
MsgType as MsgType,
MsgTransport as MsgTransport,
MsgpackTransport as MsgpackTransport
@ -27,10 +26,8 @@ from ._tcp import MsgpackTCPStream as MsgpackTCPStream
from ._uds import MsgpackUDSStream as MsgpackUDSStream
from ._types import (
default_lo_addrs as default_lo_addrs,
transport_from_destaddr as transport_from_destaddr,
transport_from_addr as transport_from_addr,
transport_from_stream as transport_from_stream,
AddressTypes as AddressTypes
)
from ._chan import (

View File

@ -35,8 +35,12 @@ import trio
from tractor.ipc._transport import MsgTransport
from tractor.ipc._types import (
transport_from_destaddr,
transport_from_addr,
transport_from_stream,
)
from tractor._addr import (
wrap_address,
Address,
AddressTypes
)
from tractor.log import get_logger
@ -66,7 +70,6 @@ class Channel:
def __init__(
self,
destaddr: AddressTypes|None = None,
transport: MsgTransport|None = None,
# TODO: optional reconnection support?
# auto_reconnect: bool = False,
@ -81,8 +84,6 @@ class Channel:
# user in ``.from_stream()``.
self._transport: MsgTransport|None = transport
self._destaddr = destaddr if destaddr else self._transport.raddr
# set after handshake - always uid of far end
self.uid: tuple[str, str]|None = None
@ -121,13 +122,14 @@ class Channel:
)
@classmethod
async def from_destaddr(
async def from_addr(
cls,
destaddr: AddressTypes,
addr: AddressTypes,
**kwargs
) -> Channel:
transport_cls = transport_from_destaddr(destaddr)
transport = await transport_cls.connect_to(destaddr, **kwargs)
addr: Address = wrap_address(addr)
transport_cls = transport_from_addr(addr)
transport = await transport_cls.connect_to(addr, **kwargs)
log.transport(
f'Opened channel[{type(transport)}]: {transport.laddr} -> {transport.raddr}'
@ -164,11 +166,11 @@ class Channel:
)
@property
def laddr(self) -> tuple[str, int]|None:
def laddr(self) -> Address|None:
return self._transport.laddr if self._transport else None
@property
def raddr(self) -> tuple[str, int]|None:
def raddr(self) -> Address|None:
return self._transport.raddr if self._transport else None
# TODO: something like,
@ -205,8 +207,12 @@ class Channel:
# assert err
__tracebackhide__: bool = False
else:
try:
assert err.cid
except KeyError:
raise err
raise
async def recv(self) -> Any:
@ -332,14 +338,14 @@ class Channel:
@acm
async def _connect_chan(
destaddr: AddressTypes
addr: AddressTypes
) -> typing.AsyncGenerator[Channel, None]:
'''
Create and connect a channel with disconnect on context manager
teardown.
'''
chan = await Channel.from_destaddr(destaddr)
chan = await Channel.from_addr(addr)
yield chan
with trio.CancelScope(shield=True):
await chan.aclose()

View File

@ -183,6 +183,9 @@ class RingBuffSender(trio.abc.SendStream):
def wrap_fd(self) -> int:
return self._wrap_event.fd
async def _wait_wrap(self):
await self._wrap_event.read()
async def send_all(self, data: Buffer):
async with self._send_lock:
# while data is larger than the remaining buf
@ -193,7 +196,7 @@ class RingBuffSender(trio.abc.SendStream):
self._shm.buf[self.ptr:] = data[:remaining]
# signal write and wait for reader wrap around
self._write_event.write(remaining)
await self._wrap_event.read()
await self._wait_wrap()
# wrap around and trim already written bytes
self._ptr = 0

View File

@ -23,6 +23,7 @@ import trio
from tractor.msg import MsgCodec
from tractor.log import get_logger
from tractor._addr import TCPAddress
from tractor.ipc._transport import MsgpackTransport
@ -38,9 +39,8 @@ class MsgpackTCPStream(MsgpackTransport):
using the ``msgspec`` codec lib.
'''
address_type = tuple[str, int]
address_type = TCPAddress
layer_key: int = 4
name_key: str = 'tcp'
# def __init__(
# self,
@ -55,19 +55,32 @@ class MsgpackTCPStream(MsgpackTransport):
# codec=codec
# )
@property
def maddr(self) -> str:
host, port = self.raddr.unwrap()
return (
f'/ipv4/{host}'
f'/{self.address_type.name_key}/{port}'
# f'/{self.chan.uid[0]}'
# f'/{self.cid}'
# f'/cid={cid_head}..{cid_tail}'
# TODO: ? not use this ^ right ?
)
def connected(self) -> bool:
return self.stream.socket.fileno() != -1
@classmethod
async def connect_to(
cls,
destaddr: tuple[str, int],
destaddr: TCPAddress,
prefix_size: int = 4,
codec: MsgCodec|None = None,
**kwargs
) -> MsgpackTCPStream:
stream = await trio.open_tcp_stream(
*destaddr,
*destaddr.unwrap(),
**kwargs
)
return MsgpackTCPStream(
@ -87,14 +100,6 @@ class MsgpackTCPStream(MsgpackTransport):
lsockname = stream.socket.getsockname()
rsockname = stream.socket.getpeername()
return (
tuple(lsockname[:2]),
tuple(rsockname[:2]),
TCPAddress.from_addr(tuple(lsockname[:2])),
TCPAddress.from_addr(tuple(rsockname[:2])),
)
@classmethod
def get_random_addr(self) -> tuple[str, int]:
return (None, 0)
@classmethod
def get_root_addr(self) -> tuple[str, int]:
return ('127.0.0.1', 1616)

View File

@ -50,6 +50,7 @@ from tractor.msg import (
types as msgtypes,
pretty_struct,
)
from tractor._addr import Address
log = get_logger(__name__)
@ -62,12 +63,11 @@ MsgTransportKey = tuple[str, str]
# ?TODO? this should be our `Union[*msgtypes.__spec__]` alias now right..?
# => BLEH, except can't bc prots must inherit typevar or param-spec
# vars..
AddressType = TypeVar('AddressType')
MsgType = TypeVar('MsgType')
@runtime_checkable
class MsgTransport(Protocol[AddressType, MsgType]):
class MsgTransport(Protocol[MsgType]):
#
# ^-TODO-^ consider using a generic def and indexing with our
# eventual msg definition/types?
@ -75,10 +75,9 @@ class MsgTransport(Protocol[AddressType, MsgType]):
stream: trio.abc.Stream
drained: list[MsgType]
address_type: ClassVar[Type[AddressType]]
address_type: ClassVar[Type[Address]]
codec_key: ClassVar[str]
name_key: ClassVar[str]
# XXX: should this instead be called `.sendall()`?
async def send(self, msg: MsgType) -> None:
@ -100,20 +99,24 @@ class MsgTransport(Protocol[AddressType, MsgType]):
@classmethod
def key(cls) -> MsgTransportKey:
return cls.codec_key, cls.name_key
return cls.codec_key, cls.address_type.name_key
@property
def laddr(self) -> AddressType:
def laddr(self) -> Address:
...
@property
def raddr(self) -> AddressType:
def raddr(self) -> Address:
...
@property
def maddr(self) -> str:
...
@classmethod
async def connect_to(
cls,
destaddr: AddressType,
addr: Address,
**kwargs
) -> MsgTransport:
...
@ -123,8 +126,8 @@ class MsgTransport(Protocol[AddressType, MsgType]):
cls,
stream: trio.abc.Stream
) -> tuple[
AddressType, # local
AddressType # remote
Address, # local
Address # remote
]:
'''
Return the `trio` streaming transport prot's addrs for both
@ -133,14 +136,6 @@ class MsgTransport(Protocol[AddressType, MsgType]):
'''
...
@classmethod
def get_random_addr(self) -> AddressType:
...
@classmethod
def get_root_addr(self) -> AddressType:
...
class MsgpackTransport(MsgTransport):
@ -447,9 +442,9 @@ class MsgpackTransport(MsgTransport):
return self._aiter_pkts
@property
def laddr(self) -> AddressType:
def laddr(self) -> Address:
return self._laddr
@property
def raddr(self) -> AddressType:
def raddr(self) -> Address:
return self._raddr

View File

@ -13,49 +13,42 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Type, Union
from typing import Type
import trio
import socket
from ._transport import (
from tractor._addr import Address
from tractor.ipc._transport import (
MsgTransportKey,
MsgTransport
)
from ._tcp import MsgpackTCPStream
from ._uds import MsgpackUDSStream
from tractor.ipc._tcp import MsgpackTCPStream
from tractor.ipc._uds import MsgpackUDSStream
# manually updated list of all supported msg transport types
_msg_transports = [
MsgpackTCPStream,
MsgpackUDSStream
]
# manually updated list of all supported codec+transport types
key_to_transport: dict[MsgTransportKey, Type[MsgTransport]] = {
# convert a MsgTransportKey to the corresponding transport type
_key_to_transport: dict[MsgTransportKey, Type[MsgTransport]] = {
cls.key(): cls
for cls in _msg_transports
}
# all different address py types we use
AddressTypes = Union[
tuple([
cls.address_type
for cls in _msg_transports
])
]
default_lo_addrs: dict[MsgTransportKey, AddressTypes] = {
cls.key(): cls.get_root_addr()
# convert an Address wrapper to its corresponding transport type
_addr_to_transport: dict[Type[Address], Type[MsgTransport]] = {
cls.address_type: cls
for cls in _msg_transports
}
def transport_from_destaddr(
destaddr: AddressTypes,
def transport_from_addr(
addr: Address,
codec_key: str = 'msgpack',
) -> Type[MsgTransport]:
'''
@ -63,22 +56,12 @@ def transport_from_destaddr(
corresponding `MsgTransport` type.
'''
match destaddr:
case str():
return MsgpackUDSStream
case tuple():
if (
len(destaddr) == 2
and
isinstance(destaddr[0], str)
and
isinstance(destaddr[1], int)
):
return MsgpackTCPStream
try:
return _addr_to_transport[type(addr)]
except KeyError:
raise NotImplementedError(
f'No known transport for address {destaddr}'
f'No known transport for address {repr(addr)}'
)
@ -113,4 +96,4 @@ def transport_from_stream(
key = (codec_key, transport)
return _msg_transports[key]
return _key_to_transport[key]

View File

@ -18,13 +18,12 @@ Unix Domain Socket implementation of tractor.ipc._transport.MsgTransport protoco
'''
from __future__ import annotations
import tempfile
from uuid import uuid4
import trio
from tractor.msg import MsgCodec
from tractor.log import get_logger
from tractor._addr import UDSAddress
from tractor.ipc._transport import MsgpackTransport
@ -37,9 +36,8 @@ class MsgpackUDSStream(MsgpackTransport):
using the ``msgspec`` codec lib.
'''
address_type = str
address_type = UDSAddress
layer_key: int = 7
name_key: str = 'uds'
# def __init__(
# self,
@ -54,19 +52,32 @@ class MsgpackUDSStream(MsgpackTransport):
# codec=codec
# )
@property
def maddr(self) -> str:
filepath = self.raddr.unwrap()
return (
f'/ipv4/localhost'
f'/{self.address_type.name_key}/{filepath}'
# f'/{self.chan.uid[0]}'
# f'/{self.cid}'
# f'/cid={cid_head}..{cid_tail}'
# TODO: ? not use this ^ right ?
)
def connected(self) -> bool:
return self.stream.socket.fileno() != -1
@classmethod
async def connect_to(
cls,
filename: str,
addr: UDSAddress,
prefix_size: int = 4,
codec: MsgCodec|None = None,
**kwargs
) -> MsgpackUDSStream:
stream = await trio.open_unix_socket(
filename,
addr.unwrap(),
**kwargs
)
return MsgpackUDSStream(
@ -79,16 +90,8 @@ class MsgpackUDSStream(MsgpackTransport):
def get_stream_addrs(
cls,
stream: trio.SocketStream
) -> tuple[str, str]:
) -> tuple[UDSAddress, UDSAddress]:
return (
stream.socket.getsockname(),
stream.socket.getpeername(),
UDSAddress.from_addr(stream.socket.getsockname()),
UDSAddress.from_addr(stream.socket.getsockname()),
)
@classmethod
def get_random_addr(self) -> str:
return f'{tempfile.gettempdir()}/{uuid4()}.sock'
@classmethod
def get_root_addr(self) -> str:
return 'tractor.sock'

View File

@ -46,8 +46,8 @@ from msgspec import (
from tractor.msg import (
pretty_struct,
)
from tractor.ipc import AddressTypes
from tractor.log import get_logger
from tractor._addr import AddressTypes
log = get_logger('tractor.msgspec')