Rework/simplify transport addressing

A few things that can fundamentally change,

- UDS addresses now always encapsulate the local and remote pid such
  that it denotes each side's process much like a TCP *port*.
  |_ `.__init__()` takes a new `maybe_pid: int`.
  |_ this required changes to the `.ipc._uds` backend which will come in
     an subsequent commit!
  |_ `UDSAddress.address_type` becomes a `tuple[str, int]` just like the
      TCP case.
  |_ adjust `wrap_address()` to match.
- use a new `_state.get_rt_dir() -> Path` as the default location for
  UDS socket file: now under `XDG_RUNTIME_DIR'/tractor/` subdir by
  default.
- re-implement `USDAddress.get_random()` to use both the local
  `Actor.uid` (if available) and at least the pid for its socket file
  name.

Removals,
- drop the loop generated `_default_addrs`, simplify to just
  `_default_lo_addrs` for per-transport default registry addresses.
  |_ change to `_address_types: dict[str, Type[Address]]` instead of
     separate types `list`.
  |_ adjust `is_wrapped_addr()` to just check `in _addr_types.values()`.
- comment out `Address.open_stream()` it's unused and i think the wrong
  place for this API.

Renames,
- from `AddressTypes` -> `UnwrappedAddress`, since it's a simple type
  union and all this type set is, is the simple python data-structures
  we encode to for the wire.
  |_ see note about possibly implementing the `.[un]wrap()` stuff as
     `msgspec` codec `enc/dec_hook()`s instead!

Additions,
- add a `mk_uuid()` to be used throughout the runtime including for
  generating the `Aid.uuid` part.
- tons of notes around follow up refinements!
ns_aware
Tyler Goodlet 2025-03-30 18:30:43 -04:00
parent efd11f7d74
commit 9de192390a
2 changed files with 238 additions and 67 deletions

View File

@ -14,20 +14,31 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations from __future__ import annotations
from pathlib import Path
import os import os
import tempfile # import tempfile
from uuid import uuid4 from uuid import uuid4
from typing import ( from typing import (
Protocol, Protocol,
ClassVar, ClassVar,
TypeVar, TypeVar,
Union, Union,
Type Type,
TYPE_CHECKING,
) )
from bidict import bidict
import trio import trio
from trio import socket from trio import socket
from ._state import (
get_rt_dir,
current_actor,
)
if TYPE_CHECKING:
from ._runtime import Actor
NamespaceType = TypeVar('NamespaceType') NamespaceType = TypeVar('NamespaceType')
AddressType = TypeVar('AddressType') AddressType = TypeVar('AddressType')
@ -58,12 +69,24 @@ class Address(Protocol[
... ...
def unwrap(self) -> AddressType: def unwrap(self) -> AddressType:
'''
Deliver the underying minimum field set in
a primitive python data type-structure.
'''
... ...
@classmethod @classmethod
def get_random(cls, namespace: NamespaceType | None = None) -> Address: def get_random(cls, namespace: NamespaceType | None = None) -> Address:
... ...
# TODO, this should be something like a `.get_def_registar_addr()`
# or similar since,
# - it should be a **host singleton** (not root/tree singleton)
# - we **only need this value** when one isn't provided to the
# runtime at boot and we want to implicitly provide a host-wide
# registrar.
# - each rooted-actor-tree should likely have its own
# micro-registry (likely the root being it), also see
@classmethod @classmethod
def get_root(cls) -> Address: def get_root(cls) -> Address:
... ...
@ -74,8 +97,13 @@ class Address(Protocol[
def __eq__(self, other) -> bool: def __eq__(self, other) -> bool:
... ...
async def open_stream(self, **kwargs) -> StreamType: # async def open_stream(self, **kwargs) -> StreamType:
... # '''
# Open a connection *TO* this address and deliver back a
# `trio.SocketStream` wrapping the underlying transport.
# '''
# ...
async def open_listener(self, **kwargs) -> ListenerType: async def open_listener(self, **kwargs) -> ListenerType:
... ...
@ -104,9 +132,12 @@ class TCPAddress(Address[
or or
not isinstance(port, int) not isinstance(port, int)
): ):
raise TypeError(f'Expected host {host} to be str and port {port} to be int') raise TypeError(
self._host = host f'Expected host {host!r} to be str and port {port!r} to be int'
self._port = port )
self._host: str = host
self._port: int = port
@property @property
def is_valid(self) -> bool: def is_valid(self) -> bool:
@ -117,14 +148,23 @@ class TCPAddress(Address[
return self._host return self._host
@classmethod @classmethod
def from_addr(cls, addr: tuple[str, int]) -> TCPAddress: def from_addr(
cls,
addr: tuple[str, int]
) -> TCPAddress:
return TCPAddress(addr[0], addr[1]) return TCPAddress(addr[0], addr[1])
def unwrap(self) -> tuple[str, int]: def unwrap(self) -> tuple[str, int]:
return self._host, self._port return (
self._host,
self._port,
)
@classmethod @classmethod
def get_random(cls, namespace: str = '127.0.0.1') -> TCPAddress: def get_random(
cls,
namespace: str = '127.0.0.1',
) -> TCPAddress:
return TCPAddress(namespace, 0) return TCPAddress(namespace, 0)
@classmethod @classmethod
@ -132,7 +172,9 @@ class TCPAddress(Address[
return TCPAddress('127.0.0.1', 1616) return TCPAddress('127.0.0.1', 1616)
def __repr__(self) -> str: def __repr__(self) -> str:
return f'{type(self)} @ {self.unwrap()}' return (
f'{type(self).__name__}[{self.unwrap()}]'
)
def __eq__(self, other) -> bool: def __eq__(self, other) -> bool:
if not isinstance(other, TCPAddress): if not isinstance(other, TCPAddress):
@ -146,14 +188,14 @@ class TCPAddress(Address[
self._port == other._port self._port == other._port
) )
async def open_stream(self, **kwargs) -> trio.SocketStream: # async def open_stream(self, **kwargs) -> trio.SocketStream:
stream = await trio.open_tcp_stream( # stream = await trio.open_tcp_stream(
self._host, # self._host,
self._port, # self._port,
**kwargs # **kwargs
) # )
self._host, self._port = stream.socket.getsockname()[:2] # self._host, self._port = stream.socket.getsockname()[:2]
return stream # return stream
async def open_listener(self, **kwargs) -> trio.SocketListener: async def open_listener(self, **kwargs) -> trio.SocketListener:
listeners = await trio.open_tcp_listeners( listeners = await trio.open_tcp_listeners(
@ -177,14 +219,23 @@ class UDSAddress(Address[
trio.SocketListener trio.SocketListener
]): ]):
# TODO, maybe we should use 'unix' instead?
# -[ ] need to check what other mult-transport frameworks do
# like zmq, nng, uri-spec et al!
name_key: str = 'uds' name_key: str = 'uds'
address_type: type = str address_type: type = tuple[str, int]
def __init__( def __init__(
self, self,
filepath: str filepath: str|Path,
maybe_pid: int,
# ^XXX, in the sense you can also pass
# a "non-real-world-process-id" such as is handy to represent
# our host-local default "port-like" key for the very first
# root actor to create a registry address.
): ):
self._filepath = filepath self._filepath: Path = Path(filepath).absolute()
self._pid: int = maybe_pid
@property @property
def is_valid(self) -> bool: def is_valid(self) -> bool:
@ -195,22 +246,65 @@ class UDSAddress(Address[
return return
@classmethod @classmethod
def from_addr(cls, filepath: str) -> UDSAddress: def from_addr(
return UDSAddress(filepath) cls,
addr: tuple[Path, int]
) -> UDSAddress:
return UDSAddress(
filepath=addr[0],
maybe_pid=addr[1],
)
def unwrap(self) -> str: def unwrap(self) -> tuple[Path, int]:
return self._filepath return (
str(self._filepath),
# XXX NOTE, since this gets passed DIRECTLY to
# `trio.open_unix_
self._pid,
)
@classmethod @classmethod
def get_random(cls, namespace: None = None) -> UDSAddress: def get_random(
return UDSAddress(f'{tempfile.gettempdir()}/{uuid4()}.sock') cls,
namespace: None = None, # unused
) -> UDSAddress:
rt_dir: Path = get_rt_dir()
pid: int = os.getpid()
actor: Actor|None = current_actor(
err_on_no_runtime=False,
)
if actor:
sockname: str = '::'.join(actor.uid) + f'@{pid}'
else:
sockname: str = f'@{pid}'
sockpath: Path = Path(f'{rt_dir}/{sockname}.sock')
return UDSAddress(
# filename=f'{tempfile.gettempdir()}/{uuid4()}.sock'
filepath=sockpath,
maybe_pid=pid,
)
@classmethod @classmethod
def get_root(cls) -> Address: def get_root(cls) -> Address:
return UDSAddress('tractor.sock') def_uds_filepath: Path = (
get_rt_dir()
/
'registry@1616.sock'
)
return UDSAddress(
filepath=def_uds_filepath,
maybe_pid=1616
)
def __repr__(self) -> str: def __repr__(self) -> str:
return f'{type(self)} @ {self._filepath}' return (
f'{type(self).__name__}'
f'['
f'({self._filepath}, {self._pid})'
f']'
)
def __eq__(self, other) -> bool: def __eq__(self, other) -> bool:
if not isinstance(other, UDSAddress): if not isinstance(other, UDSAddress):
@ -220,15 +314,23 @@ class UDSAddress(Address[
return self._filepath == other._filepath return self._filepath == other._filepath
async def open_stream(self, **kwargs) -> trio.SocketStream: # TODO? remove right, it's never used?
stream = await trio.open_unix_socket( #
self._filepath, # async def open_stream(
**kwargs # self,
) # **kwargs,
return stream # ) -> trio.SocketStream:
# stream: trio.SocketStream = await trio.open_unix_socket(
# self._filepath,
# **kwargs
# )
# return stream
async def open_listener(self, **kwargs) -> trio.SocketListener: async def open_listener(self, **kwargs) -> trio.SocketListener:
self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self._sock = socket.socket(
socket.AF_UNIX,
socket.SOCK_STREAM
)
await self._sock.bind(self._filepath) await self._sock.bind(self._filepath)
self._sock.listen(1) self._sock.listen(1)
return trio.SocketListener(self._sock) return trio.SocketListener(self._sock)
@ -238,72 +340,120 @@ class UDSAddress(Address[
os.unlink(self._filepath) os.unlink(self._filepath)
preferred_transport = 'uds' preferred_transport: str = 'uds'
_address_types = ( _address_types: bidict[str, Type[Address]] = {
TCPAddress, 'tcp': TCPAddress,
UDSAddress 'uds': UDSAddress
)
_default_addrs: dict[str, Type[Address]] = {
cls.name_key: cls
for cls in _address_types
} }
AddressTypes = Union[ # TODO, can't we just use a type alias
tuple([ # for this? namely just some `tuple[str, int, str, str]`?
cls.address_type #
for cls in _address_types # -[ ] would also just be simpler to keep this as SockAddr[tuple]
]) # or something, implying it's just a simple pair of values which can
# presumably be mapped to all transports?
# -[ ] `pydoc socket.socket.getsockname()` delivers a 4-tuple for
# ipv6 `(hostaddr, port, flowinfo, scope_id)`.. so how should we
# handle that?
# -[ ] as a further alternative to this wrap()/unwrap() approach we
# could just implement `enc/dec_hook()`s for the `Address`-types
# and just deal with our internal objs directly and always and
# leave it to the codec layer to figure out marshalling?
# |_ would mean only one spot to do the `.unwrap()` (which we may
# end up needing to call from the hook()s anyway?)
# -[x] rename to `UnwrappedAddress[Descriptor]` ??
# seems like the right name as per,
# https://www.geeksforgeeks.org/introduction-to-address-descriptor/
#
UnwrappedAddress = Union[
tuple[
str, # (net/cgroup-)namespace/host-domain
int, # (p)id/port
] # tcp/udp/uds
# ?TODO? should we also include another 2 fields from
# our `Aid` msg such that we include the runtime `Actor.uid`
# of `.name` and `.uuid`?
# - would ensure uniqueness across entire net?
# - allows for easier runtime-level filtering of "actors by
# service name"
] ]
# TODO! really these are discovery sys default addrs ONLY useful for
# when none is provided to a root actor on first boot.
_default_lo_addrs: dict[ _default_lo_addrs: dict[
str, str,
AddressTypes UnwrappedAddress
] = { ] = {
cls.name_key: cls.get_root().unwrap() 'tcp': TCPAddress(
for cls in _address_types host='127.0.0.1',
port=1616,
).unwrap(),
'uds': UDSAddress.get_root().unwrap(),
} }
def get_address_cls(name: str) -> Type[Address]: def get_address_cls(name: str) -> Type[Address]:
return _default_addrs[name] return _address_types[name]
def is_wrapped_addr(addr: any) -> bool: def is_wrapped_addr(addr: any) -> bool:
return type(addr) in _address_types return type(addr) in _address_types.values()
def wrap_address(addr: AddressTypes) -> Address: def mk_uuid() -> str:
'''
Encapsulate creation of a uuid4 as `str` as used
for creating `Actor.uid: tuple[str, str]` and/or
`.msg.types.Aid`.
'''
return str(uuid4())
def wrap_address(
addr: UnwrappedAddress
) -> Address:
if is_wrapped_addr(addr): if is_wrapped_addr(addr):
return addr return addr
cls = None cls: Type|None = None
match addr: match addr:
case str(): case (
str()|Path(),
int(),
):
cls = UDSAddress cls = UDSAddress
case tuple() | list(): case tuple() | list():
cls = TCPAddress cls = TCPAddress
case None: case None:
cls = get_address_cls(preferred_transport) cls: Type[Address] = get_address_cls(preferred_transport)
addr = cls.get_root().unwrap() addr: AddressType = cls.get_root().unwrap()
case _: case _:
raise TypeError( raise TypeError(
f'Can not wrap addr {addr} of type {type(addr)}' f'Can not wrap address {type(addr)}\n'
f'{addr!r}\n'
) )
return cls.from_addr(addr) return cls.from_addr(addr)
def default_lo_addrs(transports: list[str]) -> list[AddressTypes]: def default_lo_addrs(
transports: list[str],
) -> list[Type[Address]]:
'''
Return the default, host-singleton, registry address
for an input transport key set.
'''
return [ return [
_default_lo_addrs[transport] _default_lo_addrs[transport]
for transport in transports for transport in transports

View File

@ -14,14 +14,16 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
""" '''
Per process state Per actor-process runtime state mgmt APIs.
""" '''
from __future__ import annotations from __future__ import annotations
from contextvars import ( from contextvars import (
ContextVar, ContextVar,
) )
import os
from pathlib import Path
from typing import ( from typing import (
Any, Any,
TYPE_CHECKING, TYPE_CHECKING,
@ -143,3 +145,22 @@ def current_ipc_ctx(
f'|_{current_task()}\n' f'|_{current_task()}\n'
) )
return ctx return ctx
# std ODE (mutable) app state location
_rtdir: Path = Path(os.environ['XDG_RUNTIME_DIR'])
def get_rt_dir(
subdir: str = 'tractor'
) -> Path:
'''
Return the user "runtime dir" where most userspace apps stick
their IPC and cache related system util-files; we take hold
of a `'XDG_RUNTIME_DIR'/tractor/` subdir by default.
'''
rtdir: Path = _rtdir / subdir
if not rtdir.is_dir():
rtdir.mkdir()
return rtdir