Compare commits

...

3 Commits

Author SHA1 Message Date
Tyler Goodlet 2d6b3922a6 Unwrap `UDSAddress` as `tuple[str, str]`, i.e. sin pid
Since in hindsight the real analog of a net-proto's "bindspace"
(normally its routing layer's addresses-port-set) is more akin to the
"location in the file-system" for a UDS socket file (aka the file's
parent directory) determines whether or not the "port" (aka it's
file-name) collides with any other.

So the `._filedir: Path` is like the allocated "address" and,
the `._filename: Path|str` is basically the "port",

at least in my mind.. Bp

Thinking about fs dirs like a "host address" means you can get
essentially the same benefits/behaviour of say an (ip)
addresses-port-space but using the (current process-namespace's)
filesys-tree. Note that for UDS sockets in particular the
network-namespace is what would normally isolate so called "abstract
sockets" (i.e. UDS sockets that do NOT use file-paths by setting `struct
sockaddr_un.sun_path = 'abstract', see `man unix`); using directories is
even easier and definitely more explicit/readable/immediately-obvious as
a human-user.

As such this reworks all the necessary `UDSAddress` meths,
- `.unwrap()` now returns a `tuple(str(._filedir, str(._filename))`,
- `wrap_address()` now matches UDS on a 2nd tuple `str()` element,
- `.get_root()` no longer passes `maybe_pid`.

AND adjusts `MsgpackUDSStream` to,
- use the new `unwrap_sockpath()` on the `socket.get[sock/peer]name()`
  output before passing directly as `UDSAddress.__init__(filedir, filename)`
  instead of via `.from_addr()`.
- also pass `maybe_pid`s to init since no longer included in the
  unwrapped-type form.
2025-04-03 22:24:24 -04:00
Tyler Goodlet 69fbe49d37 s/`._addr.preferred_transport`/`_state._def_tpt_proto`
Such that the "global-ish" setting (actor-local) is managed with the
others per actor-process and type it as a `Literal['tcp', 'uds']` of the
currently support protocol keys.

Here obvi `_tpt` is some kinda shorthand for "transport" and `_proto` is
for "protocol" Bp

Change imports and refs in all dependent modules.

Oh right, and disable UDS in `wrap_address()` for the moment while
i figure out how to avoid the unwrapped type collision..
2025-04-03 20:22:52 -04:00
Tyler Goodlet a99bec63a3 Add `Arbiter.is_registry()` in prep for proper `.discovery._registry` 2025-04-03 16:35:33 -04:00
7 changed files with 96 additions and 60 deletions

View File

@ -40,6 +40,7 @@ from ._state import (
get_rt_dir,
current_actor,
is_root_process,
_def_tpt_proto,
)
if TYPE_CHECKING:
@ -312,10 +313,18 @@ class UDSAddress(Address):
# root actor to create a registry address.
maybe_pid: int|None = None,
):
fdir = self._filedir = Path(filedir or self.def_bindspace).absolute()
fpath = self._filepath = Path(filename)
fdir = self._filedir = Path(
filedir
or
self.def_bindspace
).absolute()
fpath = self._filename = Path(filename)
fp: Path = fdir / fpath
assert fp.is_absolute()
assert (
fp.is_absolute()
and
fp == self.sockpath
)
# to track which "side" is the peer process by reading socket
# credentials-info.
@ -323,7 +332,7 @@ class UDSAddress(Address):
@property
def sockpath(self) -> Path:
return self._filedir / self._filepath
return self._filedir / self._filename
@property
def is_valid(self) -> bool:
@ -346,19 +355,20 @@ class UDSAddress(Address):
def from_addr(
cls,
addr: (
tuple[Path|str|None, int]
|Path|str
tuple[Path|str, Path|str]|Path|str
),
) -> UDSAddress:
match addr:
case tuple()|list():
sockpath: Path = Path(addr[0])
filedir, filename = unwrap_sockpath(sockpath)
pid: int = addr[1]
filedir = Path(addr[0])
filename = Path(addr[1])
# sockpath: Path = Path(addr[0])
# filedir, filename = unwrap_sockpath(sockpath)
# pid: int = addr[1]
return UDSAddress(
filedir=filedir,
filename=filename,
maybe_pid=pid,
# maybe_pid=pid,
)
# NOTE, in case we ever decide to just `.unwrap()`
# to a `Path|str`?
@ -376,8 +386,8 @@ class UDSAddress(Address):
# XXX NOTE, since this gets passed DIRECTLY to
# `.ipc._uds.open_unix_socket_w_passcred()`
return (
str(self.sockpath),
self._pid,
str(self._filedir),
str(self._filename),
)
@classmethod
@ -408,18 +418,18 @@ class UDSAddress(Address):
@classmethod
def get_root(cls) -> Address:
def_uds_filepath: Path = 'registry@1616.sock'
def_uds_filename: Path = 'registry@1616.sock'
return UDSAddress(
filedir=None,
filename=def_uds_filepath,
maybe_pid=1616,
filename=def_uds_filename,
# maybe_pid=1616,
)
def __repr__(self) -> str:
return (
f'{type(self).__name__}'
f'['
f'({self.sockpath}, {self._pid})'
f'({self._filedir}, {self._filename})'
f']'
)
@ -429,7 +439,7 @@ class UDSAddress(Address):
f'Can not compare {type(other)} with {type(self)}'
)
return self._filepath == other._filepath
return self.sockpath == other.sockpath
# async def open_listener(self, **kwargs) -> SocketListener:
async def open_listener(
@ -461,9 +471,6 @@ class UDSAddress(Address):
os.unlink(self.sockpath)
preferred_transport: str = 'uds'
_address_types: bidict[str, Type[Address]] = {
'tcp': TCPAddress,
'uds': UDSAddress
@ -522,14 +529,6 @@ def wrap_address(
# if 'sock' in addr[0]:
# import pdbp; pdbp.set_trace()
match addr:
# TODO! BUT THIS WILL MATCH FOR TCP !...
# -[ ] so prolly go back to what guille had orig XD
# a plain ol' `str`?
case ((
str()|Path(),
int(),
)):
cls = UDSAddress
# classic network socket-address as tuple/list
case (
@ -539,21 +538,32 @@ def wrap_address(
):
cls = TCPAddress
case (
# (str()|Path(), str()|Path()),
# ^TODO? uhh why doesn't this work!?
(_, filename)
) if type(filename) is str:
cls = UDSAddress
# likely an unset UDS or TCP reg address as defaulted in
# `_state._runtime_vars['_root_mailbox']`
#
# TODO? figure out when/if we even need this?
case (
None
|
[None, None]
):
cls: Type[Address] = get_address_cls(preferred_transport)
cls: Type[Address] = get_address_cls(_def_tpt_proto)
addr: UnwrappedAddress = cls.get_root().unwrap()
case _:
# import pdbp; pdbp.set_trace()
raise TypeError(
f'Can not wrap address {type(addr)}\n'
f'{addr!r}\n'
f'Can not wrap unwrapped-address ??\n'
f'type(addr): {type(addr)!r}\n'
f'addr: {addr!r}\n'
)
return cls.from_addr(addr)

View File

@ -33,7 +33,6 @@ from .ipc import _connect_chan, Channel
from ._addr import (
UnwrappedAddress,
Address,
preferred_transport,
wrap_address
)
from ._portal import (
@ -44,6 +43,7 @@ from ._portal import (
from ._state import (
current_actor,
_runtime_vars,
_def_tpt_proto,
)
if TYPE_CHECKING:
@ -203,7 +203,7 @@ async def maybe_open_portal(
async def find_actor(
name: str,
registry_addrs: list[UnwrappedAddress]|None = None,
enable_transports: list[str] = [preferred_transport],
enable_transports: list[str] = [_def_tpt_proto],
only_first: bool = True,
raise_on_none: bool = False,

View File

@ -56,7 +56,6 @@ from ._addr import (
UnwrappedAddress,
default_lo_addrs,
mk_uuid,
preferred_transport,
wrap_address,
)
from ._exceptions import (
@ -139,6 +138,7 @@ async def maybe_block_bp(
os.environ.pop('PYTHONBREAKPOINT', None)
@acm
async def open_root_actor(
*,
@ -148,7 +148,9 @@ async def open_root_actor(
# defaults are above
arbiter_addr: tuple[UnwrappedAddress]|None = None,
enable_transports: list[str] = [preferred_transport],
enable_transports: list[
_state.TransportProtocolKey,
] = [_state._def_tpt_proto],
name: str|None = 'root',

View File

@ -80,7 +80,6 @@ from ._addr import (
Address,
default_lo_addrs,
get_address_cls,
preferred_transport,
wrap_address,
)
from ._context import (
@ -1322,7 +1321,9 @@ class Actor:
'''
if listen_addrs is None:
listen_addrs = default_lo_addrs([preferred_transport])
listen_addrs = default_lo_addrs([
_state._def_tpt_proto
])
else:
listen_addrs: list[Address] = [
@ -1846,7 +1847,7 @@ async def async_main(
enable_transports: list[str] = (
maybe_preferred_transports_says_rent
or
[preferred_transport]
[_state._def_tpt_proto]
)
for transport_key in enable_transports:
transport_cls: Type[Address] = get_address_cls(
@ -2134,15 +2135,15 @@ async def async_main(
log.info(teardown_report)
# TODO: rename to `Registry` and move to `._discovery`!
# TODO: rename to `Registry` and move to `.discovery._registry`!
class Arbiter(Actor):
'''
A special registrar actor who can contact all other actors
within its immediate process tree and possibly keeps a registry
of others meant to be discoverable in a distributed
application. Normally the registrar is also the "root actor"
and thus always has access to the top-most-level actor
(process) nursery.
A special registrar (and for now..) `Actor` who can contact all
other actors within its immediate process tree and possibly keeps
a registry of others meant to be discoverable in a distributed
application. Normally the registrar is also the "root actor" and
thus always has access to the top-most-level actor (process)
nursery.
By default, the registrar is always initialized when and if no
other registrar socket addrs have been specified to runtime
@ -2162,6 +2163,12 @@ class Arbiter(Actor):
'''
is_arbiter = True
# TODO, implement this as a read on there existing a `._state` of
# some sort setup by whenever we impl this all as
# a `.discovery._registry.open_registry()` API
def is_registry(self) -> bool:
return self.is_arbiter
def __init__(
self,
*args,

View File

@ -26,6 +26,7 @@ import os
from pathlib import Path
from typing import (
Any,
Literal,
TYPE_CHECKING,
)
@ -164,3 +165,11 @@ def get_rt_dir(
if not rtdir.is_dir():
rtdir.mkdir()
return rtdir
# default IPC transport protocol settings
TransportProtocolKey = Literal[
'tcp',
'uds',
]
_def_tpt_proto: TransportProtocolKey = 'tcp'

View File

@ -34,7 +34,6 @@ import trio
from .devx._debug import maybe_wait_for_debugger
from ._addr import (
UnwrappedAddress,
preferred_transport,
mk_uuid,
)
from ._state import current_actor, is_main_process
@ -45,7 +44,9 @@ from ._exceptions import (
is_multi_cancelled,
ContextCancelled,
)
from ._root import open_root_actor
from ._root import (
open_root_actor,
)
from . import _state
from . import _spawn
@ -138,7 +139,7 @@ class ActorNursery:
bind_addrs: list[UnwrappedAddress]|None = None,
rpc_module_paths: list[str]|None = None,
enable_transports: list[str] = [preferred_transport],
enable_transports: list[str] = [_state._def_tpt_proto],
enable_modules: list[str]|None = None,
loglevel: str|None = None, # set log level per subactor
debug_mode: bool|None = None,

View File

@ -38,7 +38,10 @@ from trio._highlevel_open_unix_stream import (
from tractor.msg import MsgCodec
from tractor.log import get_logger
from tractor._addr import UDSAddress
from tractor._addr import (
UDSAddress,
unwrap_sockpath,
)
from tractor.ipc._transport import MsgpackTransport
@ -194,16 +197,20 @@ class MsgpackUDSStream(MsgpackTransport):
case (bytes(), str()):
sock_path: Path = Path(sockname)
(
pid,
uid,
gid,
peer_pid,
_,
_,
) = get_peer_info(sock)
laddr = UDSAddress.from_addr((
sock_path,
os.getpid(),
))
raddr = UDSAddress.from_addr((
sock_path,
pid
))
filedir, filename = unwrap_sockpath(sock_path)
laddr = UDSAddress(
filedir=filedir,
filename=filename,
maybe_pid=os.getpid(),
)
raddr = UDSAddress(
filedir=filedir,
filename=filename,
maybe_pid=peer_pid
)
return (laddr, raddr)