Rework/simplify transport addressing
A few things that can fundamentally change,
- UDS addresses now always encapsulate the local and remote pid such
  that it denotes each side's process much like a TCP *port*.
  |_ `.__init__()` takes a new `maybe_pid: int`.
  |_ this required changes to the `.ipc._uds` backend which will come in
     an subsequent commit!
  |_ `UDSAddress.address_type` becomes a `tuple[str, int]` just like the
      TCP case.
  |_ adjust `wrap_address()` to match.
- use a new `_state.get_rt_dir() -> Path` as the default location for
  UDS socket file: now under `XDG_RUNTIME_DIR'/tractor/` subdir by
  default.
- re-implement `USDAddress.get_random()` to use both the local
  `Actor.uid` (if available) and at least the pid for its socket file
  name.
Removals,
- drop the loop generated `_default_addrs`, simplify to just
  `_default_lo_addrs` for per-transport default registry addresses.
  |_ change to `_address_types: dict[str, Type[Address]]` instead of
     separate types `list`.
  |_ adjust `is_wrapped_addr()` to just check `in _addr_types.values()`.
- comment out `Address.open_stream()` it's unused and i think the wrong
  place for this API.
Renames,
- from `AddressTypes` -> `UnwrappedAddress`, since it's a simple type
  union and all this type set is, is the simple python data-structures
  we encode to for the wire.
  |_ see note about possibly implementing the `.[un]wrap()` stuff as
     `msgspec` codec `enc/dec_hook()`s instead!
Additions,
- add a `mk_uuid()` to be used throughout the runtime including for
  generating the `Aid.uuid` part.
- tons of notes around follow up refinements!
			
			
				pull/17/head
			
			
		
							parent
							
								
									1762b3eb64
								
							
						
					
					
						commit
						4a8a555bdf
					
				
							
								
								
									
										278
									
								
								tractor/_addr.py
								
								
								
								
							
							
						
						
									
										278
									
								
								tractor/_addr.py
								
								
								
								
							|  | @ -14,20 +14,31 @@ | |||
| # You should have received a copy of the GNU Affero General Public License | ||||
| # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||
| from __future__ import annotations | ||||
| from pathlib import Path | ||||
| import os | ||||
| import tempfile | ||||
| # import tempfile | ||||
| from uuid import uuid4 | ||||
| from typing import ( | ||||
|     Protocol, | ||||
|     ClassVar, | ||||
|     TypeVar, | ||||
|     Union, | ||||
|     Type | ||||
|     Type, | ||||
|     TYPE_CHECKING, | ||||
| ) | ||||
| 
 | ||||
| from bidict import bidict | ||||
| import trio | ||||
| from trio import socket | ||||
| 
 | ||||
| from ._state import ( | ||||
|     get_rt_dir, | ||||
|     current_actor, | ||||
| ) | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from ._runtime import Actor | ||||
| 
 | ||||
| 
 | ||||
| NamespaceType = TypeVar('NamespaceType') | ||||
| AddressType = TypeVar('AddressType') | ||||
|  | @ -58,12 +69,24 @@ class Address(Protocol[ | |||
|         ... | ||||
| 
 | ||||
|     def unwrap(self) -> AddressType: | ||||
|         ''' | ||||
|         Deliver the underying minimum field set in | ||||
|         a primitive python data type-structure. | ||||
|         ''' | ||||
|         ... | ||||
| 
 | ||||
|     @classmethod | ||||
|     def get_random(cls, namespace: NamespaceType | None = None) -> Address: | ||||
|         ... | ||||
| 
 | ||||
|     # TODO, this should be something like a `.get_def_registar_addr()` | ||||
|     # or similar since, | ||||
|     # - it should be a **host singleton** (not root/tree singleton) | ||||
|     # - we **only need this value** when one isn't provided to the | ||||
|     #   runtime at boot and we want to implicitly provide a host-wide | ||||
|     #   registrar. | ||||
|     # - each rooted-actor-tree should likely have its own | ||||
|     #   micro-registry (likely the root being it), also see | ||||
|     @classmethod | ||||
|     def get_root(cls) -> Address: | ||||
|         ... | ||||
|  | @ -74,8 +97,13 @@ class Address(Protocol[ | |||
|     def __eq__(self, other) -> bool: | ||||
|         ... | ||||
| 
 | ||||
|     async def open_stream(self, **kwargs) -> StreamType: | ||||
|         ... | ||||
|     # async def open_stream(self, **kwargs) -> StreamType: | ||||
|     #     ''' | ||||
|     #     Open a connection *TO* this address and deliver back a | ||||
|     #     `trio.SocketStream` wrapping the underlying transport. | ||||
| 
 | ||||
|     #     ''' | ||||
|     #     ... | ||||
| 
 | ||||
|     async def open_listener(self, **kwargs) -> ListenerType: | ||||
|         ... | ||||
|  | @ -104,9 +132,12 @@ class TCPAddress(Address[ | |||
|             or | ||||
|             not isinstance(port, int) | ||||
|         ): | ||||
|             raise TypeError(f'Expected host {host} to be str and port {port} to be int') | ||||
|         self._host = host | ||||
|         self._port = port | ||||
|             raise TypeError( | ||||
|                 f'Expected host {host!r} to be str and port {port!r} to be int' | ||||
|             ) | ||||
| 
 | ||||
|         self._host: str = host | ||||
|         self._port: int = port | ||||
| 
 | ||||
|     @property | ||||
|     def is_valid(self) -> bool: | ||||
|  | @ -117,14 +148,23 @@ class TCPAddress(Address[ | |||
|         return self._host | ||||
| 
 | ||||
|     @classmethod | ||||
|     def from_addr(cls, addr: tuple[str, int]) -> TCPAddress: | ||||
|     def from_addr( | ||||
|         cls, | ||||
|         addr: tuple[str, int] | ||||
|     ) -> TCPAddress: | ||||
|         return TCPAddress(addr[0], addr[1]) | ||||
| 
 | ||||
|     def unwrap(self) -> tuple[str, int]: | ||||
|         return self._host, self._port | ||||
|         return ( | ||||
|             self._host, | ||||
|             self._port, | ||||
|         ) | ||||
| 
 | ||||
|     @classmethod | ||||
|     def get_random(cls, namespace: str = '127.0.0.1') -> TCPAddress: | ||||
|     def get_random( | ||||
|         cls, | ||||
|         namespace: str = '127.0.0.1', | ||||
|     ) -> TCPAddress: | ||||
|         return TCPAddress(namespace, 0) | ||||
| 
 | ||||
|     @classmethod | ||||
|  | @ -132,7 +172,9 @@ class TCPAddress(Address[ | |||
|         return TCPAddress('127.0.0.1', 1616) | ||||
| 
 | ||||
|     def __repr__(self) -> str: | ||||
|         return f'{type(self)} @ {self.unwrap()}' | ||||
|         return ( | ||||
|             f'{type(self).__name__}[{self.unwrap()}]' | ||||
|         ) | ||||
| 
 | ||||
|     def __eq__(self, other) -> bool: | ||||
|         if not isinstance(other, TCPAddress): | ||||
|  | @ -146,14 +188,14 @@ class TCPAddress(Address[ | |||
|             self._port == other._port | ||||
|         ) | ||||
| 
 | ||||
|     async def open_stream(self, **kwargs) -> trio.SocketStream: | ||||
|         stream = await trio.open_tcp_stream( | ||||
|             self._host, | ||||
|             self._port, | ||||
|             **kwargs | ||||
|         ) | ||||
|         self._host, self._port = stream.socket.getsockname()[:2] | ||||
|         return stream | ||||
|     # async def open_stream(self, **kwargs) -> trio.SocketStream: | ||||
|     #     stream = await trio.open_tcp_stream( | ||||
|     #         self._host, | ||||
|     #         self._port, | ||||
|     #         **kwargs | ||||
|     #     ) | ||||
|     #     self._host, self._port = stream.socket.getsockname()[:2] | ||||
|     #     return stream | ||||
| 
 | ||||
|     async def open_listener(self, **kwargs) -> trio.SocketListener: | ||||
|         listeners = await trio.open_tcp_listeners( | ||||
|  | @ -177,14 +219,23 @@ class UDSAddress(Address[ | |||
|     trio.SocketListener | ||||
| ]): | ||||
| 
 | ||||
|     # TODO, maybe we should use 'unix' instead? | ||||
|     # -[ ] need to check what other mult-transport frameworks do | ||||
|     #     like zmq, nng, uri-spec et al! | ||||
|     name_key: str = 'uds' | ||||
|     address_type: type = str | ||||
|     address_type: type = tuple[str, int] | ||||
| 
 | ||||
|     def __init__( | ||||
|         self, | ||||
|         filepath: str | ||||
|         filepath: str|Path, | ||||
|         maybe_pid: int, | ||||
|         # ^XXX, in the sense you can also pass | ||||
|         # a "non-real-world-process-id" such as is handy to represent | ||||
|         # our host-local default "port-like" key for the very first | ||||
|         # root actor to create a registry address. | ||||
|     ): | ||||
|         self._filepath = filepath | ||||
|         self._filepath: Path = Path(filepath).absolute() | ||||
|         self._pid: int = maybe_pid | ||||
| 
 | ||||
|     @property | ||||
|     def is_valid(self) -> bool: | ||||
|  | @ -195,22 +246,65 @@ class UDSAddress(Address[ | |||
|         return | ||||
| 
 | ||||
|     @classmethod | ||||
|     def from_addr(cls, filepath: str) -> UDSAddress: | ||||
|         return UDSAddress(filepath) | ||||
|     def from_addr( | ||||
|         cls, | ||||
|         addr: tuple[Path, int] | ||||
|     ) -> UDSAddress: | ||||
|         return UDSAddress( | ||||
|             filepath=addr[0], | ||||
|             maybe_pid=addr[1], | ||||
|         ) | ||||
| 
 | ||||
|     def unwrap(self) -> str: | ||||
|         return self._filepath | ||||
|     def unwrap(self) -> tuple[Path, int]: | ||||
|         return ( | ||||
|             str(self._filepath), | ||||
|             # XXX NOTE, since this gets passed DIRECTLY to | ||||
|             # `trio.open_unix_ | ||||
|             self._pid, | ||||
|         ) | ||||
| 
 | ||||
|     @classmethod | ||||
|     def get_random(cls, namespace: None = None) -> UDSAddress: | ||||
|         return UDSAddress(f'{tempfile.gettempdir()}/{uuid4()}.sock') | ||||
|     def get_random( | ||||
|         cls, | ||||
|         namespace: None = None,  # unused | ||||
|     ) -> UDSAddress: | ||||
| 
 | ||||
|         rt_dir: Path = get_rt_dir() | ||||
|         pid: int = os.getpid() | ||||
|         actor: Actor|None = current_actor( | ||||
|             err_on_no_runtime=False, | ||||
|         ) | ||||
|         if actor: | ||||
|             sockname: str = '::'.join(actor.uid) + f'@{pid}' | ||||
|         else: | ||||
|             sockname: str = f'@{pid}' | ||||
| 
 | ||||
|         sockpath: Path = Path(f'{rt_dir}/{sockname}.sock') | ||||
|         return UDSAddress( | ||||
|             # filename=f'{tempfile.gettempdir()}/{uuid4()}.sock' | ||||
|             filepath=sockpath, | ||||
|             maybe_pid=pid, | ||||
|         ) | ||||
| 
 | ||||
|     @classmethod | ||||
|     def get_root(cls) -> Address: | ||||
|         return UDSAddress('tractor.sock') | ||||
|         def_uds_filepath: Path = ( | ||||
|             get_rt_dir() | ||||
|             / | ||||
|             'registry@1616.sock' | ||||
|         ) | ||||
|         return UDSAddress( | ||||
|             filepath=def_uds_filepath, | ||||
|             maybe_pid=1616 | ||||
|         ) | ||||
| 
 | ||||
|     def __repr__(self) -> str: | ||||
|         return f'{type(self)} @ {self._filepath}' | ||||
|         return ( | ||||
|             f'{type(self).__name__}' | ||||
|             f'[' | ||||
|             f'({self._filepath}, {self._pid})' | ||||
|             f']' | ||||
|         ) | ||||
| 
 | ||||
|     def __eq__(self, other) -> bool: | ||||
|         if not isinstance(other, UDSAddress): | ||||
|  | @ -220,15 +314,23 @@ class UDSAddress(Address[ | |||
| 
 | ||||
|         return self._filepath == other._filepath | ||||
| 
 | ||||
|     async def open_stream(self, **kwargs) -> trio.SocketStream: | ||||
|         stream = await trio.open_unix_socket( | ||||
|             self._filepath, | ||||
|             **kwargs | ||||
|         ) | ||||
|         return stream | ||||
|     # TODO? remove right, it's never used? | ||||
|     # | ||||
|     # async def open_stream( | ||||
|     #     self, | ||||
|     #     **kwargs, | ||||
|     # ) -> trio.SocketStream: | ||||
|     #     stream: trio.SocketStream = await trio.open_unix_socket( | ||||
|     #         self._filepath, | ||||
|     #         **kwargs | ||||
|     #     ) | ||||
|     #     return stream | ||||
| 
 | ||||
|     async def open_listener(self, **kwargs) -> trio.SocketListener: | ||||
|         self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | ||||
|         self._sock = socket.socket( | ||||
|             socket.AF_UNIX, | ||||
|             socket.SOCK_STREAM | ||||
|         ) | ||||
|         await self._sock.bind(self._filepath) | ||||
|         self._sock.listen(1) | ||||
|         return trio.SocketListener(self._sock) | ||||
|  | @ -238,72 +340,120 @@ class UDSAddress(Address[ | |||
|         os.unlink(self._filepath) | ||||
| 
 | ||||
| 
 | ||||
| preferred_transport = 'uds' | ||||
| preferred_transport: str = 'uds' | ||||
| 
 | ||||
| 
 | ||||
| _address_types = ( | ||||
|     TCPAddress, | ||||
|     UDSAddress | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| _default_addrs: dict[str, Type[Address]] = { | ||||
|     cls.name_key: cls | ||||
|     for cls in _address_types | ||||
| _address_types: bidict[str, Type[Address]] = { | ||||
|     'tcp': TCPAddress, | ||||
|     'uds': UDSAddress | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| AddressTypes = Union[ | ||||
|     tuple([ | ||||
|         cls.address_type | ||||
|         for cls in _address_types | ||||
|     ]) | ||||
| # TODO, can't we just use a type alias | ||||
| # for this? namely just some `tuple[str, int, str, str]`? | ||||
| # | ||||
| # -[ ] would also just be simpler to keep this as SockAddr[tuple] | ||||
| #     or something, implying it's just a simple pair of values which can | ||||
| #     presumably be mapped to all transports? | ||||
| # -[ ] `pydoc socket.socket.getsockname()` delivers a 4-tuple for | ||||
| #     ipv6 `(hostaddr, port, flowinfo, scope_id)`.. so how should we | ||||
| #     handle that? | ||||
| # -[ ] as a further alternative to this wrap()/unwrap() approach we | ||||
| #     could just implement `enc/dec_hook()`s for the `Address`-types | ||||
| #     and just deal with our internal objs directly and always and | ||||
| #     leave it to the codec layer to figure out marshalling? | ||||
| #    |_ would mean only one spot to do the `.unwrap()` (which we may | ||||
| #       end up needing to call from the hook()s anyway?) | ||||
| # -[x] rename to `UnwrappedAddress[Descriptor]` ?? | ||||
| #    seems like the right name as per, | ||||
| #    https://www.geeksforgeeks.org/introduction-to-address-descriptor/ | ||||
| # | ||||
| UnwrappedAddress = Union[ | ||||
|     tuple[ | ||||
|         str,  # (net/cgroup-)namespace/host-domain | ||||
|         int,  # (p)id/port | ||||
|     ]  # tcp/udp/uds | ||||
| 
 | ||||
|     # ?TODO? should we also include another 2 fields from | ||||
|     # our `Aid` msg such that we include the runtime `Actor.uid` | ||||
|     # of `.name` and `.uuid`? | ||||
|     # - would ensure uniqueness across entire net? | ||||
|     # - allows for easier runtime-level filtering of "actors by | ||||
|     #   service name" | ||||
| ] | ||||
| 
 | ||||
| 
 | ||||
| # TODO! really these are discovery sys default addrs ONLY useful for | ||||
| # when none is provided to a root actor on first boot. | ||||
| _default_lo_addrs: dict[ | ||||
|     str, | ||||
|     AddressTypes | ||||
|     UnwrappedAddress | ||||
| ] = { | ||||
|     cls.name_key: cls.get_root().unwrap() | ||||
|     for cls in _address_types | ||||
|     'tcp': TCPAddress( | ||||
|         host='127.0.0.1', | ||||
|         port=1616, | ||||
|     ).unwrap(), | ||||
|     'uds': UDSAddress.get_root().unwrap(), | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| def get_address_cls(name: str) -> Type[Address]: | ||||
|     return _default_addrs[name] | ||||
|     return _address_types[name] | ||||
| 
 | ||||
| 
 | ||||
| def is_wrapped_addr(addr: any) -> bool: | ||||
|     return type(addr) in _address_types | ||||
|     return type(addr) in _address_types.values() | ||||
| 
 | ||||
| 
 | ||||
| def wrap_address(addr: AddressTypes) -> Address: | ||||
| def mk_uuid() -> str: | ||||
|     ''' | ||||
|     Encapsulate creation of a uuid4 as `str` as used | ||||
|     for creating `Actor.uid: tuple[str, str]` and/or | ||||
|     `.msg.types.Aid`. | ||||
| 
 | ||||
|     ''' | ||||
|     return str(uuid4()) | ||||
| 
 | ||||
| 
 | ||||
| def wrap_address( | ||||
|     addr: UnwrappedAddress | ||||
| ) -> Address: | ||||
| 
 | ||||
|     if is_wrapped_addr(addr): | ||||
|         return addr | ||||
| 
 | ||||
|     cls = None | ||||
|     cls: Type|None = None | ||||
|     match addr: | ||||
|         case str(): | ||||
|         case ( | ||||
|             str()|Path(), | ||||
|             int(), | ||||
|         ): | ||||
|             cls = UDSAddress | ||||
| 
 | ||||
|         case tuple() | list(): | ||||
|             cls = TCPAddress | ||||
| 
 | ||||
|         case None: | ||||
|             cls = get_address_cls(preferred_transport) | ||||
|             addr = cls.get_root().unwrap() | ||||
|             cls: Type[Address] = get_address_cls(preferred_transport) | ||||
|             addr: AddressType = cls.get_root().unwrap() | ||||
| 
 | ||||
|         case _: | ||||
|             raise TypeError( | ||||
|                 f'Can not wrap addr {addr} of type {type(addr)}' | ||||
|                 f'Can not wrap address {type(addr)}\n' | ||||
|                 f'{addr!r}\n' | ||||
|             ) | ||||
| 
 | ||||
|     return cls.from_addr(addr) | ||||
| 
 | ||||
| 
 | ||||
| def default_lo_addrs(transports: list[str]) -> list[AddressTypes]: | ||||
| def default_lo_addrs( | ||||
|     transports: list[str], | ||||
| ) -> list[Type[Address]]: | ||||
|     ''' | ||||
|     Return the default, host-singleton, registry address | ||||
|     for an input transport key set. | ||||
| 
 | ||||
|     ''' | ||||
|     return [ | ||||
|         _default_lo_addrs[transport] | ||||
|         for transport in transports | ||||
|  |  | |||
|  | @ -14,14 +14,16 @@ | |||
| # You should have received a copy of the GNU Affero General Public License | ||||
| # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||
| 
 | ||||
| """ | ||||
| Per process state | ||||
| ''' | ||||
| Per actor-process runtime state mgmt APIs. | ||||
| 
 | ||||
| """ | ||||
| ''' | ||||
| from __future__ import annotations | ||||
| from contextvars import ( | ||||
|     ContextVar, | ||||
| ) | ||||
| import os | ||||
| from pathlib import Path | ||||
| from typing import ( | ||||
|     Any, | ||||
|     TYPE_CHECKING, | ||||
|  | @ -143,3 +145,22 @@ def current_ipc_ctx( | |||
|             f'|_{current_task()}\n' | ||||
|         ) | ||||
|     return ctx | ||||
| 
 | ||||
| 
 | ||||
| # std ODE (mutable) app state location | ||||
| _rtdir: Path = Path(os.environ['XDG_RUNTIME_DIR']) | ||||
| 
 | ||||
| 
 | ||||
| def get_rt_dir( | ||||
|     subdir: str = 'tractor' | ||||
| ) -> Path: | ||||
|     ''' | ||||
|     Return the user "runtime dir" where most userspace apps stick | ||||
|     their IPC and cache related system util-files; we take hold | ||||
|     of a `'XDG_RUNTIME_DIR'/tractor/` subdir by default. | ||||
| 
 | ||||
|     ''' | ||||
|     rtdir: Path = _rtdir / subdir | ||||
|     if not rtdir.is_dir(): | ||||
|         rtdir.mkdir() | ||||
|     return rtdir | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue