diff --git a/tests/test_ringbuf.py b/tests/test_ringbuf.py
index 287a0501..0d3b420b 100644
--- a/tests/test_ringbuf.py
+++ b/tests/test_ringbuf.py
@@ -2,14 +2,17 @@ import time
import trio
import pytest
+
import tractor
-from tractor.ipc import (
+from tractor.ipc._ringbuf import (
open_ringbuf,
RBToken,
RingBuffSender,
RingBuffReceiver
)
-from tractor._testing.samples import generate_sample_messages
+from tractor._testing.samples import (
+ generate_sample_messages,
+)
# in case you don't want to melt your cores, uncomment dis!
pytestmark = pytest.mark.skip
diff --git a/tractor/_addr.py b/tractor/_addr.py
index eaf4c202..d8d11227 100644
--- a/tractor/_addr.py
+++ b/tractor/_addr.py
@@ -14,34 +14,25 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
from __future__ import annotations
-from pathlib import Path
-import os
-# import tempfile
from uuid import uuid4
from typing import (
Protocol,
ClassVar,
- # TypeVar,
- # Union,
Type,
TYPE_CHECKING,
)
from bidict import bidict
-# import trio
from trio import (
- socket,
SocketListener,
- open_tcp_listeners,
)
from .log import get_logger
from ._state import (
- get_rt_dir,
- current_actor,
- is_root_process,
_def_tpt_proto,
)
+from .ipc._tcp import TCPAddress
+from .ipc._uds import UDSAddress
if TYPE_CHECKING:
from ._runtime import Actor
@@ -179,298 +170,6 @@ class Address(Protocol):
...
-class TCPAddress(Address):
- proto_key: str = 'tcp'
- unwrapped_type: type = tuple[str, int]
- def_bindspace: str = '127.0.0.1'
-
- def __init__(
- self,
- host: str,
- port: int
- ):
- if (
- not isinstance(host, str)
- or
- not isinstance(port, int)
- ):
- raise TypeError(
- f'Expected host {host!r} to be str and port {port!r} to be int'
- )
-
- self._host: str = host
- self._port: int = port
-
- @property
- def is_valid(self) -> bool:
- return self._port != 0
-
- @property
- def bindspace(self) -> str:
- return self._host
-
- @property
- def domain(self) -> str:
- return self._host
-
- @classmethod
- def from_addr(
- cls,
- addr: tuple[str, int]
- ) -> TCPAddress:
- match addr:
- case (str(), int()):
- return TCPAddress(addr[0], addr[1])
- case _:
- raise ValueError(
- f'Invalid unwrapped address for {cls}\n'
- f'{addr}\n'
- )
-
- def unwrap(self) -> tuple[str, int]:
- return (
- self._host,
- self._port,
- )
-
- @classmethod
- def get_random(
- cls,
- bindspace: str = def_bindspace,
- ) -> TCPAddress:
- return TCPAddress(bindspace, 0)
-
- @classmethod
- def get_root(cls) -> Address:
- return TCPAddress(
- '127.0.0.1',
- 1616,
- )
-
- def __repr__(self) -> str:
- return (
- f'{type(self).__name__}[{self.unwrap()}]'
- )
-
- def __eq__(self, other) -> bool:
- if not isinstance(other, TCPAddress):
- raise TypeError(
- f'Can not compare {type(other)} with {type(self)}'
- )
-
- return (
- self._host == other._host
- and
- self._port == other._port
- )
-
- async def open_listener(
- self,
- **kwargs,
- ) -> SocketListener:
- listeners: list[SocketListener] = await open_tcp_listeners(
- host=self._host,
- port=self._port,
- **kwargs
- )
- assert len(listeners) == 1
- listener = listeners[0]
- self._host, self._port = listener.socket.getsockname()[:2]
- return listener
-
- async def close_listener(self):
- ...
-
-
-def unwrap_sockpath(
- sockpath: Path,
-) -> tuple[Path, Path]:
- return (
- sockpath.parent,
- sockpath.name,
- )
-
-
-class UDSAddress(Address):
- # TODO, maybe we should use better field and value
- # -[x] really this is a `.protocol_key` not a "name" of anything.
- # -[ ] consider a 'unix' proto-key instead?
- # -[ ] need to check what other mult-transport frameworks do
- # like zmq, nng, uri-spec et al!
- proto_key: str = 'uds'
- unwrapped_type: type = tuple[str, int]
- def_bindspace: Path = get_rt_dir()
-
- def __init__(
- self,
- filedir: Path|str|None,
- # TODO, i think i want `.filename` here?
- filename: str|Path,
-
- # XXX, in the sense you can also pass
- # a "non-real-world-process-id" such as is handy to represent
- # our host-local default "port-like" key for the very first
- # root actor to create a registry address.
- maybe_pid: int|None = None,
- ):
- fdir = self._filedir = Path(
- filedir
- or
- self.def_bindspace
- ).absolute()
- fpath = self._filename = Path(filename)
- fp: Path = fdir / fpath
- assert (
- fp.is_absolute()
- and
- fp == self.sockpath
- )
-
- # to track which "side" is the peer process by reading socket
- # credentials-info.
- self._pid: int = maybe_pid
-
- @property
- def sockpath(self) -> Path:
- return self._filedir / self._filename
-
- @property
- def is_valid(self) -> bool:
- '''
- We block socket files not allocated under the runtime subdir.
-
- '''
- return self.bindspace in self.sockpath.parents
-
- @property
- def bindspace(self) -> Path:
- '''
- We replicate the "ip-set-of-hosts" part of a UDS socket as
- just the sub-directory in which we allocate socket files.
-
- '''
- return self._filedir or self.def_bindspace
-
- @classmethod
- def from_addr(
- cls,
- addr: (
- tuple[Path|str, Path|str]|Path|str
- ),
- ) -> UDSAddress:
- match addr:
- case tuple()|list():
- filedir = Path(addr[0])
- filename = Path(addr[1])
- # sockpath: Path = Path(addr[0])
- # filedir, filename = unwrap_sockpath(sockpath)
- # pid: int = addr[1]
- return UDSAddress(
- filedir=filedir,
- filename=filename,
- # maybe_pid=pid,
- )
- # NOTE, in case we ever decide to just `.unwrap()`
- # to a `Path|str`?
- case str()|Path():
- sockpath: Path = Path(addr)
- return UDSAddress(*unwrap_sockpath(sockpath))
- case _:
- # import pdbp; pdbp.set_trace()
- raise TypeError(
- f'Bad unwrapped-address for {cls} !\n'
- f'{addr!r}\n'
- )
-
- def unwrap(self) -> tuple[str, int]:
- # XXX NOTE, since this gets passed DIRECTLY to
- # `.ipc._uds.open_unix_socket_w_passcred()`
- return (
- str(self._filedir),
- str(self._filename),
- )
-
- @classmethod
- def get_random(
- cls,
- bindspace: Path|None = None, # default netns
- ) -> UDSAddress:
-
- filedir: Path = bindspace or cls.def_bindspace
- pid: int = os.getpid()
- actor: Actor|None = current_actor(
- err_on_no_runtime=False,
- )
- if actor:
- sockname: str = '::'.join(actor.uid) + f'@{pid}'
- else:
- prefix: str = ''
- if is_root_process():
- prefix: str = 'root'
- sockname: str = f'{prefix}@{pid}'
-
- sockpath: Path = Path(f'{sockname}.sock')
- return UDSAddress(
- filedir=filedir,
- filename=sockpath,
- maybe_pid=pid,
- )
-
- @classmethod
- def get_root(cls) -> Address:
- def_uds_filename: Path = 'registry@1616.sock'
- return UDSAddress(
- filedir=None,
- filename=def_uds_filename,
- # maybe_pid=1616,
- )
-
- def __repr__(self) -> str:
- return (
- f'{type(self).__name__}'
- f'['
- f'({self._filedir}, {self._filename})'
- f']'
- )
-
- def __eq__(self, other) -> bool:
- if not isinstance(other, UDSAddress):
- raise TypeError(
- f'Can not compare {type(other)} with {type(self)}'
- )
-
- return self.sockpath == other.sockpath
-
- # async def open_listener(self, **kwargs) -> SocketListener:
- async def open_listener(
- self,
- **kwargs,
- ) -> SocketListener:
- sock = self._sock = socket.socket(
- socket.AF_UNIX,
- socket.SOCK_STREAM
- )
- log.info(
- f'Attempting to bind UDS socket\n'
- f'>[\n'
- f'|_{self}\n'
- )
-
- bindpath: Path = self.sockpath
- await sock.bind(str(bindpath))
- sock.listen(1)
- log.info(
- f'Listening on UDS socket\n'
- f'[>\n'
- f' |_{self}\n'
- )
- return SocketListener(self._sock)
-
- def close_listener(self):
- self._sock.close()
- os.unlink(self.sockpath)
-
-
_address_types: bidict[str, Type[Address]] = {
'tcp': TCPAddress,
'uds': UDSAddress
diff --git a/tractor/_context.py b/tractor/_context.py
index 53f5b233..e5cce1ec 100644
--- a/tractor/_context.py
+++ b/tractor/_context.py
@@ -105,7 +105,7 @@ from ._state import (
if TYPE_CHECKING:
from ._portal import Portal
from ._runtime import Actor
- from .ipc import MsgTransport
+ from .ipc._transport import MsgTransport
from .devx._frame_stack import (
CallerInfo,
)
diff --git a/tractor/ipc/__init__.py b/tractor/ipc/__init__.py
index f1cb8e8b..2c6c3b5d 100644
--- a/tractor/ipc/__init__.py
+++ b/tractor/ipc/__init__.py
@@ -13,43 +13,12 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
-import platform
-from ._transport import (
- MsgTransportKey as MsgTransportKey,
- MsgType as MsgType,
- MsgTransport as MsgTransport,
- MsgpackTransport as MsgpackTransport
-)
-
-from ._tcp import MsgpackTCPStream as MsgpackTCPStream
-from ._uds import MsgpackUDSStream as MsgpackUDSStream
-
-from ._types import (
- transport_from_addr as transport_from_addr,
- transport_from_stream as transport_from_stream,
-)
+'''
+A modular IPC layer supporting the power of cross-process SC!
+'''
from ._chan import (
_connect_chan as _connect_chan,
Channel as Channel
)
-
-if platform.system() == 'Linux':
- from ._linux import (
- EFD_SEMAPHORE as EFD_SEMAPHORE,
- EFD_CLOEXEC as EFD_CLOEXEC,
- EFD_NONBLOCK as EFD_NONBLOCK,
- open_eventfd as open_eventfd,
- write_eventfd as write_eventfd,
- read_eventfd as read_eventfd,
- close_eventfd as close_eventfd,
- EventFD as EventFD,
- )
-
- from ._ringbuf import (
- RBToken as RBToken,
- RingBuffSender as RingBuffSender,
- RingBuffReceiver as RingBuffReceiver,
- open_ringbuf as open_ringbuf
- )
diff --git a/tractor/ipc/_chan.py b/tractor/ipc/_chan.py
index f6a50cc1..00c749e1 100644
--- a/tractor/ipc/_chan.py
+++ b/tractor/ipc/_chan.py
@@ -29,13 +29,13 @@ from pprint import pformat
import typing
from typing import (
Any,
+ TYPE_CHECKING,
)
import warnings
import trio
-from tractor.ipc._transport import MsgTransport
-from tractor.ipc._types import (
+from ._types import (
transport_from_addr,
transport_from_stream,
)
@@ -55,6 +55,9 @@ from tractor.msg import (
MsgCodec,
)
+if TYPE_CHECKING:
+ from ._transport import MsgTransport
+
log = get_logger(__name__)
diff --git a/tractor/ipc/_fd_share.py b/tractor/ipc/_fd_share.py
new file mode 100644
index 00000000..e51069ba
--- /dev/null
+++ b/tractor/ipc/_fd_share.py
@@ -0,0 +1,163 @@
+# tractor: structured concurrent "actors".
+# Copyright 2018-eternity Tyler Goodlet.
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+'''
+File-descriptor-sharing on `linux` by "wilhelm_of_bohemia".
+
+'''
+from __future__ import annotations
+import os
+import array
+import socket
+import tempfile
+from pathlib import Path
+from contextlib import ExitStack
+
+import trio
+import tractor
+from tractor.ipc import RBToken
+
+
+actor_name = 'ringd'
+
+
+_rings: dict[str, dict] = {}
+
+
+async def _attach_to_ring(
+ ring_name: str
+) -> tuple[int, int, int]:
+ actor = tractor.current_actor()
+
+ fd_amount = 3
+ sock_path = (
+ Path(tempfile.gettempdir())
+ /
+ f'{os.getpid()}-pass-ring-fds-{ring_name}-to-{actor.name}.sock'
+ )
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ sock.bind(sock_path)
+ sock.listen(1)
+
+ async with (
+ tractor.find_actor(actor_name) as ringd,
+ ringd.open_context(
+ _pass_fds,
+ name=ring_name,
+ sock_path=sock_path
+ ) as (ctx, _sent)
+ ):
+ # prepare array to receive FD
+ fds = array.array("i", [0] * fd_amount)
+
+ conn, _ = sock.accept()
+
+ # receive FD
+ msg, ancdata, flags, addr = conn.recvmsg(
+ 1024,
+ socket.CMSG_LEN(fds.itemsize * fd_amount)
+ )
+
+ for (
+ cmsg_level,
+ cmsg_type,
+ cmsg_data,
+ ) in ancdata:
+ if (
+ cmsg_level == socket.SOL_SOCKET
+ and
+ cmsg_type == socket.SCM_RIGHTS
+ ):
+ fds.frombytes(cmsg_data[:fds.itemsize * fd_amount])
+ break
+ else:
+ raise RuntimeError("Receiver: No FDs received")
+
+ conn.close()
+ sock.close()
+ sock_path.unlink()
+
+ return RBToken.from_msg(
+ await ctx.wait_for_result()
+ )
+
+
+@tractor.context
+async def _pass_fds(
+ ctx: tractor.Context,
+ name: str,
+ sock_path: str
+) -> RBToken:
+ global _rings
+ token = _rings[name]
+ client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ client.connect(sock_path)
+ await ctx.started()
+ fds = array.array('i', token.fds)
+ client.sendmsg([b'FDs'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
+ client.close()
+ return token
+
+
+@tractor.context
+async def _open_ringbuf(
+ ctx: tractor.Context,
+ name: str,
+ buf_size: int
+) -> RBToken:
+ global _rings
+ is_owner = False
+ if name not in _rings:
+ stack = ExitStack()
+ token = stack.enter_context(
+ tractor.open_ringbuf(
+ name,
+ buf_size=buf_size
+ )
+ )
+ _rings[name] = {
+ 'token': token,
+ 'stack': stack,
+ }
+ is_owner = True
+
+ ring = _rings[name]
+ await ctx.started()
+
+ try:
+ await trio.sleep_forever()
+
+ except tractor.ContextCancelled:
+ ...
+
+ finally:
+ if is_owner:
+ ring['stack'].close()
+
+
+async def open_ringbuf(
+ name: str,
+ buf_size: int
+) -> RBToken:
+ async with (
+ tractor.find_actor(actor_name) as ringd,
+ ringd.open_context(
+ _open_ringbuf,
+ name=name,
+ buf_size=buf_size
+ ) as (rd_ctx, _)
+ ):
+ yield await _attach_to_ring(name)
+ await rd_ctx.cancel()
diff --git a/tractor/ipc/_tcp.py b/tractor/ipc/_tcp.py
index 61e6f3e6..dbecdf5e 100644
--- a/tractor/ipc/_tcp.py
+++ b/tractor/ipc/_tcp.py
@@ -20,16 +20,122 @@ TCP implementation of tractor.ipc._transport.MsgTransport protocol
from __future__ import annotations
import trio
+from trio import (
+ SocketListener,
+ open_tcp_listeners,
+)
from tractor.msg import MsgCodec
from tractor.log import get_logger
-from tractor._addr import TCPAddress
from tractor.ipc._transport import MsgpackTransport
log = get_logger(__name__)
+class TCPAddress:
+ proto_key: str = 'tcp'
+ unwrapped_type: type = tuple[str, int]
+ def_bindspace: str = '127.0.0.1'
+
+ def __init__(
+ self,
+ host: str,
+ port: int
+ ):
+ if (
+ not isinstance(host, str)
+ or
+ not isinstance(port, int)
+ ):
+ raise TypeError(
+ f'Expected host {host!r} to be str and port {port!r} to be int'
+ )
+
+ self._host: str = host
+ self._port: int = port
+
+ @property
+ def is_valid(self) -> bool:
+ return self._port != 0
+
+ @property
+ def bindspace(self) -> str:
+ return self._host
+
+ @property
+ def domain(self) -> str:
+ return self._host
+
+ @classmethod
+ def from_addr(
+ cls,
+ addr: tuple[str, int]
+ ) -> TCPAddress:
+ match addr:
+ case (str(), int()):
+ return TCPAddress(addr[0], addr[1])
+ case _:
+ raise ValueError(
+ f'Invalid unwrapped address for {cls}\n'
+ f'{addr}\n'
+ )
+
+ def unwrap(self) -> tuple[str, int]:
+ return (
+ self._host,
+ self._port,
+ )
+
+ @classmethod
+ def get_random(
+ cls,
+ bindspace: str = def_bindspace,
+ ) -> TCPAddress:
+ return TCPAddress(bindspace, 0)
+
+ @classmethod
+ def get_root(cls) -> TCPAddress:
+ return TCPAddress(
+ '127.0.0.1',
+ 1616,
+ )
+
+ def __repr__(self) -> str:
+ return (
+ f'{type(self).__name__}[{self.unwrap()}]'
+ )
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, TCPAddress):
+ raise TypeError(
+ f'Can not compare {type(other)} with {type(self)}'
+ )
+
+ return (
+ self._host == other._host
+ and
+ self._port == other._port
+ )
+
+ async def open_listener(
+ self,
+ **kwargs,
+ ) -> SocketListener:
+ listeners: list[SocketListener] = await open_tcp_listeners(
+ host=self._host,
+ port=self._port,
+ **kwargs
+ )
+ assert len(listeners) == 1
+ listener = listeners[0]
+ self._host, self._port = listener.socket.getsockname()[:2]
+ return listener
+
+ async def close_listener(self):
+ ...
+
+
# TODO: typing oddity.. not sure why we have to inherit here, but it
# seems to be an issue with `get_msg_transport()` returning
# a `Type[Protocol]`; probably should make a `mypy` issue?
diff --git a/tractor/ipc/_transport.py b/tractor/ipc/_transport.py
index 160423c8..2a9926f9 100644
--- a/tractor/ipc/_transport.py
+++ b/tractor/ipc/_transport.py
@@ -14,8 +14,8 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
'''
-typing.Protocol based generic msg API, implement this class to add backends for
-tractor.ipc.Channel
+typing.Protocol based generic msg API, implement this class to add
+backends for tractor.ipc.Channel
'''
from __future__ import annotations
@@ -23,8 +23,9 @@ from typing import (
runtime_checkable,
Type,
Protocol,
- TypeVar,
- ClassVar
+ # TypeVar,
+ ClassVar,
+ TYPE_CHECKING,
)
from collections.abc import (
AsyncGenerator,
@@ -47,10 +48,13 @@ from tractor.msg import (
_ctxvar_MsgCodec,
# _codec, XXX see `self._codec` sanity/debug checks
MsgCodec,
+ MsgType,
types as msgtypes,
pretty_struct,
)
-from tractor._addr import Address
+
+if TYPE_CHECKING:
+ from tractor._addr import Address
log = get_logger(__name__)
@@ -63,12 +67,13 @@ MsgTransportKey = tuple[str, str]
# ?TODO? this should be our `Union[*msgtypes.__spec__]` alias now right..?
# => BLEH, except can't bc prots must inherit typevar or param-spec
# vars..
-MsgType = TypeVar('MsgType')
+# MsgType = TypeVar('MsgType')
@runtime_checkable
-class MsgTransport(Protocol[MsgType]):
+class MsgTransport(Protocol):
#
+# class MsgTransport(Protocol[MsgType]):
# ^-TODO-^ consider using a generic def and indexing with our
# eventual msg definition/types?
# - https://docs.python.org/3/library/typing.html#typing.Protocol
diff --git a/tractor/ipc/_types.py b/tractor/ipc/_types.py
index 1b86636d..8d543d9d 100644
--- a/tractor/ipc/_types.py
+++ b/tractor/ipc/_types.py
@@ -13,19 +13,37 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
-from typing import Type
+
+'''
+IPC subsys type-lookup helpers?
+
+'''
+from typing import (
+ Type,
+ # TYPE_CHECKING,
+)
import trio
import socket
-from tractor._addr import Address
from tractor.ipc._transport import (
MsgTransportKey,
MsgTransport
)
-from tractor.ipc._tcp import MsgpackTCPStream
-from tractor.ipc._uds import MsgpackUDSStream
+from tractor.ipc._tcp import (
+ TCPAddress,
+ MsgpackTCPStream,
+)
+from tractor.ipc._uds import (
+ UDSAddress,
+ MsgpackUDSStream,
+)
+# if TYPE_CHECKING:
+# from tractor._addr import Address
+
+
+Address = TCPAddress|UDSAddress
# manually updated list of all supported msg transport types
_msg_transports = [
@@ -41,7 +59,10 @@ _key_to_transport: dict[MsgTransportKey, Type[MsgTransport]] = {
}
# convert an Address wrapper to its corresponding transport type
-_addr_to_transport: dict[Type[Address], Type[MsgTransport]] = {
+_addr_to_transport: dict[
+ Type[TCPAddress|UDSAddress],
+ Type[MsgTransport]
+] = {
cls.address_type: cls
for cls in _msg_transports
}
diff --git a/tractor/ipc/_uds.py b/tractor/ipc/_uds.py
index 3d24447b..33843f6a 100644
--- a/tractor/ipc/_uds.py
+++ b/tractor/ipc/_uds.py
@@ -29,8 +29,15 @@ from socket import (
SOL_SOCKET,
)
import struct
+from typing import (
+ TYPE_CHECKING,
+)
import trio
+from trio import (
+ socket,
+ SocketListener,
+)
from trio._highlevel_open_unix_stream import (
close_on_error,
has_unix,
@@ -38,16 +45,211 @@ from trio._highlevel_open_unix_stream import (
from tractor.msg import MsgCodec
from tractor.log import get_logger
-from tractor._addr import (
- UDSAddress,
- unwrap_sockpath,
+from tractor.ipc._transport import (
+ MsgpackTransport,
)
-from tractor.ipc._transport import MsgpackTransport
+from .._state import (
+ get_rt_dir,
+ current_actor,
+ is_root_process,
+)
+
+if TYPE_CHECKING:
+ from ._runtime import Actor
log = get_logger(__name__)
+def unwrap_sockpath(
+ sockpath: Path,
+) -> tuple[Path, Path]:
+ return (
+ sockpath.parent,
+ sockpath.name,
+ )
+
+
+class UDSAddress:
+ # TODO, maybe we should use better field and value
+ # -[x] really this is a `.protocol_key` not a "name" of anything.
+ # -[ ] consider a 'unix' proto-key instead?
+ # -[ ] need to check what other mult-transport frameworks do
+ # like zmq, nng, uri-spec et al!
+ proto_key: str = 'uds'
+ unwrapped_type: type = tuple[str, int]
+ def_bindspace: Path = get_rt_dir()
+
+ def __init__(
+ self,
+ filedir: Path|str|None,
+ # TODO, i think i want `.filename` here?
+ filename: str|Path,
+
+ # XXX, in the sense you can also pass
+ # a "non-real-world-process-id" such as is handy to represent
+ # our host-local default "port-like" key for the very first
+ # root actor to create a registry address.
+ maybe_pid: int|None = None,
+ ):
+ fdir = self._filedir = Path(
+ filedir
+ or
+ self.def_bindspace
+ ).absolute()
+ fpath = self._filename = Path(filename)
+ fp: Path = fdir / fpath
+ assert (
+ fp.is_absolute()
+ and
+ fp == self.sockpath
+ )
+
+ # to track which "side" is the peer process by reading socket
+ # credentials-info.
+ self._pid: int = maybe_pid
+
+ @property
+ def sockpath(self) -> Path:
+ return self._filedir / self._filename
+
+ @property
+ def is_valid(self) -> bool:
+ '''
+ We block socket files not allocated under the runtime subdir.
+
+ '''
+ return self.bindspace in self.sockpath.parents
+
+ @property
+ def bindspace(self) -> Path:
+ '''
+ We replicate the "ip-set-of-hosts" part of a UDS socket as
+ just the sub-directory in which we allocate socket files.
+
+ '''
+ return self._filedir or self.def_bindspace
+
+ @classmethod
+ def from_addr(
+ cls,
+ addr: (
+ tuple[Path|str, Path|str]|Path|str
+ ),
+ ) -> UDSAddress:
+ match addr:
+ case tuple()|list():
+ filedir = Path(addr[0])
+ filename = Path(addr[1])
+ # sockpath: Path = Path(addr[0])
+ # filedir, filename = unwrap_sockpath(sockpath)
+ # pid: int = addr[1]
+ return UDSAddress(
+ filedir=filedir,
+ filename=filename,
+ # maybe_pid=pid,
+ )
+ # NOTE, in case we ever decide to just `.unwrap()`
+ # to a `Path|str`?
+ case str()|Path():
+ sockpath: Path = Path(addr)
+ return UDSAddress(*unwrap_sockpath(sockpath))
+ case _:
+ # import pdbp; pdbp.set_trace()
+ raise TypeError(
+ f'Bad unwrapped-address for {cls} !\n'
+ f'{addr!r}\n'
+ )
+
+ def unwrap(self) -> tuple[str, int]:
+ # XXX NOTE, since this gets passed DIRECTLY to
+ # `.ipc._uds.open_unix_socket_w_passcred()`
+ return (
+ str(self._filedir),
+ str(self._filename),
+ )
+
+ @classmethod
+ def get_random(
+ cls,
+ bindspace: Path|None = None, # default netns
+ ) -> UDSAddress:
+
+ filedir: Path = bindspace or cls.def_bindspace
+ pid: int = os.getpid()
+ actor: Actor|None = current_actor(
+ err_on_no_runtime=False,
+ )
+ if actor:
+ sockname: str = '::'.join(actor.uid) + f'@{pid}'
+ else:
+ prefix: str = ''
+ if is_root_process():
+ prefix: str = 'root'
+ sockname: str = f'{prefix}@{pid}'
+
+ sockpath: Path = Path(f'{sockname}.sock')
+ return UDSAddress(
+ filedir=filedir,
+ filename=sockpath,
+ maybe_pid=pid,
+ )
+
+ @classmethod
+ def get_root(cls) -> UDSAddress:
+ def_uds_filename: Path = 'registry@1616.sock'
+ return UDSAddress(
+ filedir=None,
+ filename=def_uds_filename,
+ # maybe_pid=1616,
+ )
+
+ def __repr__(self) -> str:
+ return (
+ f'{type(self).__name__}'
+ f'['
+ f'({self._filedir}, {self._filename})'
+ f']'
+ )
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, UDSAddress):
+ raise TypeError(
+ f'Can not compare {type(other)} with {type(self)}'
+ )
+
+ return self.sockpath == other.sockpath
+
+ # async def open_listener(self, **kwargs) -> SocketListener:
+ async def open_listener(
+ self,
+ **kwargs,
+ ) -> SocketListener:
+ sock = self._sock = socket.socket(
+ socket.AF_UNIX,
+ socket.SOCK_STREAM
+ )
+ log.info(
+ f'Attempting to bind UDS socket\n'
+ f'>[\n'
+ f'|_{self}\n'
+ )
+
+ bindpath: Path = self.sockpath
+ await sock.bind(str(bindpath))
+ sock.listen(1)
+ log.info(
+ f'Listening on UDS socket\n'
+ f'[>\n'
+ f' |_{self}\n'
+ )
+ return SocketListener(self._sock)
+
+ def close_listener(self):
+ self._sock.close()
+ os.unlink(self.sockpath)
+
+
async def open_unix_socket_w_passcred(
filename: str|bytes|os.PathLike[str]|os.PathLike[bytes],
) -> trio.SocketStream:
@@ -214,3 +416,5 @@ class MsgpackUDSStream(MsgpackTransport):
maybe_pid=peer_pid
)
return (laddr, raddr)
+
+
diff --git a/tractor/msg/types.py b/tractor/msg/types.py
index d71fb7e0..86752aba 100644
--- a/tractor/msg/types.py
+++ b/tractor/msg/types.py
@@ -48,7 +48,7 @@ from tractor.msg import (
pretty_struct,
)
from tractor.log import get_logger
-from tractor._addr import UnwrappedAddress
+# from tractor._addr import UnwrappedAddress
log = get_logger('tractor.msgspec')
@@ -176,8 +176,8 @@ class SpawnSpec(
# TODO: not just sockaddr pairs?
# -[ ] abstract into a `TransportAddr` type?
- reg_addrs: list[UnwrappedAddress]
- bind_addrs: list[UnwrappedAddress]|None
+ reg_addrs: list[tuple[str, str|int]]
+ bind_addrs: list[tuple[str, str|int]]|None
# TODO: caps based RPC support in the payload?