From e025959d604480974e10dc346a9a7fb10dc211a7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 1 Apr 2025 21:53:03 -0400 Subject: [PATCH] Even more `tractor._addr.Address` simplifying Namely reducing the duplication of class-fields and `TypeVar`s used for parametrizing the `Address` protocol type, - drop all of the `TypeVar` types and just stick with all concrete addrs types inheriting from `Address` only. - rename `Address.name_key` -> `.proto_key`. - rename `Address.address_type` -> `.unwrapped_type` - rename `.namespace` -> `.bindspace` to better reflect that this "part" of the address represents the possible "space for binding endpoints". |_ also linux already uses "namespace" to mean the `netns` and i'd prefer to stick with their semantics for that. - add `TCPAddress/UDSAddress.def_bindspace` values. - drop commented `.open_stream()` method; never used. - simplify `UnwrappedAdress` to just a `tuple` of union types. - add logging to `USDAddress.open_listener()` for now. - adjust `tractor.ipc/_uds/tcp` transport to use new addr field names. --- tractor/_addr.py | 289 ++++++++++++++++++++++++------------------- tractor/ipc/_chan.py | 2 +- tractor/ipc/_tcp.py | 2 +- tractor/ipc/_uds.py | 10 +- 4 files changed, 166 insertions(+), 137 deletions(-) diff --git a/tractor/_addr.py b/tractor/_addr.py index f6bb5e2a..d5b8c81b 100644 --- a/tractor/_addr.py +++ b/tractor/_addr.py @@ -21,54 +21,121 @@ from uuid import uuid4 from typing import ( Protocol, ClassVar, - TypeVar, - Union, + # TypeVar, + # Union, Type, TYPE_CHECKING, ) from bidict import bidict -import trio -from trio import socket +# import trio +from trio import ( + socket, + SocketListener, + open_tcp_listeners, +) +from .log import get_logger from ._state import ( get_rt_dir, current_actor, + is_root_process, ) if TYPE_CHECKING: from ._runtime import Actor - -NamespaceType = TypeVar('NamespaceType') -AddressType = TypeVar('AddressType') -StreamType = TypeVar('StreamType') -ListenerType = TypeVar('ListenerType') +log = get_logger(__name__) -class Address(Protocol[ - NamespaceType, - AddressType, - StreamType, - ListenerType -]): +# TODO, maybe breakout the netns key to a struct? +# class NetNs(Struct)[str, int]: +# ... - name_key: ClassVar[str] - address_type: ClassVar[Type[AddressType]] +# TODO, can't we just use a type alias +# for this? namely just some `tuple[str, int, str, str]`? +# +# -[ ] would also just be simpler to keep this as SockAddr[tuple] +# or something, implying it's just a simple pair of values which can +# presumably be mapped to all transports? +# -[ ] `pydoc socket.socket.getsockname()` delivers a 4-tuple for +# ipv6 `(hostaddr, port, flowinfo, scope_id)`.. so how should we +# handle that? +# -[ ] as a further alternative to this wrap()/unwrap() approach we +# could just implement `enc/dec_hook()`s for the `Address`-types +# and just deal with our internal objs directly and always and +# leave it to the codec layer to figure out marshalling? +# |_ would mean only one spot to do the `.unwrap()` (which we may +# end up needing to call from the hook()s anyway?) +# -[x] rename to `UnwrappedAddress[Descriptor]` ?? +# seems like the right name as per, +# https://www.geeksforgeeks.org/introduction-to-address-descriptor/ +# +UnwrappedAddress = ( + # tcp/udp/uds + tuple[ + str, # host/domain(tcp), filesys-dir(uds) + int|str, # port/path(uds) + ] + # ?TODO? should we also include another 2 fields from + # our `Aid` msg such that we include the runtime `Actor.uid` + # of `.name` and `.uuid`? + # - would ensure uniqueness across entire net? + # - allows for easier runtime-level filtering of "actors by + # service name" +) + +# TODO, maybe rename to `SocketAddress`? +class Address(Protocol): + proto_key: ClassVar[str] + unwrapped_type: ClassVar[UnwrappedAddress] + + # TODO, i feel like an `.is_bound()` is a better thing to + # support? + # Lke, what use does this have besides a noop and if it's not + # valid why aren't we erroring on creation/use? @property def is_valid(self) -> bool: ... + # TODO, maybe `.netns` is a better name? @property - def namespace(self) -> NamespaceType|None: + def namespace(self) -> tuple[str, int]|None: + ''' + The if-available, OS-specific "network namespace" key. + + ''' + ... + + @property + def bindspace(self) -> str: + ''' + Deliver the socket address' "bindable space" from + a `socket.socket.bind()` and thus from the perspective of + specific transport protocol domain. + + I.e. for most (layer-4) network-socket protocols this is + normally the ipv4/6 address, for UDS this is normally + a filesystem (sub-directory). + + For (distributed) network protocols this is normally the routing + layer's domain/(ip-)address, though it might also include a "network namespace" + key different then the default. + + For local-host-only transports this is either an explicit + namespace (with types defined by the OS: netns, Cgroup, IPC, + pid, etc. on linux) or failing that the sub-directory in the + filesys in which socket/shm files are located *under*. + + ''' ... @classmethod - def from_addr(cls, addr: AddressType) -> Address: + def from_addr(cls, addr: UnwrappedAddress) -> Address: ... - def unwrap(self) -> AddressType: + def unwrap(self) -> UnwrappedAddress: ''' Deliver the underying minimum field set in a primitive python data type-structure. @@ -76,7 +143,11 @@ class Address(Protocol[ ... @classmethod - def get_random(cls, namespace: NamespaceType | None = None) -> Address: + def get_random( + cls, + current_actor: Actor, + bindspace: str|None = None, + ) -> Address: ... # TODO, this should be something like a `.get_def_registar_addr()` @@ -97,30 +168,20 @@ class Address(Protocol[ def __eq__(self, other) -> bool: ... - # async def open_stream(self, **kwargs) -> StreamType: - # ''' - # Open a connection *TO* this address and deliver back a - # `trio.SocketStream` wrapping the underlying transport. - - # ''' - # ... - - async def open_listener(self, **kwargs) -> ListenerType: + async def open_listener( + self, + **kwargs, + ) -> SocketListener: ... async def close_listener(self): ... -class TCPAddress(Address[ - str, - tuple[str, int], - trio.SocketStream, - trio.SocketListener -]): - - name_key: str = 'tcp' - address_type: type = tuple[str, int] +class TCPAddress(Address): + proto_key: str = 'tcp' + unwrapped_type: type = tuple[str, int] + def_bindspace: str = '127.0.0.1' def __init__( self, @@ -144,7 +205,11 @@ class TCPAddress(Address[ return self._port != 0 @property - def namespace(self) -> str: + def bindspace(self) -> str: + return self._host + + @property + def domain(self) -> str: return self._host @classmethod @@ -163,13 +228,17 @@ class TCPAddress(Address[ @classmethod def get_random( cls, - namespace: str = '127.0.0.1', + current_actor: Actor, + bindspace: str = def_bindspace, ) -> TCPAddress: - return TCPAddress(namespace, 0) + return TCPAddress(bindspace, 0) @classmethod def get_root(cls) -> Address: - return TCPAddress('127.0.0.1', 1616) + return TCPAddress( + '127.0.0.1', + 1616, + ) def __repr__(self) -> str: return ( @@ -188,17 +257,11 @@ class TCPAddress(Address[ self._port == other._port ) - # async def open_stream(self, **kwargs) -> trio.SocketStream: - # stream = await trio.open_tcp_stream( - # self._host, - # self._port, - # **kwargs - # ) - # self._host, self._port = stream.socket.getsockname()[:2] - # return stream - - async def open_listener(self, **kwargs) -> trio.SocketListener: - listeners = await trio.open_tcp_listeners( + async def open_listener( + self, + **kwargs, + ) -> SocketListener: + listeners: list[SocketListener] = await open_tcp_listeners( host=self._host, port=self._port, **kwargs @@ -212,18 +275,15 @@ class TCPAddress(Address[ ... -class UDSAddress(Address[ - None, - str, - trio.SocketStream, - trio.SocketListener -]): - - # TODO, maybe we should use 'unix' instead? +class UDSAddress(Address): + # TODO, maybe we should use better field and value + # -[x] really this is a `.protocol_key` not a "name" of anything. + # -[ ] consider a 'unix' proto-key instead? # -[ ] need to check what other mult-transport frameworks do # like zmq, nng, uri-spec et al! - name_key: str = 'uds' - address_type: type = tuple[str, int] + proto_key: str = 'uds' + unwrapped_type: type = tuple[str, int] + def_bindspace: Path = get_rt_dir() def __init__( self, @@ -239,11 +299,20 @@ class UDSAddress(Address[ @property def is_valid(self) -> bool: - return True + ''' + We block socket files not allocated under the runtime subdir. + + ''' + return self.bindspace in self._filepath.parents @property - def namespace(self) -> None: - return + def bindspace(self) -> Path: + ''' + We replicate the "ip-set-of-hosts" part of a UDS socket as + just the sub-directory in which we allocate socket files. + + ''' + return self.def_bindspace @classmethod def from_addr( @@ -259,17 +328,17 @@ class UDSAddress(Address[ return ( str(self._filepath), # XXX NOTE, since this gets passed DIRECTLY to - # `trio.open_unix_ + # `open_unix_socket_w_passcred()` above! self._pid, ) @classmethod def get_random( cls, - namespace: None = None, # unused + bindspace: Path|None = None, # default netns ) -> UDSAddress: - rt_dir: Path = get_rt_dir() + bs: Path = bindspace or get_rt_dir() pid: int = os.getpid() actor: Actor|None = current_actor( err_on_no_runtime=False, @@ -277,9 +346,12 @@ class UDSAddress(Address[ if actor: sockname: str = '::'.join(actor.uid) + f'@{pid}' else: - sockname: str = f'@{pid}' + prefix: str = '' + if is_root_process(): + prefix: str = 'root' + sockname: str = f'{prefix}@{pid}' - sockpath: Path = Path(f'{rt_dir}/{sockname}.sock') + sockpath: Path = Path(f'{bs}/{sockname}.sock') return UDSAddress( # filename=f'{tempfile.gettempdir()}/{uuid4()}.sock' filepath=sockpath, @@ -314,28 +386,30 @@ class UDSAddress(Address[ return self._filepath == other._filepath - # TODO? remove right, it's never used? - # - # async def open_stream( - # self, - # **kwargs, - # ) -> trio.SocketStream: - # stream: trio.SocketStream = await trio.open_unix_socket( - # self._filepath, - # **kwargs - # ) - # return stream - - async def open_listener(self, **kwargs) -> trio.SocketListener: + # async def open_listener(self, **kwargs) -> SocketListener: + async def open_listener( + self, + **kwargs, + ) -> SocketListener: self._sock = socket.socket( socket.AF_UNIX, socket.SOCK_STREAM ) + log.info( + f'Attempting to bind UDS socket\n' + f'>[\n' + f'|_{self}\n' + ) await self._sock.bind(self._filepath) self._sock.listen(1) - return trio.SocketListener(self._sock) + log.info( + f'Listening on UDS socket\n' + f'[>\n' + f' |_{self}\n' + ) + return SocketListener(self._sock) - async def close_listener(self): + def close_listener(self): self._sock.close() os.unlink(self._filepath) @@ -349,50 +423,13 @@ _address_types: bidict[str, Type[Address]] = { } -# TODO, can't we just use a type alias -# for this? namely just some `tuple[str, int, str, str]`? -# -# -[ ] would also just be simpler to keep this as SockAddr[tuple] -# or something, implying it's just a simple pair of values which can -# presumably be mapped to all transports? -# -[ ] `pydoc socket.socket.getsockname()` delivers a 4-tuple for -# ipv6 `(hostaddr, port, flowinfo, scope_id)`.. so how should we -# handle that? -# -[ ] as a further alternative to this wrap()/unwrap() approach we -# could just implement `enc/dec_hook()`s for the `Address`-types -# and just deal with our internal objs directly and always and -# leave it to the codec layer to figure out marshalling? -# |_ would mean only one spot to do the `.unwrap()` (which we may -# end up needing to call from the hook()s anyway?) -# -[x] rename to `UnwrappedAddress[Descriptor]` ?? -# seems like the right name as per, -# https://www.geeksforgeeks.org/introduction-to-address-descriptor/ -# -UnwrappedAddress = Union[ - tuple[ - str, # (net/cgroup-)namespace/host-domain - int, # (p)id/port - ] # tcp/udp/uds - - # ?TODO? should we also include another 2 fields from - # our `Aid` msg such that we include the runtime `Actor.uid` - # of `.name` and `.uuid`? - # - would ensure uniqueness across entire net? - # - allows for easier runtime-level filtering of "actors by - # service name" -] - - # TODO! really these are discovery sys default addrs ONLY useful for # when none is provided to a root actor on first boot. _default_lo_addrs: dict[ str, UnwrappedAddress ] = { - 'tcp': TCPAddress( - host='127.0.0.1', - port=1616, - ).unwrap(), + 'tcp': TCPAddress.get_root().unwrap(), 'uds': UDSAddress.get_root().unwrap(), } @@ -435,7 +472,7 @@ def wrap_address( case None: cls: Type[Address] = get_address_cls(preferred_transport) - addr: AddressType = cls.get_root().unwrap() + addr: UnwrappedAddress = cls.get_root().unwrap() case _: raise TypeError( diff --git a/tractor/ipc/_chan.py b/tractor/ipc/_chan.py index 1175cbb6..01baf1e1 100644 --- a/tractor/ipc/_chan.py +++ b/tractor/ipc/_chan.py @@ -187,7 +187,7 @@ class Channel: f' uid={self.uid}\n' f'\n' f' |_msgstream: {tpt_name}\n' - f' proto={tpt.laddr.name_key!r}\n' + f' proto={tpt.laddr.proto_key!r}\n' f' layer={tpt.layer_key!r}\n' f' laddr={tpt.laddr}\n' f' raddr={tpt.raddr}\n' diff --git a/tractor/ipc/_tcp.py b/tractor/ipc/_tcp.py index eb2003ec..61e6f3e6 100644 --- a/tractor/ipc/_tcp.py +++ b/tractor/ipc/_tcp.py @@ -51,7 +51,7 @@ class MsgpackTCPStream(MsgpackTransport): # choosing the routing prefix part. f'/ipv4/{host}' - f'/{self.address_type.name_key}/{port}' + f'/{self.address_type.proto_key}/{port}' # f'/{self.chan.uid[0]}' # f'/{self.cid}' diff --git a/tractor/ipc/_uds.py b/tractor/ipc/_uds.py index 894e3fbc..918e930c 100644 --- a/tractor/ipc/_uds.py +++ b/tractor/ipc/_uds.py @@ -119,7 +119,7 @@ class MsgpackUDSStream(MsgpackTransport): filepath: Path = Path(self.raddr.unwrap()[0]) return ( - f'/{self.address_type.name_key}/{filepath}' + f'/{self.address_type.proto_key}/{filepath}' # f'/{self.chan.uid[0]}' # f'/{self.cid}' @@ -190,14 +190,6 @@ class MsgpackUDSStream(MsgpackTransport): uid, gid, ) = get_peer_info(sock) - log.info( - f'UDS connection from process {pid!r}\n' - f'>[\n' - f'|_{sock_path}\n' - f' |_pid: {pid}\n' - f' |_uid: {uid}\n' - f' |_gid: {gid}\n' - ) laddr = UDSAddress.from_addr(( sock_path, os.getpid(),