Move concrete `Address`es to each tpt module
That is moving from `._addr`, - `TCPAddress` to `.ipc._tcp` - `UDSAddress` to `.ipc._uds` Obviously this requires adjusting a buncha stuff in `._addr` to avoid import cycles (the original reason the module was not also included in the new `.ipc` subpkg) including, - avoiding "unnecessary" imports of `[Unwrapped]Address` in various modules. * since `Address` is a protocol and the main point is that it **does not need to be inherited** per (https://typing.python.org/en/latest/spec/protocol.html#terminology) thus I removed the need for it in both transport submods. * and `UnwrappedAddress` is a type alias for tuples.. so we don't really always need to be importing it since it also kinda obfuscates what the underlying pairs are. - not exporting everything in submods at the `.ipc` top level and importing from specific submods by default. - only importing various types under a `if typing.TYPE_CHECKING:` guard as needed.leslies_extra_appendix
parent
8fd7d1cec4
commit
c9e9a3949f
|
@ -2,14 +2,17 @@ import time
|
|||
|
||||
import trio
|
||||
import pytest
|
||||
|
||||
import tractor
|
||||
from tractor.ipc import (
|
||||
from tractor.ipc._ringbuf import (
|
||||
open_ringbuf,
|
||||
RBToken,
|
||||
RingBuffSender,
|
||||
RingBuffReceiver
|
||||
)
|
||||
from tractor._testing.samples import generate_sample_messages
|
||||
from tractor._testing.samples import (
|
||||
generate_sample_messages,
|
||||
)
|
||||
|
||||
# in case you don't want to melt your cores, uncomment dis!
|
||||
pytestmark = pytest.mark.skip
|
||||
|
|
305
tractor/_addr.py
305
tractor/_addr.py
|
@ -14,34 +14,25 @@
|
|||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from __future__ import annotations
|
||||
from pathlib import Path
|
||||
import os
|
||||
# import tempfile
|
||||
from uuid import uuid4
|
||||
from typing import (
|
||||
Protocol,
|
||||
ClassVar,
|
||||
# TypeVar,
|
||||
# Union,
|
||||
Type,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
from bidict import bidict
|
||||
# import trio
|
||||
from trio import (
|
||||
socket,
|
||||
SocketListener,
|
||||
open_tcp_listeners,
|
||||
)
|
||||
|
||||
from .log import get_logger
|
||||
from ._state import (
|
||||
get_rt_dir,
|
||||
current_actor,
|
||||
is_root_process,
|
||||
_def_tpt_proto,
|
||||
)
|
||||
from .ipc._tcp import TCPAddress
|
||||
from .ipc._uds import UDSAddress
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ._runtime import Actor
|
||||
|
@ -179,298 +170,6 @@ class Address(Protocol):
|
|||
...
|
||||
|
||||
|
||||
class TCPAddress(Address):
|
||||
proto_key: str = 'tcp'
|
||||
unwrapped_type: type = tuple[str, int]
|
||||
def_bindspace: str = '127.0.0.1'
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
host: str,
|
||||
port: int
|
||||
):
|
||||
if (
|
||||
not isinstance(host, str)
|
||||
or
|
||||
not isinstance(port, int)
|
||||
):
|
||||
raise TypeError(
|
||||
f'Expected host {host!r} to be str and port {port!r} to be int'
|
||||
)
|
||||
|
||||
self._host: str = host
|
||||
self._port: int = port
|
||||
|
||||
@property
|
||||
def is_valid(self) -> bool:
|
||||
return self._port != 0
|
||||
|
||||
@property
|
||||
def bindspace(self) -> str:
|
||||
return self._host
|
||||
|
||||
@property
|
||||
def domain(self) -> str:
|
||||
return self._host
|
||||
|
||||
@classmethod
|
||||
def from_addr(
|
||||
cls,
|
||||
addr: tuple[str, int]
|
||||
) -> TCPAddress:
|
||||
match addr:
|
||||
case (str(), int()):
|
||||
return TCPAddress(addr[0], addr[1])
|
||||
case _:
|
||||
raise ValueError(
|
||||
f'Invalid unwrapped address for {cls}\n'
|
||||
f'{addr}\n'
|
||||
)
|
||||
|
||||
def unwrap(self) -> tuple[str, int]:
|
||||
return (
|
||||
self._host,
|
||||
self._port,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_random(
|
||||
cls,
|
||||
bindspace: str = def_bindspace,
|
||||
) -> TCPAddress:
|
||||
return TCPAddress(bindspace, 0)
|
||||
|
||||
@classmethod
|
||||
def get_root(cls) -> Address:
|
||||
return TCPAddress(
|
||||
'127.0.0.1',
|
||||
1616,
|
||||
)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
f'{type(self).__name__}[{self.unwrap()}]'
|
||||
)
|
||||
|
||||
def __eq__(self, other) -> bool:
|
||||
if not isinstance(other, TCPAddress):
|
||||
raise TypeError(
|
||||
f'Can not compare {type(other)} with {type(self)}'
|
||||
)
|
||||
|
||||
return (
|
||||
self._host == other._host
|
||||
and
|
||||
self._port == other._port
|
||||
)
|
||||
|
||||
async def open_listener(
|
||||
self,
|
||||
**kwargs,
|
||||
) -> SocketListener:
|
||||
listeners: list[SocketListener] = await open_tcp_listeners(
|
||||
host=self._host,
|
||||
port=self._port,
|
||||
**kwargs
|
||||
)
|
||||
assert len(listeners) == 1
|
||||
listener = listeners[0]
|
||||
self._host, self._port = listener.socket.getsockname()[:2]
|
||||
return listener
|
||||
|
||||
async def close_listener(self):
|
||||
...
|
||||
|
||||
|
||||
def unwrap_sockpath(
|
||||
sockpath: Path,
|
||||
) -> tuple[Path, Path]:
|
||||
return (
|
||||
sockpath.parent,
|
||||
sockpath.name,
|
||||
)
|
||||
|
||||
|
||||
class UDSAddress(Address):
|
||||
# TODO, maybe we should use better field and value
|
||||
# -[x] really this is a `.protocol_key` not a "name" of anything.
|
||||
# -[ ] consider a 'unix' proto-key instead?
|
||||
# -[ ] need to check what other mult-transport frameworks do
|
||||
# like zmq, nng, uri-spec et al!
|
||||
proto_key: str = 'uds'
|
||||
unwrapped_type: type = tuple[str, int]
|
||||
def_bindspace: Path = get_rt_dir()
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
filedir: Path|str|None,
|
||||
# TODO, i think i want `.filename` here?
|
||||
filename: str|Path,
|
||||
|
||||
# XXX, in the sense you can also pass
|
||||
# a "non-real-world-process-id" such as is handy to represent
|
||||
# our host-local default "port-like" key for the very first
|
||||
# root actor to create a registry address.
|
||||
maybe_pid: int|None = None,
|
||||
):
|
||||
fdir = self._filedir = Path(
|
||||
filedir
|
||||
or
|
||||
self.def_bindspace
|
||||
).absolute()
|
||||
fpath = self._filename = Path(filename)
|
||||
fp: Path = fdir / fpath
|
||||
assert (
|
||||
fp.is_absolute()
|
||||
and
|
||||
fp == self.sockpath
|
||||
)
|
||||
|
||||
# to track which "side" is the peer process by reading socket
|
||||
# credentials-info.
|
||||
self._pid: int = maybe_pid
|
||||
|
||||
@property
|
||||
def sockpath(self) -> Path:
|
||||
return self._filedir / self._filename
|
||||
|
||||
@property
|
||||
def is_valid(self) -> bool:
|
||||
'''
|
||||
We block socket files not allocated under the runtime subdir.
|
||||
|
||||
'''
|
||||
return self.bindspace in self.sockpath.parents
|
||||
|
||||
@property
|
||||
def bindspace(self) -> Path:
|
||||
'''
|
||||
We replicate the "ip-set-of-hosts" part of a UDS socket as
|
||||
just the sub-directory in which we allocate socket files.
|
||||
|
||||
'''
|
||||
return self._filedir or self.def_bindspace
|
||||
|
||||
@classmethod
|
||||
def from_addr(
|
||||
cls,
|
||||
addr: (
|
||||
tuple[Path|str, Path|str]|Path|str
|
||||
),
|
||||
) -> UDSAddress:
|
||||
match addr:
|
||||
case tuple()|list():
|
||||
filedir = Path(addr[0])
|
||||
filename = Path(addr[1])
|
||||
# sockpath: Path = Path(addr[0])
|
||||
# filedir, filename = unwrap_sockpath(sockpath)
|
||||
# pid: int = addr[1]
|
||||
return UDSAddress(
|
||||
filedir=filedir,
|
||||
filename=filename,
|
||||
# maybe_pid=pid,
|
||||
)
|
||||
# NOTE, in case we ever decide to just `.unwrap()`
|
||||
# to a `Path|str`?
|
||||
case str()|Path():
|
||||
sockpath: Path = Path(addr)
|
||||
return UDSAddress(*unwrap_sockpath(sockpath))
|
||||
case _:
|
||||
# import pdbp; pdbp.set_trace()
|
||||
raise TypeError(
|
||||
f'Bad unwrapped-address for {cls} !\n'
|
||||
f'{addr!r}\n'
|
||||
)
|
||||
|
||||
def unwrap(self) -> tuple[str, int]:
|
||||
# XXX NOTE, since this gets passed DIRECTLY to
|
||||
# `.ipc._uds.open_unix_socket_w_passcred()`
|
||||
return (
|
||||
str(self._filedir),
|
||||
str(self._filename),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_random(
|
||||
cls,
|
||||
bindspace: Path|None = None, # default netns
|
||||
) -> UDSAddress:
|
||||
|
||||
filedir: Path = bindspace or cls.def_bindspace
|
||||
pid: int = os.getpid()
|
||||
actor: Actor|None = current_actor(
|
||||
err_on_no_runtime=False,
|
||||
)
|
||||
if actor:
|
||||
sockname: str = '::'.join(actor.uid) + f'@{pid}'
|
||||
else:
|
||||
prefix: str = '<unknown-actor>'
|
||||
if is_root_process():
|
||||
prefix: str = 'root'
|
||||
sockname: str = f'{prefix}@{pid}'
|
||||
|
||||
sockpath: Path = Path(f'{sockname}.sock')
|
||||
return UDSAddress(
|
||||
filedir=filedir,
|
||||
filename=sockpath,
|
||||
maybe_pid=pid,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_root(cls) -> Address:
|
||||
def_uds_filename: Path = 'registry@1616.sock'
|
||||
return UDSAddress(
|
||||
filedir=None,
|
||||
filename=def_uds_filename,
|
||||
# maybe_pid=1616,
|
||||
)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
f'{type(self).__name__}'
|
||||
f'['
|
||||
f'({self._filedir}, {self._filename})'
|
||||
f']'
|
||||
)
|
||||
|
||||
def __eq__(self, other) -> bool:
|
||||
if not isinstance(other, UDSAddress):
|
||||
raise TypeError(
|
||||
f'Can not compare {type(other)} with {type(self)}'
|
||||
)
|
||||
|
||||
return self.sockpath == other.sockpath
|
||||
|
||||
# async def open_listener(self, **kwargs) -> SocketListener:
|
||||
async def open_listener(
|
||||
self,
|
||||
**kwargs,
|
||||
) -> SocketListener:
|
||||
sock = self._sock = socket.socket(
|
||||
socket.AF_UNIX,
|
||||
socket.SOCK_STREAM
|
||||
)
|
||||
log.info(
|
||||
f'Attempting to bind UDS socket\n'
|
||||
f'>[\n'
|
||||
f'|_{self}\n'
|
||||
)
|
||||
|
||||
bindpath: Path = self.sockpath
|
||||
await sock.bind(str(bindpath))
|
||||
sock.listen(1)
|
||||
log.info(
|
||||
f'Listening on UDS socket\n'
|
||||
f'[>\n'
|
||||
f' |_{self}\n'
|
||||
)
|
||||
return SocketListener(self._sock)
|
||||
|
||||
def close_listener(self):
|
||||
self._sock.close()
|
||||
os.unlink(self.sockpath)
|
||||
|
||||
|
||||
_address_types: bidict[str, Type[Address]] = {
|
||||
'tcp': TCPAddress,
|
||||
'uds': UDSAddress
|
||||
|
|
|
@ -105,7 +105,7 @@ from ._state import (
|
|||
if TYPE_CHECKING:
|
||||
from ._portal import Portal
|
||||
from ._runtime import Actor
|
||||
from .ipc import MsgTransport
|
||||
from .ipc._transport import MsgTransport
|
||||
from .devx._frame_stack import (
|
||||
CallerInfo,
|
||||
)
|
||||
|
|
|
@ -13,43 +13,12 @@
|
|||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
import platform
|
||||
|
||||
from ._transport import (
|
||||
MsgTransportKey as MsgTransportKey,
|
||||
MsgType as MsgType,
|
||||
MsgTransport as MsgTransport,
|
||||
MsgpackTransport as MsgpackTransport
|
||||
)
|
||||
|
||||
from ._tcp import MsgpackTCPStream as MsgpackTCPStream
|
||||
from ._uds import MsgpackUDSStream as MsgpackUDSStream
|
||||
|
||||
from ._types import (
|
||||
transport_from_addr as transport_from_addr,
|
||||
transport_from_stream as transport_from_stream,
|
||||
)
|
||||
'''
|
||||
A modular IPC layer supporting the power of cross-process SC!
|
||||
|
||||
'''
|
||||
from ._chan import (
|
||||
_connect_chan as _connect_chan,
|
||||
Channel as Channel
|
||||
)
|
||||
|
||||
if platform.system() == 'Linux':
|
||||
from ._linux import (
|
||||
EFD_SEMAPHORE as EFD_SEMAPHORE,
|
||||
EFD_CLOEXEC as EFD_CLOEXEC,
|
||||
EFD_NONBLOCK as EFD_NONBLOCK,
|
||||
open_eventfd as open_eventfd,
|
||||
write_eventfd as write_eventfd,
|
||||
read_eventfd as read_eventfd,
|
||||
close_eventfd as close_eventfd,
|
||||
EventFD as EventFD,
|
||||
)
|
||||
|
||||
from ._ringbuf import (
|
||||
RBToken as RBToken,
|
||||
RingBuffSender as RingBuffSender,
|
||||
RingBuffReceiver as RingBuffReceiver,
|
||||
open_ringbuf as open_ringbuf
|
||||
)
|
||||
|
|
|
@ -29,13 +29,13 @@ from pprint import pformat
|
|||
import typing
|
||||
from typing import (
|
||||
Any,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
import warnings
|
||||
|
||||
import trio
|
||||
|
||||
from tractor.ipc._transport import MsgTransport
|
||||
from tractor.ipc._types import (
|
||||
from ._types import (
|
||||
transport_from_addr,
|
||||
transport_from_stream,
|
||||
)
|
||||
|
@ -55,6 +55,9 @@ from tractor.msg import (
|
|||
MsgCodec,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ._transport import MsgTransport
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
|
|
@ -0,0 +1,163 @@
|
|||
# tractor: structured concurrent "actors".
|
||||
# Copyright 2018-eternity Tyler Goodlet.
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
'''
|
||||
File-descriptor-sharing on `linux` by "wilhelm_of_bohemia".
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
import os
|
||||
import array
|
||||
import socket
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from contextlib import ExitStack
|
||||
|
||||
import trio
|
||||
import tractor
|
||||
from tractor.ipc import RBToken
|
||||
|
||||
|
||||
actor_name = 'ringd'
|
||||
|
||||
|
||||
_rings: dict[str, dict] = {}
|
||||
|
||||
|
||||
async def _attach_to_ring(
|
||||
ring_name: str
|
||||
) -> tuple[int, int, int]:
|
||||
actor = tractor.current_actor()
|
||||
|
||||
fd_amount = 3
|
||||
sock_path = (
|
||||
Path(tempfile.gettempdir())
|
||||
/
|
||||
f'{os.getpid()}-pass-ring-fds-{ring_name}-to-{actor.name}.sock'
|
||||
)
|
||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
sock.bind(sock_path)
|
||||
sock.listen(1)
|
||||
|
||||
async with (
|
||||
tractor.find_actor(actor_name) as ringd,
|
||||
ringd.open_context(
|
||||
_pass_fds,
|
||||
name=ring_name,
|
||||
sock_path=sock_path
|
||||
) as (ctx, _sent)
|
||||
):
|
||||
# prepare array to receive FD
|
||||
fds = array.array("i", [0] * fd_amount)
|
||||
|
||||
conn, _ = sock.accept()
|
||||
|
||||
# receive FD
|
||||
msg, ancdata, flags, addr = conn.recvmsg(
|
||||
1024,
|
||||
socket.CMSG_LEN(fds.itemsize * fd_amount)
|
||||
)
|
||||
|
||||
for (
|
||||
cmsg_level,
|
||||
cmsg_type,
|
||||
cmsg_data,
|
||||
) in ancdata:
|
||||
if (
|
||||
cmsg_level == socket.SOL_SOCKET
|
||||
and
|
||||
cmsg_type == socket.SCM_RIGHTS
|
||||
):
|
||||
fds.frombytes(cmsg_data[:fds.itemsize * fd_amount])
|
||||
break
|
||||
else:
|
||||
raise RuntimeError("Receiver: No FDs received")
|
||||
|
||||
conn.close()
|
||||
sock.close()
|
||||
sock_path.unlink()
|
||||
|
||||
return RBToken.from_msg(
|
||||
await ctx.wait_for_result()
|
||||
)
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def _pass_fds(
|
||||
ctx: tractor.Context,
|
||||
name: str,
|
||||
sock_path: str
|
||||
) -> RBToken:
|
||||
global _rings
|
||||
token = _rings[name]
|
||||
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
client.connect(sock_path)
|
||||
await ctx.started()
|
||||
fds = array.array('i', token.fds)
|
||||
client.sendmsg([b'FDs'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
|
||||
client.close()
|
||||
return token
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def _open_ringbuf(
|
||||
ctx: tractor.Context,
|
||||
name: str,
|
||||
buf_size: int
|
||||
) -> RBToken:
|
||||
global _rings
|
||||
is_owner = False
|
||||
if name not in _rings:
|
||||
stack = ExitStack()
|
||||
token = stack.enter_context(
|
||||
tractor.open_ringbuf(
|
||||
name,
|
||||
buf_size=buf_size
|
||||
)
|
||||
)
|
||||
_rings[name] = {
|
||||
'token': token,
|
||||
'stack': stack,
|
||||
}
|
||||
is_owner = True
|
||||
|
||||
ring = _rings[name]
|
||||
await ctx.started()
|
||||
|
||||
try:
|
||||
await trio.sleep_forever()
|
||||
|
||||
except tractor.ContextCancelled:
|
||||
...
|
||||
|
||||
finally:
|
||||
if is_owner:
|
||||
ring['stack'].close()
|
||||
|
||||
|
||||
async def open_ringbuf(
|
||||
name: str,
|
||||
buf_size: int
|
||||
) -> RBToken:
|
||||
async with (
|
||||
tractor.find_actor(actor_name) as ringd,
|
||||
ringd.open_context(
|
||||
_open_ringbuf,
|
||||
name=name,
|
||||
buf_size=buf_size
|
||||
) as (rd_ctx, _)
|
||||
):
|
||||
yield await _attach_to_ring(name)
|
||||
await rd_ctx.cancel()
|
|
@ -20,16 +20,122 @@ TCP implementation of tractor.ipc._transport.MsgTransport protocol
|
|||
from __future__ import annotations
|
||||
|
||||
import trio
|
||||
from trio import (
|
||||
SocketListener,
|
||||
open_tcp_listeners,
|
||||
)
|
||||
|
||||
from tractor.msg import MsgCodec
|
||||
from tractor.log import get_logger
|
||||
from tractor._addr import TCPAddress
|
||||
from tractor.ipc._transport import MsgpackTransport
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
class TCPAddress:
|
||||
proto_key: str = 'tcp'
|
||||
unwrapped_type: type = tuple[str, int]
|
||||
def_bindspace: str = '127.0.0.1'
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
host: str,
|
||||
port: int
|
||||
):
|
||||
if (
|
||||
not isinstance(host, str)
|
||||
or
|
||||
not isinstance(port, int)
|
||||
):
|
||||
raise TypeError(
|
||||
f'Expected host {host!r} to be str and port {port!r} to be int'
|
||||
)
|
||||
|
||||
self._host: str = host
|
||||
self._port: int = port
|
||||
|
||||
@property
|
||||
def is_valid(self) -> bool:
|
||||
return self._port != 0
|
||||
|
||||
@property
|
||||
def bindspace(self) -> str:
|
||||
return self._host
|
||||
|
||||
@property
|
||||
def domain(self) -> str:
|
||||
return self._host
|
||||
|
||||
@classmethod
|
||||
def from_addr(
|
||||
cls,
|
||||
addr: tuple[str, int]
|
||||
) -> TCPAddress:
|
||||
match addr:
|
||||
case (str(), int()):
|
||||
return TCPAddress(addr[0], addr[1])
|
||||
case _:
|
||||
raise ValueError(
|
||||
f'Invalid unwrapped address for {cls}\n'
|
||||
f'{addr}\n'
|
||||
)
|
||||
|
||||
def unwrap(self) -> tuple[str, int]:
|
||||
return (
|
||||
self._host,
|
||||
self._port,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_random(
|
||||
cls,
|
||||
bindspace: str = def_bindspace,
|
||||
) -> TCPAddress:
|
||||
return TCPAddress(bindspace, 0)
|
||||
|
||||
@classmethod
|
||||
def get_root(cls) -> TCPAddress:
|
||||
return TCPAddress(
|
||||
'127.0.0.1',
|
||||
1616,
|
||||
)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
f'{type(self).__name__}[{self.unwrap()}]'
|
||||
)
|
||||
|
||||
def __eq__(self, other) -> bool:
|
||||
if not isinstance(other, TCPAddress):
|
||||
raise TypeError(
|
||||
f'Can not compare {type(other)} with {type(self)}'
|
||||
)
|
||||
|
||||
return (
|
||||
self._host == other._host
|
||||
and
|
||||
self._port == other._port
|
||||
)
|
||||
|
||||
async def open_listener(
|
||||
self,
|
||||
**kwargs,
|
||||
) -> SocketListener:
|
||||
listeners: list[SocketListener] = await open_tcp_listeners(
|
||||
host=self._host,
|
||||
port=self._port,
|
||||
**kwargs
|
||||
)
|
||||
assert len(listeners) == 1
|
||||
listener = listeners[0]
|
||||
self._host, self._port = listener.socket.getsockname()[:2]
|
||||
return listener
|
||||
|
||||
async def close_listener(self):
|
||||
...
|
||||
|
||||
|
||||
# TODO: typing oddity.. not sure why we have to inherit here, but it
|
||||
# seems to be an issue with `get_msg_transport()` returning
|
||||
# a `Type[Protocol]`; probably should make a `mypy` issue?
|
||||
|
|
|
@ -14,8 +14,8 @@
|
|||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
'''
|
||||
typing.Protocol based generic msg API, implement this class to add backends for
|
||||
tractor.ipc.Channel
|
||||
typing.Protocol based generic msg API, implement this class to add
|
||||
backends for tractor.ipc.Channel
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
|
@ -23,8 +23,9 @@ from typing import (
|
|||
runtime_checkable,
|
||||
Type,
|
||||
Protocol,
|
||||
TypeVar,
|
||||
ClassVar
|
||||
# TypeVar,
|
||||
ClassVar,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
from collections.abc import (
|
||||
AsyncGenerator,
|
||||
|
@ -47,10 +48,13 @@ from tractor.msg import (
|
|||
_ctxvar_MsgCodec,
|
||||
# _codec, XXX see `self._codec` sanity/debug checks
|
||||
MsgCodec,
|
||||
MsgType,
|
||||
types as msgtypes,
|
||||
pretty_struct,
|
||||
)
|
||||
from tractor._addr import Address
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from tractor._addr import Address
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
@ -63,12 +67,13 @@ MsgTransportKey = tuple[str, str]
|
|||
# ?TODO? this should be our `Union[*msgtypes.__spec__]` alias now right..?
|
||||
# => BLEH, except can't bc prots must inherit typevar or param-spec
|
||||
# vars..
|
||||
MsgType = TypeVar('MsgType')
|
||||
# MsgType = TypeVar('MsgType')
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class MsgTransport(Protocol[MsgType]):
|
||||
class MsgTransport(Protocol):
|
||||
#
|
||||
# class MsgTransport(Protocol[MsgType]):
|
||||
# ^-TODO-^ consider using a generic def and indexing with our
|
||||
# eventual msg definition/types?
|
||||
# - https://docs.python.org/3/library/typing.html#typing.Protocol
|
||||
|
|
|
@ -13,19 +13,37 @@
|
|||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from typing import Type
|
||||
|
||||
'''
|
||||
IPC subsys type-lookup helpers?
|
||||
|
||||
'''
|
||||
from typing import (
|
||||
Type,
|
||||
# TYPE_CHECKING,
|
||||
)
|
||||
|
||||
import trio
|
||||
import socket
|
||||
|
||||
from tractor._addr import Address
|
||||
from tractor.ipc._transport import (
|
||||
MsgTransportKey,
|
||||
MsgTransport
|
||||
)
|
||||
from tractor.ipc._tcp import MsgpackTCPStream
|
||||
from tractor.ipc._uds import MsgpackUDSStream
|
||||
from tractor.ipc._tcp import (
|
||||
TCPAddress,
|
||||
MsgpackTCPStream,
|
||||
)
|
||||
from tractor.ipc._uds import (
|
||||
UDSAddress,
|
||||
MsgpackUDSStream,
|
||||
)
|
||||
|
||||
# if TYPE_CHECKING:
|
||||
# from tractor._addr import Address
|
||||
|
||||
|
||||
Address = TCPAddress|UDSAddress
|
||||
|
||||
# manually updated list of all supported msg transport types
|
||||
_msg_transports = [
|
||||
|
@ -41,7 +59,10 @@ _key_to_transport: dict[MsgTransportKey, Type[MsgTransport]] = {
|
|||
}
|
||||
|
||||
# convert an Address wrapper to its corresponding transport type
|
||||
_addr_to_transport: dict[Type[Address], Type[MsgTransport]] = {
|
||||
_addr_to_transport: dict[
|
||||
Type[TCPAddress|UDSAddress],
|
||||
Type[MsgTransport]
|
||||
] = {
|
||||
cls.address_type: cls
|
||||
for cls in _msg_transports
|
||||
}
|
||||
|
|
|
@ -29,8 +29,15 @@ from socket import (
|
|||
SOL_SOCKET,
|
||||
)
|
||||
import struct
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
import trio
|
||||
from trio import (
|
||||
socket,
|
||||
SocketListener,
|
||||
)
|
||||
from trio._highlevel_open_unix_stream import (
|
||||
close_on_error,
|
||||
has_unix,
|
||||
|
@ -38,16 +45,211 @@ from trio._highlevel_open_unix_stream import (
|
|||
|
||||
from tractor.msg import MsgCodec
|
||||
from tractor.log import get_logger
|
||||
from tractor._addr import (
|
||||
UDSAddress,
|
||||
unwrap_sockpath,
|
||||
from tractor.ipc._transport import (
|
||||
MsgpackTransport,
|
||||
)
|
||||
from tractor.ipc._transport import MsgpackTransport
|
||||
from .._state import (
|
||||
get_rt_dir,
|
||||
current_actor,
|
||||
is_root_process,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ._runtime import Actor
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
def unwrap_sockpath(
|
||||
sockpath: Path,
|
||||
) -> tuple[Path, Path]:
|
||||
return (
|
||||
sockpath.parent,
|
||||
sockpath.name,
|
||||
)
|
||||
|
||||
|
||||
class UDSAddress:
|
||||
# TODO, maybe we should use better field and value
|
||||
# -[x] really this is a `.protocol_key` not a "name" of anything.
|
||||
# -[ ] consider a 'unix' proto-key instead?
|
||||
# -[ ] need to check what other mult-transport frameworks do
|
||||
# like zmq, nng, uri-spec et al!
|
||||
proto_key: str = 'uds'
|
||||
unwrapped_type: type = tuple[str, int]
|
||||
def_bindspace: Path = get_rt_dir()
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
filedir: Path|str|None,
|
||||
# TODO, i think i want `.filename` here?
|
||||
filename: str|Path,
|
||||
|
||||
# XXX, in the sense you can also pass
|
||||
# a "non-real-world-process-id" such as is handy to represent
|
||||
# our host-local default "port-like" key for the very first
|
||||
# root actor to create a registry address.
|
||||
maybe_pid: int|None = None,
|
||||
):
|
||||
fdir = self._filedir = Path(
|
||||
filedir
|
||||
or
|
||||
self.def_bindspace
|
||||
).absolute()
|
||||
fpath = self._filename = Path(filename)
|
||||
fp: Path = fdir / fpath
|
||||
assert (
|
||||
fp.is_absolute()
|
||||
and
|
||||
fp == self.sockpath
|
||||
)
|
||||
|
||||
# to track which "side" is the peer process by reading socket
|
||||
# credentials-info.
|
||||
self._pid: int = maybe_pid
|
||||
|
||||
@property
|
||||
def sockpath(self) -> Path:
|
||||
return self._filedir / self._filename
|
||||
|
||||
@property
|
||||
def is_valid(self) -> bool:
|
||||
'''
|
||||
We block socket files not allocated under the runtime subdir.
|
||||
|
||||
'''
|
||||
return self.bindspace in self.sockpath.parents
|
||||
|
||||
@property
|
||||
def bindspace(self) -> Path:
|
||||
'''
|
||||
We replicate the "ip-set-of-hosts" part of a UDS socket as
|
||||
just the sub-directory in which we allocate socket files.
|
||||
|
||||
'''
|
||||
return self._filedir or self.def_bindspace
|
||||
|
||||
@classmethod
|
||||
def from_addr(
|
||||
cls,
|
||||
addr: (
|
||||
tuple[Path|str, Path|str]|Path|str
|
||||
),
|
||||
) -> UDSAddress:
|
||||
match addr:
|
||||
case tuple()|list():
|
||||
filedir = Path(addr[0])
|
||||
filename = Path(addr[1])
|
||||
# sockpath: Path = Path(addr[0])
|
||||
# filedir, filename = unwrap_sockpath(sockpath)
|
||||
# pid: int = addr[1]
|
||||
return UDSAddress(
|
||||
filedir=filedir,
|
||||
filename=filename,
|
||||
# maybe_pid=pid,
|
||||
)
|
||||
# NOTE, in case we ever decide to just `.unwrap()`
|
||||
# to a `Path|str`?
|
||||
case str()|Path():
|
||||
sockpath: Path = Path(addr)
|
||||
return UDSAddress(*unwrap_sockpath(sockpath))
|
||||
case _:
|
||||
# import pdbp; pdbp.set_trace()
|
||||
raise TypeError(
|
||||
f'Bad unwrapped-address for {cls} !\n'
|
||||
f'{addr!r}\n'
|
||||
)
|
||||
|
||||
def unwrap(self) -> tuple[str, int]:
|
||||
# XXX NOTE, since this gets passed DIRECTLY to
|
||||
# `.ipc._uds.open_unix_socket_w_passcred()`
|
||||
return (
|
||||
str(self._filedir),
|
||||
str(self._filename),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_random(
|
||||
cls,
|
||||
bindspace: Path|None = None, # default netns
|
||||
) -> UDSAddress:
|
||||
|
||||
filedir: Path = bindspace or cls.def_bindspace
|
||||
pid: int = os.getpid()
|
||||
actor: Actor|None = current_actor(
|
||||
err_on_no_runtime=False,
|
||||
)
|
||||
if actor:
|
||||
sockname: str = '::'.join(actor.uid) + f'@{pid}'
|
||||
else:
|
||||
prefix: str = '<unknown-actor>'
|
||||
if is_root_process():
|
||||
prefix: str = 'root'
|
||||
sockname: str = f'{prefix}@{pid}'
|
||||
|
||||
sockpath: Path = Path(f'{sockname}.sock')
|
||||
return UDSAddress(
|
||||
filedir=filedir,
|
||||
filename=sockpath,
|
||||
maybe_pid=pid,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_root(cls) -> UDSAddress:
|
||||
def_uds_filename: Path = 'registry@1616.sock'
|
||||
return UDSAddress(
|
||||
filedir=None,
|
||||
filename=def_uds_filename,
|
||||
# maybe_pid=1616,
|
||||
)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
f'{type(self).__name__}'
|
||||
f'['
|
||||
f'({self._filedir}, {self._filename})'
|
||||
f']'
|
||||
)
|
||||
|
||||
def __eq__(self, other) -> bool:
|
||||
if not isinstance(other, UDSAddress):
|
||||
raise TypeError(
|
||||
f'Can not compare {type(other)} with {type(self)}'
|
||||
)
|
||||
|
||||
return self.sockpath == other.sockpath
|
||||
|
||||
# async def open_listener(self, **kwargs) -> SocketListener:
|
||||
async def open_listener(
|
||||
self,
|
||||
**kwargs,
|
||||
) -> SocketListener:
|
||||
sock = self._sock = socket.socket(
|
||||
socket.AF_UNIX,
|
||||
socket.SOCK_STREAM
|
||||
)
|
||||
log.info(
|
||||
f'Attempting to bind UDS socket\n'
|
||||
f'>[\n'
|
||||
f'|_{self}\n'
|
||||
)
|
||||
|
||||
bindpath: Path = self.sockpath
|
||||
await sock.bind(str(bindpath))
|
||||
sock.listen(1)
|
||||
log.info(
|
||||
f'Listening on UDS socket\n'
|
||||
f'[>\n'
|
||||
f' |_{self}\n'
|
||||
)
|
||||
return SocketListener(self._sock)
|
||||
|
||||
def close_listener(self):
|
||||
self._sock.close()
|
||||
os.unlink(self.sockpath)
|
||||
|
||||
|
||||
async def open_unix_socket_w_passcred(
|
||||
filename: str|bytes|os.PathLike[str]|os.PathLike[bytes],
|
||||
) -> trio.SocketStream:
|
||||
|
@ -214,3 +416,5 @@ class MsgpackUDSStream(MsgpackTransport):
|
|||
maybe_pid=peer_pid
|
||||
)
|
||||
return (laddr, raddr)
|
||||
|
||||
|
||||
|
|
|
@ -48,7 +48,7 @@ from tractor.msg import (
|
|||
pretty_struct,
|
||||
)
|
||||
from tractor.log import get_logger
|
||||
from tractor._addr import UnwrappedAddress
|
||||
# from tractor._addr import UnwrappedAddress
|
||||
|
||||
|
||||
log = get_logger('tractor.msgspec')
|
||||
|
@ -176,8 +176,8 @@ class SpawnSpec(
|
|||
|
||||
# TODO: not just sockaddr pairs?
|
||||
# -[ ] abstract into a `TransportAddr` type?
|
||||
reg_addrs: list[UnwrappedAddress]
|
||||
bind_addrs: list[UnwrappedAddress]|None
|
||||
reg_addrs: list[tuple[str, str|int]]
|
||||
bind_addrs: list[tuple[str, str|int]]|None
|
||||
|
||||
|
||||
# TODO: caps based RPC support in the payload?
|
||||
|
|
Loading…
Reference in New Issue