From c9e9a3949f5a762b41aabd1212ba0a9a8947093f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 7 Apr 2025 18:07:58 -0400 Subject: [PATCH] Move concrete `Address`es to each tpt module That is moving from `._addr`, - `TCPAddress` to `.ipc._tcp` - `UDSAddress` to `.ipc._uds` Obviously this requires adjusting a buncha stuff in `._addr` to avoid import cycles (the original reason the module was not also included in the new `.ipc` subpkg) including, - avoiding "unnecessary" imports of `[Unwrapped]Address` in various modules. * since `Address` is a protocol and the main point is that it **does not need to be inherited** per (https://typing.python.org/en/latest/spec/protocol.html#terminology) thus I removed the need for it in both transport submods. * and `UnwrappedAddress` is a type alias for tuples.. so we don't really always need to be importing it since it also kinda obfuscates what the underlying pairs are. - not exporting everything in submods at the `.ipc` top level and importing from specific submods by default. - only importing various types under a `if typing.TYPE_CHECKING:` guard as needed. --- tests/test_ringbuf.py | 7 +- tractor/_addr.py | 305 +------------------------------------- tractor/_context.py | 2 +- tractor/ipc/__init__.py | 37 +---- tractor/ipc/_chan.py | 7 +- tractor/ipc/_fd_share.py | 163 ++++++++++++++++++++ tractor/ipc/_tcp.py | 108 +++++++++++++- tractor/ipc/_transport.py | 19 ++- tractor/ipc/_types.py | 31 +++- tractor/ipc/_uds.py | 212 +++++++++++++++++++++++++- tractor/msg/types.py | 6 +- 11 files changed, 535 insertions(+), 362 deletions(-) create mode 100644 tractor/ipc/_fd_share.py diff --git a/tests/test_ringbuf.py b/tests/test_ringbuf.py index 287a0501..0d3b420b 100644 --- a/tests/test_ringbuf.py +++ b/tests/test_ringbuf.py @@ -2,14 +2,17 @@ import time import trio import pytest + import tractor -from tractor.ipc import ( +from tractor.ipc._ringbuf import ( open_ringbuf, RBToken, RingBuffSender, RingBuffReceiver ) -from tractor._testing.samples import generate_sample_messages +from tractor._testing.samples import ( + generate_sample_messages, +) # in case you don't want to melt your cores, uncomment dis! pytestmark = pytest.mark.skip diff --git a/tractor/_addr.py b/tractor/_addr.py index eaf4c202..d8d11227 100644 --- a/tractor/_addr.py +++ b/tractor/_addr.py @@ -14,34 +14,25 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from __future__ import annotations -from pathlib import Path -import os -# import tempfile from uuid import uuid4 from typing import ( Protocol, ClassVar, - # TypeVar, - # Union, Type, TYPE_CHECKING, ) from bidict import bidict -# import trio from trio import ( - socket, SocketListener, - open_tcp_listeners, ) from .log import get_logger from ._state import ( - get_rt_dir, - current_actor, - is_root_process, _def_tpt_proto, ) +from .ipc._tcp import TCPAddress +from .ipc._uds import UDSAddress if TYPE_CHECKING: from ._runtime import Actor @@ -179,298 +170,6 @@ class Address(Protocol): ... -class TCPAddress(Address): - proto_key: str = 'tcp' - unwrapped_type: type = tuple[str, int] - def_bindspace: str = '127.0.0.1' - - def __init__( - self, - host: str, - port: int - ): - if ( - not isinstance(host, str) - or - not isinstance(port, int) - ): - raise TypeError( - f'Expected host {host!r} to be str and port {port!r} to be int' - ) - - self._host: str = host - self._port: int = port - - @property - def is_valid(self) -> bool: - return self._port != 0 - - @property - def bindspace(self) -> str: - return self._host - - @property - def domain(self) -> str: - return self._host - - @classmethod - def from_addr( - cls, - addr: tuple[str, int] - ) -> TCPAddress: - match addr: - case (str(), int()): - return TCPAddress(addr[0], addr[1]) - case _: - raise ValueError( - f'Invalid unwrapped address for {cls}\n' - f'{addr}\n' - ) - - def unwrap(self) -> tuple[str, int]: - return ( - self._host, - self._port, - ) - - @classmethod - def get_random( - cls, - bindspace: str = def_bindspace, - ) -> TCPAddress: - return TCPAddress(bindspace, 0) - - @classmethod - def get_root(cls) -> Address: - return TCPAddress( - '127.0.0.1', - 1616, - ) - - def __repr__(self) -> str: - return ( - f'{type(self).__name__}[{self.unwrap()}]' - ) - - def __eq__(self, other) -> bool: - if not isinstance(other, TCPAddress): - raise TypeError( - f'Can not compare {type(other)} with {type(self)}' - ) - - return ( - self._host == other._host - and - self._port == other._port - ) - - async def open_listener( - self, - **kwargs, - ) -> SocketListener: - listeners: list[SocketListener] = await open_tcp_listeners( - host=self._host, - port=self._port, - **kwargs - ) - assert len(listeners) == 1 - listener = listeners[0] - self._host, self._port = listener.socket.getsockname()[:2] - return listener - - async def close_listener(self): - ... - - -def unwrap_sockpath( - sockpath: Path, -) -> tuple[Path, Path]: - return ( - sockpath.parent, - sockpath.name, - ) - - -class UDSAddress(Address): - # TODO, maybe we should use better field and value - # -[x] really this is a `.protocol_key` not a "name" of anything. - # -[ ] consider a 'unix' proto-key instead? - # -[ ] need to check what other mult-transport frameworks do - # like zmq, nng, uri-spec et al! - proto_key: str = 'uds' - unwrapped_type: type = tuple[str, int] - def_bindspace: Path = get_rt_dir() - - def __init__( - self, - filedir: Path|str|None, - # TODO, i think i want `.filename` here? - filename: str|Path, - - # XXX, in the sense you can also pass - # a "non-real-world-process-id" such as is handy to represent - # our host-local default "port-like" key for the very first - # root actor to create a registry address. - maybe_pid: int|None = None, - ): - fdir = self._filedir = Path( - filedir - or - self.def_bindspace - ).absolute() - fpath = self._filename = Path(filename) - fp: Path = fdir / fpath - assert ( - fp.is_absolute() - and - fp == self.sockpath - ) - - # to track which "side" is the peer process by reading socket - # credentials-info. - self._pid: int = maybe_pid - - @property - def sockpath(self) -> Path: - return self._filedir / self._filename - - @property - def is_valid(self) -> bool: - ''' - We block socket files not allocated under the runtime subdir. - - ''' - return self.bindspace in self.sockpath.parents - - @property - def bindspace(self) -> Path: - ''' - We replicate the "ip-set-of-hosts" part of a UDS socket as - just the sub-directory in which we allocate socket files. - - ''' - return self._filedir or self.def_bindspace - - @classmethod - def from_addr( - cls, - addr: ( - tuple[Path|str, Path|str]|Path|str - ), - ) -> UDSAddress: - match addr: - case tuple()|list(): - filedir = Path(addr[0]) - filename = Path(addr[1]) - # sockpath: Path = Path(addr[0]) - # filedir, filename = unwrap_sockpath(sockpath) - # pid: int = addr[1] - return UDSAddress( - filedir=filedir, - filename=filename, - # maybe_pid=pid, - ) - # NOTE, in case we ever decide to just `.unwrap()` - # to a `Path|str`? - case str()|Path(): - sockpath: Path = Path(addr) - return UDSAddress(*unwrap_sockpath(sockpath)) - case _: - # import pdbp; pdbp.set_trace() - raise TypeError( - f'Bad unwrapped-address for {cls} !\n' - f'{addr!r}\n' - ) - - def unwrap(self) -> tuple[str, int]: - # XXX NOTE, since this gets passed DIRECTLY to - # `.ipc._uds.open_unix_socket_w_passcred()` - return ( - str(self._filedir), - str(self._filename), - ) - - @classmethod - def get_random( - cls, - bindspace: Path|None = None, # default netns - ) -> UDSAddress: - - filedir: Path = bindspace or cls.def_bindspace - pid: int = os.getpid() - actor: Actor|None = current_actor( - err_on_no_runtime=False, - ) - if actor: - sockname: str = '::'.join(actor.uid) + f'@{pid}' - else: - prefix: str = '' - if is_root_process(): - prefix: str = 'root' - sockname: str = f'{prefix}@{pid}' - - sockpath: Path = Path(f'{sockname}.sock') - return UDSAddress( - filedir=filedir, - filename=sockpath, - maybe_pid=pid, - ) - - @classmethod - def get_root(cls) -> Address: - def_uds_filename: Path = 'registry@1616.sock' - return UDSAddress( - filedir=None, - filename=def_uds_filename, - # maybe_pid=1616, - ) - - def __repr__(self) -> str: - return ( - f'{type(self).__name__}' - f'[' - f'({self._filedir}, {self._filename})' - f']' - ) - - def __eq__(self, other) -> bool: - if not isinstance(other, UDSAddress): - raise TypeError( - f'Can not compare {type(other)} with {type(self)}' - ) - - return self.sockpath == other.sockpath - - # async def open_listener(self, **kwargs) -> SocketListener: - async def open_listener( - self, - **kwargs, - ) -> SocketListener: - sock = self._sock = socket.socket( - socket.AF_UNIX, - socket.SOCK_STREAM - ) - log.info( - f'Attempting to bind UDS socket\n' - f'>[\n' - f'|_{self}\n' - ) - - bindpath: Path = self.sockpath - await sock.bind(str(bindpath)) - sock.listen(1) - log.info( - f'Listening on UDS socket\n' - f'[>\n' - f' |_{self}\n' - ) - return SocketListener(self._sock) - - def close_listener(self): - self._sock.close() - os.unlink(self.sockpath) - - _address_types: bidict[str, Type[Address]] = { 'tcp': TCPAddress, 'uds': UDSAddress diff --git a/tractor/_context.py b/tractor/_context.py index 53f5b233..e5cce1ec 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -105,7 +105,7 @@ from ._state import ( if TYPE_CHECKING: from ._portal import Portal from ._runtime import Actor - from .ipc import MsgTransport + from .ipc._transport import MsgTransport from .devx._frame_stack import ( CallerInfo, ) diff --git a/tractor/ipc/__init__.py b/tractor/ipc/__init__.py index f1cb8e8b..2c6c3b5d 100644 --- a/tractor/ipc/__init__.py +++ b/tractor/ipc/__init__.py @@ -13,43 +13,12 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -import platform -from ._transport import ( - MsgTransportKey as MsgTransportKey, - MsgType as MsgType, - MsgTransport as MsgTransport, - MsgpackTransport as MsgpackTransport -) - -from ._tcp import MsgpackTCPStream as MsgpackTCPStream -from ._uds import MsgpackUDSStream as MsgpackUDSStream - -from ._types import ( - transport_from_addr as transport_from_addr, - transport_from_stream as transport_from_stream, -) +''' +A modular IPC layer supporting the power of cross-process SC! +''' from ._chan import ( _connect_chan as _connect_chan, Channel as Channel ) - -if platform.system() == 'Linux': - from ._linux import ( - EFD_SEMAPHORE as EFD_SEMAPHORE, - EFD_CLOEXEC as EFD_CLOEXEC, - EFD_NONBLOCK as EFD_NONBLOCK, - open_eventfd as open_eventfd, - write_eventfd as write_eventfd, - read_eventfd as read_eventfd, - close_eventfd as close_eventfd, - EventFD as EventFD, - ) - - from ._ringbuf import ( - RBToken as RBToken, - RingBuffSender as RingBuffSender, - RingBuffReceiver as RingBuffReceiver, - open_ringbuf as open_ringbuf - ) diff --git a/tractor/ipc/_chan.py b/tractor/ipc/_chan.py index f6a50cc1..00c749e1 100644 --- a/tractor/ipc/_chan.py +++ b/tractor/ipc/_chan.py @@ -29,13 +29,13 @@ from pprint import pformat import typing from typing import ( Any, + TYPE_CHECKING, ) import warnings import trio -from tractor.ipc._transport import MsgTransport -from tractor.ipc._types import ( +from ._types import ( transport_from_addr, transport_from_stream, ) @@ -55,6 +55,9 @@ from tractor.msg import ( MsgCodec, ) +if TYPE_CHECKING: + from ._transport import MsgTransport + log = get_logger(__name__) diff --git a/tractor/ipc/_fd_share.py b/tractor/ipc/_fd_share.py new file mode 100644 index 00000000..e51069ba --- /dev/null +++ b/tractor/ipc/_fd_share.py @@ -0,0 +1,163 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +''' +File-descriptor-sharing on `linux` by "wilhelm_of_bohemia". + +''' +from __future__ import annotations +import os +import array +import socket +import tempfile +from pathlib import Path +from contextlib import ExitStack + +import trio +import tractor +from tractor.ipc import RBToken + + +actor_name = 'ringd' + + +_rings: dict[str, dict] = {} + + +async def _attach_to_ring( + ring_name: str +) -> tuple[int, int, int]: + actor = tractor.current_actor() + + fd_amount = 3 + sock_path = ( + Path(tempfile.gettempdir()) + / + f'{os.getpid()}-pass-ring-fds-{ring_name}-to-{actor.name}.sock' + ) + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.bind(sock_path) + sock.listen(1) + + async with ( + tractor.find_actor(actor_name) as ringd, + ringd.open_context( + _pass_fds, + name=ring_name, + sock_path=sock_path + ) as (ctx, _sent) + ): + # prepare array to receive FD + fds = array.array("i", [0] * fd_amount) + + conn, _ = sock.accept() + + # receive FD + msg, ancdata, flags, addr = conn.recvmsg( + 1024, + socket.CMSG_LEN(fds.itemsize * fd_amount) + ) + + for ( + cmsg_level, + cmsg_type, + cmsg_data, + ) in ancdata: + if ( + cmsg_level == socket.SOL_SOCKET + and + cmsg_type == socket.SCM_RIGHTS + ): + fds.frombytes(cmsg_data[:fds.itemsize * fd_amount]) + break + else: + raise RuntimeError("Receiver: No FDs received") + + conn.close() + sock.close() + sock_path.unlink() + + return RBToken.from_msg( + await ctx.wait_for_result() + ) + + +@tractor.context +async def _pass_fds( + ctx: tractor.Context, + name: str, + sock_path: str +) -> RBToken: + global _rings + token = _rings[name] + client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + client.connect(sock_path) + await ctx.started() + fds = array.array('i', token.fds) + client.sendmsg([b'FDs'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)]) + client.close() + return token + + +@tractor.context +async def _open_ringbuf( + ctx: tractor.Context, + name: str, + buf_size: int +) -> RBToken: + global _rings + is_owner = False + if name not in _rings: + stack = ExitStack() + token = stack.enter_context( + tractor.open_ringbuf( + name, + buf_size=buf_size + ) + ) + _rings[name] = { + 'token': token, + 'stack': stack, + } + is_owner = True + + ring = _rings[name] + await ctx.started() + + try: + await trio.sleep_forever() + + except tractor.ContextCancelled: + ... + + finally: + if is_owner: + ring['stack'].close() + + +async def open_ringbuf( + name: str, + buf_size: int +) -> RBToken: + async with ( + tractor.find_actor(actor_name) as ringd, + ringd.open_context( + _open_ringbuf, + name=name, + buf_size=buf_size + ) as (rd_ctx, _) + ): + yield await _attach_to_ring(name) + await rd_ctx.cancel() diff --git a/tractor/ipc/_tcp.py b/tractor/ipc/_tcp.py index 61e6f3e6..dbecdf5e 100644 --- a/tractor/ipc/_tcp.py +++ b/tractor/ipc/_tcp.py @@ -20,16 +20,122 @@ TCP implementation of tractor.ipc._transport.MsgTransport protocol from __future__ import annotations import trio +from trio import ( + SocketListener, + open_tcp_listeners, +) from tractor.msg import MsgCodec from tractor.log import get_logger -from tractor._addr import TCPAddress from tractor.ipc._transport import MsgpackTransport log = get_logger(__name__) +class TCPAddress: + proto_key: str = 'tcp' + unwrapped_type: type = tuple[str, int] + def_bindspace: str = '127.0.0.1' + + def __init__( + self, + host: str, + port: int + ): + if ( + not isinstance(host, str) + or + not isinstance(port, int) + ): + raise TypeError( + f'Expected host {host!r} to be str and port {port!r} to be int' + ) + + self._host: str = host + self._port: int = port + + @property + def is_valid(self) -> bool: + return self._port != 0 + + @property + def bindspace(self) -> str: + return self._host + + @property + def domain(self) -> str: + return self._host + + @classmethod + def from_addr( + cls, + addr: tuple[str, int] + ) -> TCPAddress: + match addr: + case (str(), int()): + return TCPAddress(addr[0], addr[1]) + case _: + raise ValueError( + f'Invalid unwrapped address for {cls}\n' + f'{addr}\n' + ) + + def unwrap(self) -> tuple[str, int]: + return ( + self._host, + self._port, + ) + + @classmethod + def get_random( + cls, + bindspace: str = def_bindspace, + ) -> TCPAddress: + return TCPAddress(bindspace, 0) + + @classmethod + def get_root(cls) -> TCPAddress: + return TCPAddress( + '127.0.0.1', + 1616, + ) + + def __repr__(self) -> str: + return ( + f'{type(self).__name__}[{self.unwrap()}]' + ) + + def __eq__(self, other) -> bool: + if not isinstance(other, TCPAddress): + raise TypeError( + f'Can not compare {type(other)} with {type(self)}' + ) + + return ( + self._host == other._host + and + self._port == other._port + ) + + async def open_listener( + self, + **kwargs, + ) -> SocketListener: + listeners: list[SocketListener] = await open_tcp_listeners( + host=self._host, + port=self._port, + **kwargs + ) + assert len(listeners) == 1 + listener = listeners[0] + self._host, self._port = listener.socket.getsockname()[:2] + return listener + + async def close_listener(self): + ... + + # TODO: typing oddity.. not sure why we have to inherit here, but it # seems to be an issue with `get_msg_transport()` returning # a `Type[Protocol]`; probably should make a `mypy` issue? diff --git a/tractor/ipc/_transport.py b/tractor/ipc/_transport.py index 160423c8..2a9926f9 100644 --- a/tractor/ipc/_transport.py +++ b/tractor/ipc/_transport.py @@ -14,8 +14,8 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . ''' -typing.Protocol based generic msg API, implement this class to add backends for -tractor.ipc.Channel +typing.Protocol based generic msg API, implement this class to add +backends for tractor.ipc.Channel ''' from __future__ import annotations @@ -23,8 +23,9 @@ from typing import ( runtime_checkable, Type, Protocol, - TypeVar, - ClassVar + # TypeVar, + ClassVar, + TYPE_CHECKING, ) from collections.abc import ( AsyncGenerator, @@ -47,10 +48,13 @@ from tractor.msg import ( _ctxvar_MsgCodec, # _codec, XXX see `self._codec` sanity/debug checks MsgCodec, + MsgType, types as msgtypes, pretty_struct, ) -from tractor._addr import Address + +if TYPE_CHECKING: + from tractor._addr import Address log = get_logger(__name__) @@ -63,12 +67,13 @@ MsgTransportKey = tuple[str, str] # ?TODO? this should be our `Union[*msgtypes.__spec__]` alias now right..? # => BLEH, except can't bc prots must inherit typevar or param-spec # vars.. -MsgType = TypeVar('MsgType') +# MsgType = TypeVar('MsgType') @runtime_checkable -class MsgTransport(Protocol[MsgType]): +class MsgTransport(Protocol): # +# class MsgTransport(Protocol[MsgType]): # ^-TODO-^ consider using a generic def and indexing with our # eventual msg definition/types? # - https://docs.python.org/3/library/typing.html#typing.Protocol diff --git a/tractor/ipc/_types.py b/tractor/ipc/_types.py index 1b86636d..8d543d9d 100644 --- a/tractor/ipc/_types.py +++ b/tractor/ipc/_types.py @@ -13,19 +13,37 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from typing import Type + +''' +IPC subsys type-lookup helpers? + +''' +from typing import ( + Type, + # TYPE_CHECKING, +) import trio import socket -from tractor._addr import Address from tractor.ipc._transport import ( MsgTransportKey, MsgTransport ) -from tractor.ipc._tcp import MsgpackTCPStream -from tractor.ipc._uds import MsgpackUDSStream +from tractor.ipc._tcp import ( + TCPAddress, + MsgpackTCPStream, +) +from tractor.ipc._uds import ( + UDSAddress, + MsgpackUDSStream, +) +# if TYPE_CHECKING: +# from tractor._addr import Address + + +Address = TCPAddress|UDSAddress # manually updated list of all supported msg transport types _msg_transports = [ @@ -41,7 +59,10 @@ _key_to_transport: dict[MsgTransportKey, Type[MsgTransport]] = { } # convert an Address wrapper to its corresponding transport type -_addr_to_transport: dict[Type[Address], Type[MsgTransport]] = { +_addr_to_transport: dict[ + Type[TCPAddress|UDSAddress], + Type[MsgTransport] +] = { cls.address_type: cls for cls in _msg_transports } diff --git a/tractor/ipc/_uds.py b/tractor/ipc/_uds.py index 3d24447b..33843f6a 100644 --- a/tractor/ipc/_uds.py +++ b/tractor/ipc/_uds.py @@ -29,8 +29,15 @@ from socket import ( SOL_SOCKET, ) import struct +from typing import ( + TYPE_CHECKING, +) import trio +from trio import ( + socket, + SocketListener, +) from trio._highlevel_open_unix_stream import ( close_on_error, has_unix, @@ -38,16 +45,211 @@ from trio._highlevel_open_unix_stream import ( from tractor.msg import MsgCodec from tractor.log import get_logger -from tractor._addr import ( - UDSAddress, - unwrap_sockpath, +from tractor.ipc._transport import ( + MsgpackTransport, ) -from tractor.ipc._transport import MsgpackTransport +from .._state import ( + get_rt_dir, + current_actor, + is_root_process, +) + +if TYPE_CHECKING: + from ._runtime import Actor log = get_logger(__name__) +def unwrap_sockpath( + sockpath: Path, +) -> tuple[Path, Path]: + return ( + sockpath.parent, + sockpath.name, + ) + + +class UDSAddress: + # TODO, maybe we should use better field and value + # -[x] really this is a `.protocol_key` not a "name" of anything. + # -[ ] consider a 'unix' proto-key instead? + # -[ ] need to check what other mult-transport frameworks do + # like zmq, nng, uri-spec et al! + proto_key: str = 'uds' + unwrapped_type: type = tuple[str, int] + def_bindspace: Path = get_rt_dir() + + def __init__( + self, + filedir: Path|str|None, + # TODO, i think i want `.filename` here? + filename: str|Path, + + # XXX, in the sense you can also pass + # a "non-real-world-process-id" such as is handy to represent + # our host-local default "port-like" key for the very first + # root actor to create a registry address. + maybe_pid: int|None = None, + ): + fdir = self._filedir = Path( + filedir + or + self.def_bindspace + ).absolute() + fpath = self._filename = Path(filename) + fp: Path = fdir / fpath + assert ( + fp.is_absolute() + and + fp == self.sockpath + ) + + # to track which "side" is the peer process by reading socket + # credentials-info. + self._pid: int = maybe_pid + + @property + def sockpath(self) -> Path: + return self._filedir / self._filename + + @property + def is_valid(self) -> bool: + ''' + We block socket files not allocated under the runtime subdir. + + ''' + return self.bindspace in self.sockpath.parents + + @property + def bindspace(self) -> Path: + ''' + We replicate the "ip-set-of-hosts" part of a UDS socket as + just the sub-directory in which we allocate socket files. + + ''' + return self._filedir or self.def_bindspace + + @classmethod + def from_addr( + cls, + addr: ( + tuple[Path|str, Path|str]|Path|str + ), + ) -> UDSAddress: + match addr: + case tuple()|list(): + filedir = Path(addr[0]) + filename = Path(addr[1]) + # sockpath: Path = Path(addr[0]) + # filedir, filename = unwrap_sockpath(sockpath) + # pid: int = addr[1] + return UDSAddress( + filedir=filedir, + filename=filename, + # maybe_pid=pid, + ) + # NOTE, in case we ever decide to just `.unwrap()` + # to a `Path|str`? + case str()|Path(): + sockpath: Path = Path(addr) + return UDSAddress(*unwrap_sockpath(sockpath)) + case _: + # import pdbp; pdbp.set_trace() + raise TypeError( + f'Bad unwrapped-address for {cls} !\n' + f'{addr!r}\n' + ) + + def unwrap(self) -> tuple[str, int]: + # XXX NOTE, since this gets passed DIRECTLY to + # `.ipc._uds.open_unix_socket_w_passcred()` + return ( + str(self._filedir), + str(self._filename), + ) + + @classmethod + def get_random( + cls, + bindspace: Path|None = None, # default netns + ) -> UDSAddress: + + filedir: Path = bindspace or cls.def_bindspace + pid: int = os.getpid() + actor: Actor|None = current_actor( + err_on_no_runtime=False, + ) + if actor: + sockname: str = '::'.join(actor.uid) + f'@{pid}' + else: + prefix: str = '' + if is_root_process(): + prefix: str = 'root' + sockname: str = f'{prefix}@{pid}' + + sockpath: Path = Path(f'{sockname}.sock') + return UDSAddress( + filedir=filedir, + filename=sockpath, + maybe_pid=pid, + ) + + @classmethod + def get_root(cls) -> UDSAddress: + def_uds_filename: Path = 'registry@1616.sock' + return UDSAddress( + filedir=None, + filename=def_uds_filename, + # maybe_pid=1616, + ) + + def __repr__(self) -> str: + return ( + f'{type(self).__name__}' + f'[' + f'({self._filedir}, {self._filename})' + f']' + ) + + def __eq__(self, other) -> bool: + if not isinstance(other, UDSAddress): + raise TypeError( + f'Can not compare {type(other)} with {type(self)}' + ) + + return self.sockpath == other.sockpath + + # async def open_listener(self, **kwargs) -> SocketListener: + async def open_listener( + self, + **kwargs, + ) -> SocketListener: + sock = self._sock = socket.socket( + socket.AF_UNIX, + socket.SOCK_STREAM + ) + log.info( + f'Attempting to bind UDS socket\n' + f'>[\n' + f'|_{self}\n' + ) + + bindpath: Path = self.sockpath + await sock.bind(str(bindpath)) + sock.listen(1) + log.info( + f'Listening on UDS socket\n' + f'[>\n' + f' |_{self}\n' + ) + return SocketListener(self._sock) + + def close_listener(self): + self._sock.close() + os.unlink(self.sockpath) + + async def open_unix_socket_w_passcred( filename: str|bytes|os.PathLike[str]|os.PathLike[bytes], ) -> trio.SocketStream: @@ -214,3 +416,5 @@ class MsgpackUDSStream(MsgpackTransport): maybe_pid=peer_pid ) return (laddr, raddr) + + diff --git a/tractor/msg/types.py b/tractor/msg/types.py index d71fb7e0..86752aba 100644 --- a/tractor/msg/types.py +++ b/tractor/msg/types.py @@ -48,7 +48,7 @@ from tractor.msg import ( pretty_struct, ) from tractor.log import get_logger -from tractor._addr import UnwrappedAddress +# from tractor._addr import UnwrappedAddress log = get_logger('tractor.msgspec') @@ -176,8 +176,8 @@ class SpawnSpec( # TODO: not just sockaddr pairs? # -[ ] abstract into a `TransportAddr` type? - reg_addrs: list[UnwrappedAddress] - bind_addrs: list[UnwrappedAddress]|None + reg_addrs: list[tuple[str, str|int]] + bind_addrs: list[tuple[str, str|int]]|None # TODO: caps based RPC support in the payload?