Compare commits

..

No commits in common. "2d6b3922a6d614e4611ef3af9d2dac9f8b017ed9" and "7d537e60cce8598540841419709fb691230487c3" have entirely different histories.

7 changed files with 60 additions and 96 deletions

View File

@ -40,7 +40,6 @@ from ._state import (
get_rt_dir, get_rt_dir,
current_actor, current_actor,
is_root_process, is_root_process,
_def_tpt_proto,
) )
if TYPE_CHECKING: if TYPE_CHECKING:
@ -313,18 +312,10 @@ class UDSAddress(Address):
# root actor to create a registry address. # root actor to create a registry address.
maybe_pid: int|None = None, maybe_pid: int|None = None,
): ):
fdir = self._filedir = Path( fdir = self._filedir = Path(filedir or self.def_bindspace).absolute()
filedir fpath = self._filepath = Path(filename)
or
self.def_bindspace
).absolute()
fpath = self._filename = Path(filename)
fp: Path = fdir / fpath fp: Path = fdir / fpath
assert ( assert fp.is_absolute()
fp.is_absolute()
and
fp == self.sockpath
)
# to track which "side" is the peer process by reading socket # to track which "side" is the peer process by reading socket
# credentials-info. # credentials-info.
@ -332,7 +323,7 @@ class UDSAddress(Address):
@property @property
def sockpath(self) -> Path: def sockpath(self) -> Path:
return self._filedir / self._filename return self._filedir / self._filepath
@property @property
def is_valid(self) -> bool: def is_valid(self) -> bool:
@ -355,20 +346,19 @@ class UDSAddress(Address):
def from_addr( def from_addr(
cls, cls,
addr: ( addr: (
tuple[Path|str, Path|str]|Path|str tuple[Path|str|None, int]
|Path|str
), ),
) -> UDSAddress: ) -> UDSAddress:
match addr: match addr:
case tuple()|list(): case tuple()|list():
filedir = Path(addr[0]) sockpath: Path = Path(addr[0])
filename = Path(addr[1]) filedir, filename = unwrap_sockpath(sockpath)
# sockpath: Path = Path(addr[0]) pid: int = addr[1]
# filedir, filename = unwrap_sockpath(sockpath)
# pid: int = addr[1]
return UDSAddress( return UDSAddress(
filedir=filedir, filedir=filedir,
filename=filename, filename=filename,
# maybe_pid=pid, maybe_pid=pid,
) )
# NOTE, in case we ever decide to just `.unwrap()` # NOTE, in case we ever decide to just `.unwrap()`
# to a `Path|str`? # to a `Path|str`?
@ -386,8 +376,8 @@ class UDSAddress(Address):
# XXX NOTE, since this gets passed DIRECTLY to # XXX NOTE, since this gets passed DIRECTLY to
# `.ipc._uds.open_unix_socket_w_passcred()` # `.ipc._uds.open_unix_socket_w_passcred()`
return ( return (
str(self._filedir), str(self.sockpath),
str(self._filename), self._pid,
) )
@classmethod @classmethod
@ -418,18 +408,18 @@ class UDSAddress(Address):
@classmethod @classmethod
def get_root(cls) -> Address: def get_root(cls) -> Address:
def_uds_filename: Path = 'registry@1616.sock' def_uds_filepath: Path = 'registry@1616.sock'
return UDSAddress( return UDSAddress(
filedir=None, filedir=None,
filename=def_uds_filename, filename=def_uds_filepath,
# maybe_pid=1616, maybe_pid=1616,
) )
def __repr__(self) -> str: def __repr__(self) -> str:
return ( return (
f'{type(self).__name__}' f'{type(self).__name__}'
f'[' f'['
f'({self._filedir}, {self._filename})' f'({self.sockpath}, {self._pid})'
f']' f']'
) )
@ -439,7 +429,7 @@ class UDSAddress(Address):
f'Can not compare {type(other)} with {type(self)}' f'Can not compare {type(other)} with {type(self)}'
) )
return self.sockpath == other.sockpath return self._filepath == other._filepath
# async def open_listener(self, **kwargs) -> SocketListener: # async def open_listener(self, **kwargs) -> SocketListener:
async def open_listener( async def open_listener(
@ -471,6 +461,9 @@ class UDSAddress(Address):
os.unlink(self.sockpath) os.unlink(self.sockpath)
preferred_transport: str = 'uds'
_address_types: bidict[str, Type[Address]] = { _address_types: bidict[str, Type[Address]] = {
'tcp': TCPAddress, 'tcp': TCPAddress,
'uds': UDSAddress 'uds': UDSAddress
@ -529,6 +522,14 @@ def wrap_address(
# if 'sock' in addr[0]: # if 'sock' in addr[0]:
# import pdbp; pdbp.set_trace() # import pdbp; pdbp.set_trace()
match addr: match addr:
# TODO! BUT THIS WILL MATCH FOR TCP !...
# -[ ] so prolly go back to what guille had orig XD
# a plain ol' `str`?
case ((
str()|Path(),
int(),
)):
cls = UDSAddress
# classic network socket-address as tuple/list # classic network socket-address as tuple/list
case ( case (
@ -538,32 +539,21 @@ def wrap_address(
): ):
cls = TCPAddress cls = TCPAddress
case (
# (str()|Path(), str()|Path()),
# ^TODO? uhh why doesn't this work!?
(_, filename)
) if type(filename) is str:
cls = UDSAddress
# likely an unset UDS or TCP reg address as defaulted in # likely an unset UDS or TCP reg address as defaulted in
# `_state._runtime_vars['_root_mailbox']` # `_state._runtime_vars['_root_mailbox']`
#
# TODO? figure out when/if we even need this?
case ( case (
None None
| |
[None, None] [None, None]
): ):
cls: Type[Address] = get_address_cls(_def_tpt_proto) cls: Type[Address] = get_address_cls(preferred_transport)
addr: UnwrappedAddress = cls.get_root().unwrap() addr: UnwrappedAddress = cls.get_root().unwrap()
case _: case _:
# import pdbp; pdbp.set_trace() # import pdbp; pdbp.set_trace()
raise TypeError( raise TypeError(
f'Can not wrap unwrapped-address ??\n' f'Can not wrap address {type(addr)}\n'
f'type(addr): {type(addr)!r}\n' f'{addr!r}\n'
f'addr: {addr!r}\n'
) )
return cls.from_addr(addr) return cls.from_addr(addr)

View File

@ -33,6 +33,7 @@ from .ipc import _connect_chan, Channel
from ._addr import ( from ._addr import (
UnwrappedAddress, UnwrappedAddress,
Address, Address,
preferred_transport,
wrap_address wrap_address
) )
from ._portal import ( from ._portal import (
@ -43,7 +44,6 @@ from ._portal import (
from ._state import ( from ._state import (
current_actor, current_actor,
_runtime_vars, _runtime_vars,
_def_tpt_proto,
) )
if TYPE_CHECKING: if TYPE_CHECKING:
@ -203,7 +203,7 @@ async def maybe_open_portal(
async def find_actor( async def find_actor(
name: str, name: str,
registry_addrs: list[UnwrappedAddress]|None = None, registry_addrs: list[UnwrappedAddress]|None = None,
enable_transports: list[str] = [_def_tpt_proto], enable_transports: list[str] = [preferred_transport],
only_first: bool = True, only_first: bool = True,
raise_on_none: bool = False, raise_on_none: bool = False,

View File

@ -56,6 +56,7 @@ from ._addr import (
UnwrappedAddress, UnwrappedAddress,
default_lo_addrs, default_lo_addrs,
mk_uuid, mk_uuid,
preferred_transport,
wrap_address, wrap_address,
) )
from ._exceptions import ( from ._exceptions import (
@ -138,7 +139,6 @@ async def maybe_block_bp(
os.environ.pop('PYTHONBREAKPOINT', None) os.environ.pop('PYTHONBREAKPOINT', None)
@acm @acm
async def open_root_actor( async def open_root_actor(
*, *,
@ -148,9 +148,7 @@ async def open_root_actor(
# defaults are above # defaults are above
arbiter_addr: tuple[UnwrappedAddress]|None = None, arbiter_addr: tuple[UnwrappedAddress]|None = None,
enable_transports: list[ enable_transports: list[str] = [preferred_transport],
_state.TransportProtocolKey,
] = [_state._def_tpt_proto],
name: str|None = 'root', name: str|None = 'root',

View File

@ -80,6 +80,7 @@ from ._addr import (
Address, Address,
default_lo_addrs, default_lo_addrs,
get_address_cls, get_address_cls,
preferred_transport,
wrap_address, wrap_address,
) )
from ._context import ( from ._context import (
@ -1321,9 +1322,7 @@ class Actor:
''' '''
if listen_addrs is None: if listen_addrs is None:
listen_addrs = default_lo_addrs([ listen_addrs = default_lo_addrs([preferred_transport])
_state._def_tpt_proto
])
else: else:
listen_addrs: list[Address] = [ listen_addrs: list[Address] = [
@ -1847,7 +1846,7 @@ async def async_main(
enable_transports: list[str] = ( enable_transports: list[str] = (
maybe_preferred_transports_says_rent maybe_preferred_transports_says_rent
or or
[_state._def_tpt_proto] [preferred_transport]
) )
for transport_key in enable_transports: for transport_key in enable_transports:
transport_cls: Type[Address] = get_address_cls( transport_cls: Type[Address] = get_address_cls(
@ -2135,15 +2134,15 @@ async def async_main(
log.info(teardown_report) log.info(teardown_report)
# TODO: rename to `Registry` and move to `.discovery._registry`! # TODO: rename to `Registry` and move to `._discovery`!
class Arbiter(Actor): class Arbiter(Actor):
''' '''
A special registrar (and for now..) `Actor` who can contact all A special registrar actor who can contact all other actors
other actors within its immediate process tree and possibly keeps within its immediate process tree and possibly keeps a registry
a registry of others meant to be discoverable in a distributed of others meant to be discoverable in a distributed
application. Normally the registrar is also the "root actor" and application. Normally the registrar is also the "root actor"
thus always has access to the top-most-level actor (process) and thus always has access to the top-most-level actor
nursery. (process) nursery.
By default, the registrar is always initialized when and if no By default, the registrar is always initialized when and if no
other registrar socket addrs have been specified to runtime other registrar socket addrs have been specified to runtime
@ -2163,12 +2162,6 @@ class Arbiter(Actor):
''' '''
is_arbiter = True is_arbiter = True
# TODO, implement this as a read on there existing a `._state` of
# some sort setup by whenever we impl this all as
# a `.discovery._registry.open_registry()` API
def is_registry(self) -> bool:
return self.is_arbiter
def __init__( def __init__(
self, self,
*args, *args,

View File

@ -26,7 +26,6 @@ import os
from pathlib import Path from pathlib import Path
from typing import ( from typing import (
Any, Any,
Literal,
TYPE_CHECKING, TYPE_CHECKING,
) )
@ -165,11 +164,3 @@ def get_rt_dir(
if not rtdir.is_dir(): if not rtdir.is_dir():
rtdir.mkdir() rtdir.mkdir()
return rtdir return rtdir
# default IPC transport protocol settings
TransportProtocolKey = Literal[
'tcp',
'uds',
]
_def_tpt_proto: TransportProtocolKey = 'tcp'

View File

@ -34,6 +34,7 @@ import trio
from .devx._debug import maybe_wait_for_debugger from .devx._debug import maybe_wait_for_debugger
from ._addr import ( from ._addr import (
UnwrappedAddress, UnwrappedAddress,
preferred_transport,
mk_uuid, mk_uuid,
) )
from ._state import current_actor, is_main_process from ._state import current_actor, is_main_process
@ -44,9 +45,7 @@ from ._exceptions import (
is_multi_cancelled, is_multi_cancelled,
ContextCancelled, ContextCancelled,
) )
from ._root import ( from ._root import open_root_actor
open_root_actor,
)
from . import _state from . import _state
from . import _spawn from . import _spawn
@ -139,7 +138,7 @@ class ActorNursery:
bind_addrs: list[UnwrappedAddress]|None = None, bind_addrs: list[UnwrappedAddress]|None = None,
rpc_module_paths: list[str]|None = None, rpc_module_paths: list[str]|None = None,
enable_transports: list[str] = [_state._def_tpt_proto], enable_transports: list[str] = [preferred_transport],
enable_modules: list[str]|None = None, enable_modules: list[str]|None = None,
loglevel: str|None = None, # set log level per subactor loglevel: str|None = None, # set log level per subactor
debug_mode: bool|None = None, debug_mode: bool|None = None,

View File

@ -38,10 +38,7 @@ from trio._highlevel_open_unix_stream import (
from tractor.msg import MsgCodec from tractor.msg import MsgCodec
from tractor.log import get_logger from tractor.log import get_logger
from tractor._addr import ( from tractor._addr import UDSAddress
UDSAddress,
unwrap_sockpath,
)
from tractor.ipc._transport import MsgpackTransport from tractor.ipc._transport import MsgpackTransport
@ -197,20 +194,16 @@ class MsgpackUDSStream(MsgpackTransport):
case (bytes(), str()): case (bytes(), str()):
sock_path: Path = Path(sockname) sock_path: Path = Path(sockname)
( (
peer_pid, pid,
_, uid,
_, gid,
) = get_peer_info(sock) ) = get_peer_info(sock)
laddr = UDSAddress.from_addr((
filedir, filename = unwrap_sockpath(sock_path) sock_path,
laddr = UDSAddress( os.getpid(),
filedir=filedir, ))
filename=filename, raddr = UDSAddress.from_addr((
maybe_pid=os.getpid(), sock_path,
) pid
raddr = UDSAddress( ))
filedir=filedir,
filename=filename,
maybe_pid=peer_pid
)
return (laddr, raddr) return (laddr, raddr)