Compare commits

...

3 Commits

Author SHA1 Message Date
Tyler Goodlet 2d6b3922a6 Unwrap `UDSAddress` as `tuple[str, str]`, i.e. sin pid
Since in hindsight the real analog of a net-proto's "bindspace"
(normally its routing layer's addresses-port-set) is more akin to the
"location in the file-system" for a UDS socket file (aka the file's
parent directory) determines whether or not the "port" (aka it's
file-name) collides with any other.

So the `._filedir: Path` is like the allocated "address" and,
the `._filename: Path|str` is basically the "port",

at least in my mind.. Bp

Thinking about fs dirs like a "host address" means you can get
essentially the same benefits/behaviour of say an (ip)
addresses-port-space but using the (current process-namespace's)
filesys-tree. Note that for UDS sockets in particular the
network-namespace is what would normally isolate so called "abstract
sockets" (i.e. UDS sockets that do NOT use file-paths by setting `struct
sockaddr_un.sun_path = 'abstract', see `man unix`); using directories is
even easier and definitely more explicit/readable/immediately-obvious as
a human-user.

As such this reworks all the necessary `UDSAddress` meths,
- `.unwrap()` now returns a `tuple(str(._filedir, str(._filename))`,
- `wrap_address()` now matches UDS on a 2nd tuple `str()` element,
- `.get_root()` no longer passes `maybe_pid`.

AND adjusts `MsgpackUDSStream` to,
- use the new `unwrap_sockpath()` on the `socket.get[sock/peer]name()`
  output before passing directly as `UDSAddress.__init__(filedir, filename)`
  instead of via `.from_addr()`.
- also pass `maybe_pid`s to init since no longer included in the
  unwrapped-type form.
2025-04-03 22:24:24 -04:00
Tyler Goodlet 69fbe49d37 s/`._addr.preferred_transport`/`_state._def_tpt_proto`
Such that the "global-ish" setting (actor-local) is managed with the
others per actor-process and type it as a `Literal['tcp', 'uds']` of the
currently support protocol keys.

Here obvi `_tpt` is some kinda shorthand for "transport" and `_proto` is
for "protocol" Bp

Change imports and refs in all dependent modules.

Oh right, and disable UDS in `wrap_address()` for the moment while
i figure out how to avoid the unwrapped type collision..
2025-04-03 20:22:52 -04:00
Tyler Goodlet a99bec63a3 Add `Arbiter.is_registry()` in prep for proper `.discovery._registry` 2025-04-03 16:35:33 -04:00
7 changed files with 96 additions and 60 deletions

View File

@ -40,6 +40,7 @@ from ._state import (
get_rt_dir, get_rt_dir,
current_actor, current_actor,
is_root_process, is_root_process,
_def_tpt_proto,
) )
if TYPE_CHECKING: if TYPE_CHECKING:
@ -312,10 +313,18 @@ class UDSAddress(Address):
# root actor to create a registry address. # root actor to create a registry address.
maybe_pid: int|None = None, maybe_pid: int|None = None,
): ):
fdir = self._filedir = Path(filedir or self.def_bindspace).absolute() fdir = self._filedir = Path(
fpath = self._filepath = Path(filename) filedir
or
self.def_bindspace
).absolute()
fpath = self._filename = Path(filename)
fp: Path = fdir / fpath fp: Path = fdir / fpath
assert fp.is_absolute() assert (
fp.is_absolute()
and
fp == self.sockpath
)
# to track which "side" is the peer process by reading socket # to track which "side" is the peer process by reading socket
# credentials-info. # credentials-info.
@ -323,7 +332,7 @@ class UDSAddress(Address):
@property @property
def sockpath(self) -> Path: def sockpath(self) -> Path:
return self._filedir / self._filepath return self._filedir / self._filename
@property @property
def is_valid(self) -> bool: def is_valid(self) -> bool:
@ -346,19 +355,20 @@ class UDSAddress(Address):
def from_addr( def from_addr(
cls, cls,
addr: ( addr: (
tuple[Path|str|None, int] tuple[Path|str, Path|str]|Path|str
|Path|str
), ),
) -> UDSAddress: ) -> UDSAddress:
match addr: match addr:
case tuple()|list(): case tuple()|list():
sockpath: Path = Path(addr[0]) filedir = Path(addr[0])
filedir, filename = unwrap_sockpath(sockpath) filename = Path(addr[1])
pid: int = addr[1] # sockpath: Path = Path(addr[0])
# filedir, filename = unwrap_sockpath(sockpath)
# pid: int = addr[1]
return UDSAddress( return UDSAddress(
filedir=filedir, filedir=filedir,
filename=filename, filename=filename,
maybe_pid=pid, # maybe_pid=pid,
) )
# NOTE, in case we ever decide to just `.unwrap()` # NOTE, in case we ever decide to just `.unwrap()`
# to a `Path|str`? # to a `Path|str`?
@ -376,8 +386,8 @@ class UDSAddress(Address):
# XXX NOTE, since this gets passed DIRECTLY to # XXX NOTE, since this gets passed DIRECTLY to
# `.ipc._uds.open_unix_socket_w_passcred()` # `.ipc._uds.open_unix_socket_w_passcred()`
return ( return (
str(self.sockpath), str(self._filedir),
self._pid, str(self._filename),
) )
@classmethod @classmethod
@ -408,18 +418,18 @@ class UDSAddress(Address):
@classmethod @classmethod
def get_root(cls) -> Address: def get_root(cls) -> Address:
def_uds_filepath: Path = 'registry@1616.sock' def_uds_filename: Path = 'registry@1616.sock'
return UDSAddress( return UDSAddress(
filedir=None, filedir=None,
filename=def_uds_filepath, filename=def_uds_filename,
maybe_pid=1616, # maybe_pid=1616,
) )
def __repr__(self) -> str: def __repr__(self) -> str:
return ( return (
f'{type(self).__name__}' f'{type(self).__name__}'
f'[' f'['
f'({self.sockpath}, {self._pid})' f'({self._filedir}, {self._filename})'
f']' f']'
) )
@ -429,7 +439,7 @@ class UDSAddress(Address):
f'Can not compare {type(other)} with {type(self)}' f'Can not compare {type(other)} with {type(self)}'
) )
return self._filepath == other._filepath return self.sockpath == other.sockpath
# async def open_listener(self, **kwargs) -> SocketListener: # async def open_listener(self, **kwargs) -> SocketListener:
async def open_listener( async def open_listener(
@ -461,9 +471,6 @@ class UDSAddress(Address):
os.unlink(self.sockpath) os.unlink(self.sockpath)
preferred_transport: str = 'uds'
_address_types: bidict[str, Type[Address]] = { _address_types: bidict[str, Type[Address]] = {
'tcp': TCPAddress, 'tcp': TCPAddress,
'uds': UDSAddress 'uds': UDSAddress
@ -522,14 +529,6 @@ def wrap_address(
# if 'sock' in addr[0]: # if 'sock' in addr[0]:
# import pdbp; pdbp.set_trace() # import pdbp; pdbp.set_trace()
match addr: match addr:
# TODO! BUT THIS WILL MATCH FOR TCP !...
# -[ ] so prolly go back to what guille had orig XD
# a plain ol' `str`?
case ((
str()|Path(),
int(),
)):
cls = UDSAddress
# classic network socket-address as tuple/list # classic network socket-address as tuple/list
case ( case (
@ -539,21 +538,32 @@ def wrap_address(
): ):
cls = TCPAddress cls = TCPAddress
case (
# (str()|Path(), str()|Path()),
# ^TODO? uhh why doesn't this work!?
(_, filename)
) if type(filename) is str:
cls = UDSAddress
# likely an unset UDS or TCP reg address as defaulted in # likely an unset UDS or TCP reg address as defaulted in
# `_state._runtime_vars['_root_mailbox']` # `_state._runtime_vars['_root_mailbox']`
#
# TODO? figure out when/if we even need this?
case ( case (
None None
| |
[None, None] [None, None]
): ):
cls: Type[Address] = get_address_cls(preferred_transport) cls: Type[Address] = get_address_cls(_def_tpt_proto)
addr: UnwrappedAddress = cls.get_root().unwrap() addr: UnwrappedAddress = cls.get_root().unwrap()
case _: case _:
# import pdbp; pdbp.set_trace() # import pdbp; pdbp.set_trace()
raise TypeError( raise TypeError(
f'Can not wrap address {type(addr)}\n' f'Can not wrap unwrapped-address ??\n'
f'{addr!r}\n' f'type(addr): {type(addr)!r}\n'
f'addr: {addr!r}\n'
) )
return cls.from_addr(addr) return cls.from_addr(addr)

View File

@ -33,7 +33,6 @@ from .ipc import _connect_chan, Channel
from ._addr import ( from ._addr import (
UnwrappedAddress, UnwrappedAddress,
Address, Address,
preferred_transport,
wrap_address wrap_address
) )
from ._portal import ( from ._portal import (
@ -44,6 +43,7 @@ from ._portal import (
from ._state import ( from ._state import (
current_actor, current_actor,
_runtime_vars, _runtime_vars,
_def_tpt_proto,
) )
if TYPE_CHECKING: if TYPE_CHECKING:
@ -203,7 +203,7 @@ async def maybe_open_portal(
async def find_actor( async def find_actor(
name: str, name: str,
registry_addrs: list[UnwrappedAddress]|None = None, registry_addrs: list[UnwrappedAddress]|None = None,
enable_transports: list[str] = [preferred_transport], enable_transports: list[str] = [_def_tpt_proto],
only_first: bool = True, only_first: bool = True,
raise_on_none: bool = False, raise_on_none: bool = False,

View File

@ -56,7 +56,6 @@ from ._addr import (
UnwrappedAddress, UnwrappedAddress,
default_lo_addrs, default_lo_addrs,
mk_uuid, mk_uuid,
preferred_transport,
wrap_address, wrap_address,
) )
from ._exceptions import ( from ._exceptions import (
@ -139,6 +138,7 @@ async def maybe_block_bp(
os.environ.pop('PYTHONBREAKPOINT', None) os.environ.pop('PYTHONBREAKPOINT', None)
@acm @acm
async def open_root_actor( async def open_root_actor(
*, *,
@ -148,7 +148,9 @@ async def open_root_actor(
# defaults are above # defaults are above
arbiter_addr: tuple[UnwrappedAddress]|None = None, arbiter_addr: tuple[UnwrappedAddress]|None = None,
enable_transports: list[str] = [preferred_transport], enable_transports: list[
_state.TransportProtocolKey,
] = [_state._def_tpt_proto],
name: str|None = 'root', name: str|None = 'root',

View File

@ -80,7 +80,6 @@ from ._addr import (
Address, Address,
default_lo_addrs, default_lo_addrs,
get_address_cls, get_address_cls,
preferred_transport,
wrap_address, wrap_address,
) )
from ._context import ( from ._context import (
@ -1322,7 +1321,9 @@ class Actor:
''' '''
if listen_addrs is None: if listen_addrs is None:
listen_addrs = default_lo_addrs([preferred_transport]) listen_addrs = default_lo_addrs([
_state._def_tpt_proto
])
else: else:
listen_addrs: list[Address] = [ listen_addrs: list[Address] = [
@ -1846,7 +1847,7 @@ async def async_main(
enable_transports: list[str] = ( enable_transports: list[str] = (
maybe_preferred_transports_says_rent maybe_preferred_transports_says_rent
or or
[preferred_transport] [_state._def_tpt_proto]
) )
for transport_key in enable_transports: for transport_key in enable_transports:
transport_cls: Type[Address] = get_address_cls( transport_cls: Type[Address] = get_address_cls(
@ -2134,15 +2135,15 @@ async def async_main(
log.info(teardown_report) log.info(teardown_report)
# TODO: rename to `Registry` and move to `._discovery`! # TODO: rename to `Registry` and move to `.discovery._registry`!
class Arbiter(Actor): class Arbiter(Actor):
''' '''
A special registrar actor who can contact all other actors A special registrar (and for now..) `Actor` who can contact all
within its immediate process tree and possibly keeps a registry other actors within its immediate process tree and possibly keeps
of others meant to be discoverable in a distributed a registry of others meant to be discoverable in a distributed
application. Normally the registrar is also the "root actor" application. Normally the registrar is also the "root actor" and
and thus always has access to the top-most-level actor thus always has access to the top-most-level actor (process)
(process) nursery. nursery.
By default, the registrar is always initialized when and if no By default, the registrar is always initialized when and if no
other registrar socket addrs have been specified to runtime other registrar socket addrs have been specified to runtime
@ -2162,6 +2163,12 @@ class Arbiter(Actor):
''' '''
is_arbiter = True is_arbiter = True
# TODO, implement this as a read on there existing a `._state` of
# some sort setup by whenever we impl this all as
# a `.discovery._registry.open_registry()` API
def is_registry(self) -> bool:
return self.is_arbiter
def __init__( def __init__(
self, self,
*args, *args,

View File

@ -26,6 +26,7 @@ import os
from pathlib import Path from pathlib import Path
from typing import ( from typing import (
Any, Any,
Literal,
TYPE_CHECKING, TYPE_CHECKING,
) )
@ -164,3 +165,11 @@ def get_rt_dir(
if not rtdir.is_dir(): if not rtdir.is_dir():
rtdir.mkdir() rtdir.mkdir()
return rtdir return rtdir
# default IPC transport protocol settings
TransportProtocolKey = Literal[
'tcp',
'uds',
]
_def_tpt_proto: TransportProtocolKey = 'tcp'

View File

@ -34,7 +34,6 @@ import trio
from .devx._debug import maybe_wait_for_debugger from .devx._debug import maybe_wait_for_debugger
from ._addr import ( from ._addr import (
UnwrappedAddress, UnwrappedAddress,
preferred_transport,
mk_uuid, mk_uuid,
) )
from ._state import current_actor, is_main_process from ._state import current_actor, is_main_process
@ -45,7 +44,9 @@ from ._exceptions import (
is_multi_cancelled, is_multi_cancelled,
ContextCancelled, ContextCancelled,
) )
from ._root import open_root_actor from ._root import (
open_root_actor,
)
from . import _state from . import _state
from . import _spawn from . import _spawn
@ -138,7 +139,7 @@ class ActorNursery:
bind_addrs: list[UnwrappedAddress]|None = None, bind_addrs: list[UnwrappedAddress]|None = None,
rpc_module_paths: list[str]|None = None, rpc_module_paths: list[str]|None = None,
enable_transports: list[str] = [preferred_transport], enable_transports: list[str] = [_state._def_tpt_proto],
enable_modules: list[str]|None = None, enable_modules: list[str]|None = None,
loglevel: str|None = None, # set log level per subactor loglevel: str|None = None, # set log level per subactor
debug_mode: bool|None = None, debug_mode: bool|None = None,

View File

@ -38,7 +38,10 @@ from trio._highlevel_open_unix_stream import (
from tractor.msg import MsgCodec from tractor.msg import MsgCodec
from tractor.log import get_logger from tractor.log import get_logger
from tractor._addr import UDSAddress from tractor._addr import (
UDSAddress,
unwrap_sockpath,
)
from tractor.ipc._transport import MsgpackTransport from tractor.ipc._transport import MsgpackTransport
@ -194,16 +197,20 @@ class MsgpackUDSStream(MsgpackTransport):
case (bytes(), str()): case (bytes(), str()):
sock_path: Path = Path(sockname) sock_path: Path = Path(sockname)
( (
pid, peer_pid,
uid, _,
gid, _,
) = get_peer_info(sock) ) = get_peer_info(sock)
laddr = UDSAddress.from_addr((
sock_path, filedir, filename = unwrap_sockpath(sock_path)
os.getpid(), laddr = UDSAddress(
)) filedir=filedir,
raddr = UDSAddress.from_addr(( filename=filename,
sock_path, maybe_pid=os.getpid(),
pid )
)) raddr = UDSAddress(
filedir=filedir,
filename=filename,
maybe_pid=peer_pid
)
return (laddr, raddr) return (laddr, raddr)