diff --git a/tractor/_addr.py b/tractor/_addr.py index f59ad542..f6bb5e2a 100644 --- a/tractor/_addr.py +++ b/tractor/_addr.py @@ -14,20 +14,31 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from __future__ import annotations +from pathlib import Path import os -import tempfile +# import tempfile from uuid import uuid4 from typing import ( Protocol, ClassVar, TypeVar, Union, - Type + Type, + TYPE_CHECKING, ) +from bidict import bidict import trio from trio import socket +from ._state import ( + get_rt_dir, + current_actor, +) + +if TYPE_CHECKING: + from ._runtime import Actor + NamespaceType = TypeVar('NamespaceType') AddressType = TypeVar('AddressType') @@ -58,12 +69,24 @@ class Address(Protocol[ ... def unwrap(self) -> AddressType: + ''' + Deliver the underying minimum field set in + a primitive python data type-structure. + ''' ... @classmethod def get_random(cls, namespace: NamespaceType | None = None) -> Address: ... + # TODO, this should be something like a `.get_def_registar_addr()` + # or similar since, + # - it should be a **host singleton** (not root/tree singleton) + # - we **only need this value** when one isn't provided to the + # runtime at boot and we want to implicitly provide a host-wide + # registrar. + # - each rooted-actor-tree should likely have its own + # micro-registry (likely the root being it), also see @classmethod def get_root(cls) -> Address: ... @@ -74,8 +97,13 @@ class Address(Protocol[ def __eq__(self, other) -> bool: ... - async def open_stream(self, **kwargs) -> StreamType: - ... + # async def open_stream(self, **kwargs) -> StreamType: + # ''' + # Open a connection *TO* this address and deliver back a + # `trio.SocketStream` wrapping the underlying transport. + + # ''' + # ... async def open_listener(self, **kwargs) -> ListenerType: ... @@ -104,9 +132,12 @@ class TCPAddress(Address[ or not isinstance(port, int) ): - raise TypeError(f'Expected host {host} to be str and port {port} to be int') - self._host = host - self._port = port + raise TypeError( + f'Expected host {host!r} to be str and port {port!r} to be int' + ) + + self._host: str = host + self._port: int = port @property def is_valid(self) -> bool: @@ -117,14 +148,23 @@ class TCPAddress(Address[ return self._host @classmethod - def from_addr(cls, addr: tuple[str, int]) -> TCPAddress: + def from_addr( + cls, + addr: tuple[str, int] + ) -> TCPAddress: return TCPAddress(addr[0], addr[1]) def unwrap(self) -> tuple[str, int]: - return self._host, self._port + return ( + self._host, + self._port, + ) @classmethod - def get_random(cls, namespace: str = '127.0.0.1') -> TCPAddress: + def get_random( + cls, + namespace: str = '127.0.0.1', + ) -> TCPAddress: return TCPAddress(namespace, 0) @classmethod @@ -132,7 +172,9 @@ class TCPAddress(Address[ return TCPAddress('127.0.0.1', 1616) def __repr__(self) -> str: - return f'{type(self)} @ {self.unwrap()}' + return ( + f'{type(self).__name__}[{self.unwrap()}]' + ) def __eq__(self, other) -> bool: if not isinstance(other, TCPAddress): @@ -146,14 +188,14 @@ class TCPAddress(Address[ self._port == other._port ) - async def open_stream(self, **kwargs) -> trio.SocketStream: - stream = await trio.open_tcp_stream( - self._host, - self._port, - **kwargs - ) - self._host, self._port = stream.socket.getsockname()[:2] - return stream + # async def open_stream(self, **kwargs) -> trio.SocketStream: + # stream = await trio.open_tcp_stream( + # self._host, + # self._port, + # **kwargs + # ) + # self._host, self._port = stream.socket.getsockname()[:2] + # return stream async def open_listener(self, **kwargs) -> trio.SocketListener: listeners = await trio.open_tcp_listeners( @@ -177,14 +219,23 @@ class UDSAddress(Address[ trio.SocketListener ]): + # TODO, maybe we should use 'unix' instead? + # -[ ] need to check what other mult-transport frameworks do + # like zmq, nng, uri-spec et al! name_key: str = 'uds' - address_type: type = str + address_type: type = tuple[str, int] def __init__( self, - filepath: str + filepath: str|Path, + maybe_pid: int, + # ^XXX, in the sense you can also pass + # a "non-real-world-process-id" such as is handy to represent + # our host-local default "port-like" key for the very first + # root actor to create a registry address. ): - self._filepath = filepath + self._filepath: Path = Path(filepath).absolute() + self._pid: int = maybe_pid @property def is_valid(self) -> bool: @@ -195,22 +246,65 @@ class UDSAddress(Address[ return @classmethod - def from_addr(cls, filepath: str) -> UDSAddress: - return UDSAddress(filepath) + def from_addr( + cls, + addr: tuple[Path, int] + ) -> UDSAddress: + return UDSAddress( + filepath=addr[0], + maybe_pid=addr[1], + ) - def unwrap(self) -> str: - return self._filepath + def unwrap(self) -> tuple[Path, int]: + return ( + str(self._filepath), + # XXX NOTE, since this gets passed DIRECTLY to + # `trio.open_unix_ + self._pid, + ) @classmethod - def get_random(cls, namespace: None = None) -> UDSAddress: - return UDSAddress(f'{tempfile.gettempdir()}/{uuid4()}.sock') + def get_random( + cls, + namespace: None = None, # unused + ) -> UDSAddress: + + rt_dir: Path = get_rt_dir() + pid: int = os.getpid() + actor: Actor|None = current_actor( + err_on_no_runtime=False, + ) + if actor: + sockname: str = '::'.join(actor.uid) + f'@{pid}' + else: + sockname: str = f'@{pid}' + + sockpath: Path = Path(f'{rt_dir}/{sockname}.sock') + return UDSAddress( + # filename=f'{tempfile.gettempdir()}/{uuid4()}.sock' + filepath=sockpath, + maybe_pid=pid, + ) @classmethod def get_root(cls) -> Address: - return UDSAddress('tractor.sock') + def_uds_filepath: Path = ( + get_rt_dir() + / + 'registry@1616.sock' + ) + return UDSAddress( + filepath=def_uds_filepath, + maybe_pid=1616 + ) def __repr__(self) -> str: - return f'{type(self)} @ {self._filepath}' + return ( + f'{type(self).__name__}' + f'[' + f'({self._filepath}, {self._pid})' + f']' + ) def __eq__(self, other) -> bool: if not isinstance(other, UDSAddress): @@ -220,15 +314,23 @@ class UDSAddress(Address[ return self._filepath == other._filepath - async def open_stream(self, **kwargs) -> trio.SocketStream: - stream = await trio.open_unix_socket( - self._filepath, - **kwargs - ) - return stream + # TODO? remove right, it's never used? + # + # async def open_stream( + # self, + # **kwargs, + # ) -> trio.SocketStream: + # stream: trio.SocketStream = await trio.open_unix_socket( + # self._filepath, + # **kwargs + # ) + # return stream async def open_listener(self, **kwargs) -> trio.SocketListener: - self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self._sock = socket.socket( + socket.AF_UNIX, + socket.SOCK_STREAM + ) await self._sock.bind(self._filepath) self._sock.listen(1) return trio.SocketListener(self._sock) @@ -238,72 +340,120 @@ class UDSAddress(Address[ os.unlink(self._filepath) -preferred_transport = 'uds' +preferred_transport: str = 'uds' -_address_types = ( - TCPAddress, - UDSAddress -) - - -_default_addrs: dict[str, Type[Address]] = { - cls.name_key: cls - for cls in _address_types +_address_types: bidict[str, Type[Address]] = { + 'tcp': TCPAddress, + 'uds': UDSAddress } -AddressTypes = Union[ - tuple([ - cls.address_type - for cls in _address_types - ]) +# TODO, can't we just use a type alias +# for this? namely just some `tuple[str, int, str, str]`? +# +# -[ ] would also just be simpler to keep this as SockAddr[tuple] +# or something, implying it's just a simple pair of values which can +# presumably be mapped to all transports? +# -[ ] `pydoc socket.socket.getsockname()` delivers a 4-tuple for +# ipv6 `(hostaddr, port, flowinfo, scope_id)`.. so how should we +# handle that? +# -[ ] as a further alternative to this wrap()/unwrap() approach we +# could just implement `enc/dec_hook()`s for the `Address`-types +# and just deal with our internal objs directly and always and +# leave it to the codec layer to figure out marshalling? +# |_ would mean only one spot to do the `.unwrap()` (which we may +# end up needing to call from the hook()s anyway?) +# -[x] rename to `UnwrappedAddress[Descriptor]` ?? +# seems like the right name as per, +# https://www.geeksforgeeks.org/introduction-to-address-descriptor/ +# +UnwrappedAddress = Union[ + tuple[ + str, # (net/cgroup-)namespace/host-domain + int, # (p)id/port + ] # tcp/udp/uds + + # ?TODO? should we also include another 2 fields from + # our `Aid` msg such that we include the runtime `Actor.uid` + # of `.name` and `.uuid`? + # - would ensure uniqueness across entire net? + # - allows for easier runtime-level filtering of "actors by + # service name" ] +# TODO! really these are discovery sys default addrs ONLY useful for +# when none is provided to a root actor on first boot. _default_lo_addrs: dict[ str, - AddressTypes + UnwrappedAddress ] = { - cls.name_key: cls.get_root().unwrap() - for cls in _address_types + 'tcp': TCPAddress( + host='127.0.0.1', + port=1616, + ).unwrap(), + 'uds': UDSAddress.get_root().unwrap(), } def get_address_cls(name: str) -> Type[Address]: - return _default_addrs[name] + return _address_types[name] def is_wrapped_addr(addr: any) -> bool: - return type(addr) in _address_types + return type(addr) in _address_types.values() -def wrap_address(addr: AddressTypes) -> Address: +def mk_uuid() -> str: + ''' + Encapsulate creation of a uuid4 as `str` as used + for creating `Actor.uid: tuple[str, str]` and/or + `.msg.types.Aid`. + + ''' + return str(uuid4()) + + +def wrap_address( + addr: UnwrappedAddress +) -> Address: if is_wrapped_addr(addr): return addr - cls = None + cls: Type|None = None match addr: - case str(): + case ( + str()|Path(), + int(), + ): cls = UDSAddress case tuple() | list(): cls = TCPAddress case None: - cls = get_address_cls(preferred_transport) - addr = cls.get_root().unwrap() + cls: Type[Address] = get_address_cls(preferred_transport) + addr: AddressType = cls.get_root().unwrap() case _: raise TypeError( - f'Can not wrap addr {addr} of type {type(addr)}' + f'Can not wrap address {type(addr)}\n' + f'{addr!r}\n' ) return cls.from_addr(addr) -def default_lo_addrs(transports: list[str]) -> list[AddressTypes]: +def default_lo_addrs( + transports: list[str], +) -> list[Type[Address]]: + ''' + Return the default, host-singleton, registry address + for an input transport key set. + + ''' return [ _default_lo_addrs[transport] for transport in transports diff --git a/tractor/_state.py b/tractor/_state.py index 79c8bdea..4cb7e784 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -14,14 +14,16 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" -Per process state +''' +Per actor-process runtime state mgmt APIs. -""" +''' from __future__ import annotations from contextvars import ( ContextVar, ) +import os +from pathlib import Path from typing import ( Any, TYPE_CHECKING, @@ -143,3 +145,22 @@ def current_ipc_ctx( f'|_{current_task()}\n' ) return ctx + + +# std ODE (mutable) app state location +_rtdir: Path = Path(os.environ['XDG_RUNTIME_DIR']) + + +def get_rt_dir( + subdir: str = 'tractor' +) -> Path: + ''' + Return the user "runtime dir" where most userspace apps stick + their IPC and cache related system util-files; we take hold + of a `'XDG_RUNTIME_DIR'/tractor/` subdir by default. + + ''' + rtdir: Path = _rtdir / subdir + if not rtdir.is_dir(): + rtdir.mkdir() + return rtdir