Even more `tractor._addr.Address` simplifying
Namely reducing the duplication of class-fields and `TypeVar`s used for parametrizing the `Address` protocol type, - drop all of the `TypeVar` types and just stick with all concrete addrs types inheriting from `Address` only. - rename `Address.name_key` -> `.proto_key`. - rename `Address.address_type` -> `.unwrapped_type` - rename `.namespace` -> `.bindspace` to better reflect that this "part" of the address represents the possible "space for binding endpoints". |_ also linux already uses "namespace" to mean the `netns` and i'd prefer to stick with their semantics for that. - add `TCPAddress/UDSAddress.def_bindspace` values. - drop commented `.open_stream()` method; never used. - simplify `UnwrappedAdress` to just a `tuple` of union types. - add logging to `USDAddress.open_listener()` for now. - adjust `tractor.ipc/_uds/tcp` transport to use new addr field names.leslies_extra_appendix
parent
a28659c3cd
commit
89993a4e3a
289
tractor/_addr.py
289
tractor/_addr.py
|
@ -21,54 +21,121 @@ from uuid import uuid4
|
|||
from typing import (
|
||||
Protocol,
|
||||
ClassVar,
|
||||
TypeVar,
|
||||
Union,
|
||||
# TypeVar,
|
||||
# Union,
|
||||
Type,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
from bidict import bidict
|
||||
import trio
|
||||
from trio import socket
|
||||
# import trio
|
||||
from trio import (
|
||||
socket,
|
||||
SocketListener,
|
||||
open_tcp_listeners,
|
||||
)
|
||||
|
||||
from .log import get_logger
|
||||
from ._state import (
|
||||
get_rt_dir,
|
||||
current_actor,
|
||||
is_root_process,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ._runtime import Actor
|
||||
|
||||
|
||||
NamespaceType = TypeVar('NamespaceType')
|
||||
AddressType = TypeVar('AddressType')
|
||||
StreamType = TypeVar('StreamType')
|
||||
ListenerType = TypeVar('ListenerType')
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
class Address(Protocol[
|
||||
NamespaceType,
|
||||
AddressType,
|
||||
StreamType,
|
||||
ListenerType
|
||||
]):
|
||||
# TODO, maybe breakout the netns key to a struct?
|
||||
# class NetNs(Struct)[str, int]:
|
||||
# ...
|
||||
|
||||
name_key: ClassVar[str]
|
||||
address_type: ClassVar[Type[AddressType]]
|
||||
# TODO, can't we just use a type alias
|
||||
# for this? namely just some `tuple[str, int, str, str]`?
|
||||
#
|
||||
# -[ ] would also just be simpler to keep this as SockAddr[tuple]
|
||||
# or something, implying it's just a simple pair of values which can
|
||||
# presumably be mapped to all transports?
|
||||
# -[ ] `pydoc socket.socket.getsockname()` delivers a 4-tuple for
|
||||
# ipv6 `(hostaddr, port, flowinfo, scope_id)`.. so how should we
|
||||
# handle that?
|
||||
# -[ ] as a further alternative to this wrap()/unwrap() approach we
|
||||
# could just implement `enc/dec_hook()`s for the `Address`-types
|
||||
# and just deal with our internal objs directly and always and
|
||||
# leave it to the codec layer to figure out marshalling?
|
||||
# |_ would mean only one spot to do the `.unwrap()` (which we may
|
||||
# end up needing to call from the hook()s anyway?)
|
||||
# -[x] rename to `UnwrappedAddress[Descriptor]` ??
|
||||
# seems like the right name as per,
|
||||
# https://www.geeksforgeeks.org/introduction-to-address-descriptor/
|
||||
#
|
||||
UnwrappedAddress = (
|
||||
# tcp/udp/uds
|
||||
tuple[
|
||||
str, # host/domain(tcp), filesys-dir(uds)
|
||||
int|str, # port/path(uds)
|
||||
]
|
||||
# ?TODO? should we also include another 2 fields from
|
||||
# our `Aid` msg such that we include the runtime `Actor.uid`
|
||||
# of `.name` and `.uuid`?
|
||||
# - would ensure uniqueness across entire net?
|
||||
# - allows for easier runtime-level filtering of "actors by
|
||||
# service name"
|
||||
)
|
||||
|
||||
|
||||
# TODO, maybe rename to `SocketAddress`?
|
||||
class Address(Protocol):
|
||||
proto_key: ClassVar[str]
|
||||
unwrapped_type: ClassVar[UnwrappedAddress]
|
||||
|
||||
# TODO, i feel like an `.is_bound()` is a better thing to
|
||||
# support?
|
||||
# Lke, what use does this have besides a noop and if it's not
|
||||
# valid why aren't we erroring on creation/use?
|
||||
@property
|
||||
def is_valid(self) -> bool:
|
||||
...
|
||||
|
||||
# TODO, maybe `.netns` is a better name?
|
||||
@property
|
||||
def namespace(self) -> NamespaceType|None:
|
||||
def namespace(self) -> tuple[str, int]|None:
|
||||
'''
|
||||
The if-available, OS-specific "network namespace" key.
|
||||
|
||||
'''
|
||||
...
|
||||
|
||||
@property
|
||||
def bindspace(self) -> str:
|
||||
'''
|
||||
Deliver the socket address' "bindable space" from
|
||||
a `socket.socket.bind()` and thus from the perspective of
|
||||
specific transport protocol domain.
|
||||
|
||||
I.e. for most (layer-4) network-socket protocols this is
|
||||
normally the ipv4/6 address, for UDS this is normally
|
||||
a filesystem (sub-directory).
|
||||
|
||||
For (distributed) network protocols this is normally the routing
|
||||
layer's domain/(ip-)address, though it might also include a "network namespace"
|
||||
key different then the default.
|
||||
|
||||
For local-host-only transports this is either an explicit
|
||||
namespace (with types defined by the OS: netns, Cgroup, IPC,
|
||||
pid, etc. on linux) or failing that the sub-directory in the
|
||||
filesys in which socket/shm files are located *under*.
|
||||
|
||||
'''
|
||||
...
|
||||
|
||||
@classmethod
|
||||
def from_addr(cls, addr: AddressType) -> Address:
|
||||
def from_addr(cls, addr: UnwrappedAddress) -> Address:
|
||||
...
|
||||
|
||||
def unwrap(self) -> AddressType:
|
||||
def unwrap(self) -> UnwrappedAddress:
|
||||
'''
|
||||
Deliver the underying minimum field set in
|
||||
a primitive python data type-structure.
|
||||
|
@ -76,7 +143,11 @@ class Address(Protocol[
|
|||
...
|
||||
|
||||
@classmethod
|
||||
def get_random(cls, namespace: NamespaceType | None = None) -> Address:
|
||||
def get_random(
|
||||
cls,
|
||||
current_actor: Actor,
|
||||
bindspace: str|None = None,
|
||||
) -> Address:
|
||||
...
|
||||
|
||||
# TODO, this should be something like a `.get_def_registar_addr()`
|
||||
|
@ -97,30 +168,20 @@ class Address(Protocol[
|
|||
def __eq__(self, other) -> bool:
|
||||
...
|
||||
|
||||
# async def open_stream(self, **kwargs) -> StreamType:
|
||||
# '''
|
||||
# Open a connection *TO* this address and deliver back a
|
||||
# `trio.SocketStream` wrapping the underlying transport.
|
||||
|
||||
# '''
|
||||
# ...
|
||||
|
||||
async def open_listener(self, **kwargs) -> ListenerType:
|
||||
async def open_listener(
|
||||
self,
|
||||
**kwargs,
|
||||
) -> SocketListener:
|
||||
...
|
||||
|
||||
async def close_listener(self):
|
||||
...
|
||||
|
||||
|
||||
class TCPAddress(Address[
|
||||
str,
|
||||
tuple[str, int],
|
||||
trio.SocketStream,
|
||||
trio.SocketListener
|
||||
]):
|
||||
|
||||
name_key: str = 'tcp'
|
||||
address_type: type = tuple[str, int]
|
||||
class TCPAddress(Address):
|
||||
proto_key: str = 'tcp'
|
||||
unwrapped_type: type = tuple[str, int]
|
||||
def_bindspace: str = '127.0.0.1'
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
@ -144,7 +205,11 @@ class TCPAddress(Address[
|
|||
return self._port != 0
|
||||
|
||||
@property
|
||||
def namespace(self) -> str:
|
||||
def bindspace(self) -> str:
|
||||
return self._host
|
||||
|
||||
@property
|
||||
def domain(self) -> str:
|
||||
return self._host
|
||||
|
||||
@classmethod
|
||||
|
@ -163,13 +228,17 @@ class TCPAddress(Address[
|
|||
@classmethod
|
||||
def get_random(
|
||||
cls,
|
||||
namespace: str = '127.0.0.1',
|
||||
current_actor: Actor,
|
||||
bindspace: str = def_bindspace,
|
||||
) -> TCPAddress:
|
||||
return TCPAddress(namespace, 0)
|
||||
return TCPAddress(bindspace, 0)
|
||||
|
||||
@classmethod
|
||||
def get_root(cls) -> Address:
|
||||
return TCPAddress('127.0.0.1', 1616)
|
||||
return TCPAddress(
|
||||
'127.0.0.1',
|
||||
1616,
|
||||
)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
|
@ -188,17 +257,11 @@ class TCPAddress(Address[
|
|||
self._port == other._port
|
||||
)
|
||||
|
||||
# async def open_stream(self, **kwargs) -> trio.SocketStream:
|
||||
# stream = await trio.open_tcp_stream(
|
||||
# self._host,
|
||||
# self._port,
|
||||
# **kwargs
|
||||
# )
|
||||
# self._host, self._port = stream.socket.getsockname()[:2]
|
||||
# return stream
|
||||
|
||||
async def open_listener(self, **kwargs) -> trio.SocketListener:
|
||||
listeners = await trio.open_tcp_listeners(
|
||||
async def open_listener(
|
||||
self,
|
||||
**kwargs,
|
||||
) -> SocketListener:
|
||||
listeners: list[SocketListener] = await open_tcp_listeners(
|
||||
host=self._host,
|
||||
port=self._port,
|
||||
**kwargs
|
||||
|
@ -212,18 +275,15 @@ class TCPAddress(Address[
|
|||
...
|
||||
|
||||
|
||||
class UDSAddress(Address[
|
||||
None,
|
||||
str,
|
||||
trio.SocketStream,
|
||||
trio.SocketListener
|
||||
]):
|
||||
|
||||
# TODO, maybe we should use 'unix' instead?
|
||||
class UDSAddress(Address):
|
||||
# TODO, maybe we should use better field and value
|
||||
# -[x] really this is a `.protocol_key` not a "name" of anything.
|
||||
# -[ ] consider a 'unix' proto-key instead?
|
||||
# -[ ] need to check what other mult-transport frameworks do
|
||||
# like zmq, nng, uri-spec et al!
|
||||
name_key: str = 'uds'
|
||||
address_type: type = tuple[str, int]
|
||||
proto_key: str = 'uds'
|
||||
unwrapped_type: type = tuple[str, int]
|
||||
def_bindspace: Path = get_rt_dir()
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
@ -239,11 +299,20 @@ class UDSAddress(Address[
|
|||
|
||||
@property
|
||||
def is_valid(self) -> bool:
|
||||
return True
|
||||
'''
|
||||
We block socket files not allocated under the runtime subdir.
|
||||
|
||||
'''
|
||||
return self.bindspace in self._filepath.parents
|
||||
|
||||
@property
|
||||
def namespace(self) -> None:
|
||||
return
|
||||
def bindspace(self) -> Path:
|
||||
'''
|
||||
We replicate the "ip-set-of-hosts" part of a UDS socket as
|
||||
just the sub-directory in which we allocate socket files.
|
||||
|
||||
'''
|
||||
return self.def_bindspace
|
||||
|
||||
@classmethod
|
||||
def from_addr(
|
||||
|
@ -259,17 +328,17 @@ class UDSAddress(Address[
|
|||
return (
|
||||
str(self._filepath),
|
||||
# XXX NOTE, since this gets passed DIRECTLY to
|
||||
# `trio.open_unix_
|
||||
# `open_unix_socket_w_passcred()` above!
|
||||
self._pid,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_random(
|
||||
cls,
|
||||
namespace: None = None, # unused
|
||||
bindspace: Path|None = None, # default netns
|
||||
) -> UDSAddress:
|
||||
|
||||
rt_dir: Path = get_rt_dir()
|
||||
bs: Path = bindspace or get_rt_dir()
|
||||
pid: int = os.getpid()
|
||||
actor: Actor|None = current_actor(
|
||||
err_on_no_runtime=False,
|
||||
|
@ -277,9 +346,12 @@ class UDSAddress(Address[
|
|||
if actor:
|
||||
sockname: str = '::'.join(actor.uid) + f'@{pid}'
|
||||
else:
|
||||
sockname: str = f'@{pid}'
|
||||
prefix: str = '<unknown-actor>'
|
||||
if is_root_process():
|
||||
prefix: str = 'root'
|
||||
sockname: str = f'{prefix}@{pid}'
|
||||
|
||||
sockpath: Path = Path(f'{rt_dir}/{sockname}.sock')
|
||||
sockpath: Path = Path(f'{bs}/{sockname}.sock')
|
||||
return UDSAddress(
|
||||
# filename=f'{tempfile.gettempdir()}/{uuid4()}.sock'
|
||||
filepath=sockpath,
|
||||
|
@ -314,28 +386,30 @@ class UDSAddress(Address[
|
|||
|
||||
return self._filepath == other._filepath
|
||||
|
||||
# TODO? remove right, it's never used?
|
||||
#
|
||||
# async def open_stream(
|
||||
# self,
|
||||
# **kwargs,
|
||||
# ) -> trio.SocketStream:
|
||||
# stream: trio.SocketStream = await trio.open_unix_socket(
|
||||
# self._filepath,
|
||||
# **kwargs
|
||||
# )
|
||||
# return stream
|
||||
|
||||
async def open_listener(self, **kwargs) -> trio.SocketListener:
|
||||
# async def open_listener(self, **kwargs) -> SocketListener:
|
||||
async def open_listener(
|
||||
self,
|
||||
**kwargs,
|
||||
) -> SocketListener:
|
||||
self._sock = socket.socket(
|
||||
socket.AF_UNIX,
|
||||
socket.SOCK_STREAM
|
||||
)
|
||||
log.info(
|
||||
f'Attempting to bind UDS socket\n'
|
||||
f'>[\n'
|
||||
f'|_{self}\n'
|
||||
)
|
||||
await self._sock.bind(self._filepath)
|
||||
self._sock.listen(1)
|
||||
return trio.SocketListener(self._sock)
|
||||
log.info(
|
||||
f'Listening on UDS socket\n'
|
||||
f'[>\n'
|
||||
f' |_{self}\n'
|
||||
)
|
||||
return SocketListener(self._sock)
|
||||
|
||||
async def close_listener(self):
|
||||
def close_listener(self):
|
||||
self._sock.close()
|
||||
os.unlink(self._filepath)
|
||||
|
||||
|
@ -349,50 +423,13 @@ _address_types: bidict[str, Type[Address]] = {
|
|||
}
|
||||
|
||||
|
||||
# TODO, can't we just use a type alias
|
||||
# for this? namely just some `tuple[str, int, str, str]`?
|
||||
#
|
||||
# -[ ] would also just be simpler to keep this as SockAddr[tuple]
|
||||
# or something, implying it's just a simple pair of values which can
|
||||
# presumably be mapped to all transports?
|
||||
# -[ ] `pydoc socket.socket.getsockname()` delivers a 4-tuple for
|
||||
# ipv6 `(hostaddr, port, flowinfo, scope_id)`.. so how should we
|
||||
# handle that?
|
||||
# -[ ] as a further alternative to this wrap()/unwrap() approach we
|
||||
# could just implement `enc/dec_hook()`s for the `Address`-types
|
||||
# and just deal with our internal objs directly and always and
|
||||
# leave it to the codec layer to figure out marshalling?
|
||||
# |_ would mean only one spot to do the `.unwrap()` (which we may
|
||||
# end up needing to call from the hook()s anyway?)
|
||||
# -[x] rename to `UnwrappedAddress[Descriptor]` ??
|
||||
# seems like the right name as per,
|
||||
# https://www.geeksforgeeks.org/introduction-to-address-descriptor/
|
||||
#
|
||||
UnwrappedAddress = Union[
|
||||
tuple[
|
||||
str, # (net/cgroup-)namespace/host-domain
|
||||
int, # (p)id/port
|
||||
] # tcp/udp/uds
|
||||
|
||||
# ?TODO? should we also include another 2 fields from
|
||||
# our `Aid` msg such that we include the runtime `Actor.uid`
|
||||
# of `.name` and `.uuid`?
|
||||
# - would ensure uniqueness across entire net?
|
||||
# - allows for easier runtime-level filtering of "actors by
|
||||
# service name"
|
||||
]
|
||||
|
||||
|
||||
# TODO! really these are discovery sys default addrs ONLY useful for
|
||||
# when none is provided to a root actor on first boot.
|
||||
_default_lo_addrs: dict[
|
||||
str,
|
||||
UnwrappedAddress
|
||||
] = {
|
||||
'tcp': TCPAddress(
|
||||
host='127.0.0.1',
|
||||
port=1616,
|
||||
).unwrap(),
|
||||
'tcp': TCPAddress.get_root().unwrap(),
|
||||
'uds': UDSAddress.get_root().unwrap(),
|
||||
}
|
||||
|
||||
|
@ -435,7 +472,7 @@ def wrap_address(
|
|||
|
||||
case None:
|
||||
cls: Type[Address] = get_address_cls(preferred_transport)
|
||||
addr: AddressType = cls.get_root().unwrap()
|
||||
addr: UnwrappedAddress = cls.get_root().unwrap()
|
||||
|
||||
case _:
|
||||
raise TypeError(
|
||||
|
|
|
@ -187,7 +187,7 @@ class Channel:
|
|||
f' uid={self.uid}\n'
|
||||
f'\n'
|
||||
f' |_msgstream: {tpt_name}\n'
|
||||
f' proto={tpt.laddr.name_key!r}\n'
|
||||
f' proto={tpt.laddr.proto_key!r}\n'
|
||||
f' layer={tpt.layer_key!r}\n'
|
||||
f' laddr={tpt.laddr}\n'
|
||||
f' raddr={tpt.raddr}\n'
|
||||
|
|
|
@ -51,7 +51,7 @@ class MsgpackTCPStream(MsgpackTransport):
|
|||
# choosing the routing prefix part.
|
||||
f'/ipv4/{host}'
|
||||
|
||||
f'/{self.address_type.name_key}/{port}'
|
||||
f'/{self.address_type.proto_key}/{port}'
|
||||
# f'/{self.chan.uid[0]}'
|
||||
# f'/{self.cid}'
|
||||
|
||||
|
|
|
@ -119,7 +119,7 @@ class MsgpackUDSStream(MsgpackTransport):
|
|||
|
||||
filepath: Path = Path(self.raddr.unwrap()[0])
|
||||
return (
|
||||
f'/{self.address_type.name_key}/{filepath}'
|
||||
f'/{self.address_type.proto_key}/{filepath}'
|
||||
# f'/{self.chan.uid[0]}'
|
||||
# f'/{self.cid}'
|
||||
|
||||
|
@ -190,14 +190,6 @@ class MsgpackUDSStream(MsgpackTransport):
|
|||
uid,
|
||||
gid,
|
||||
) = get_peer_info(sock)
|
||||
log.info(
|
||||
f'UDS connection from process {pid!r}\n'
|
||||
f'>[\n'
|
||||
f'|_{sock_path}\n'
|
||||
f' |_pid: {pid}\n'
|
||||
f' |_uid: {uid}\n'
|
||||
f' |_gid: {gid}\n'
|
||||
)
|
||||
laddr = UDSAddress.from_addr((
|
||||
sock_path,
|
||||
os.getpid(),
|
||||
|
|
Loading…
Reference in New Issue