Compare commits

..

No commits in common. "6a5ccc2425e63be67745877a60d024c052138381" and "9de192390a47a3571989f70d28dd652c6186b034" have entirely different histories.

9 changed files with 139 additions and 303 deletions

View File

@ -31,7 +31,7 @@ from tractor.log import get_logger
from .trionics import gather_contexts from .trionics import gather_contexts
from .ipc import _connect_chan, Channel from .ipc import _connect_chan, Channel
from ._addr import ( from ._addr import (
UnwrappedAddress, AddressTypes,
Address, Address,
preferred_transport, preferred_transport,
wrap_address wrap_address
@ -54,9 +54,7 @@ log = get_logger(__name__)
@acm @acm
async def get_registry( async def get_registry(addr: AddressTypes | None = None) -> AsyncGenerator[
addr: UnwrappedAddress|None = None,
) -> AsyncGenerator[
Portal | LocalPortal | None, Portal | LocalPortal | None,
None, None,
]: ]:
@ -73,9 +71,7 @@ async def get_registry(
# (likely a re-entrant call from the arbiter actor) # (likely a re-entrant call from the arbiter actor)
yield LocalPortal( yield LocalPortal(
actor, actor,
Channel(transport=None) await Channel.from_addr(addr)
# ^XXX, we DO NOT actually provide nor connect an
# underlying transport since this is merely an API shim.
) )
else: else:
# TODO: try to look pre-existing connection from # TODO: try to look pre-existing connection from
@ -139,10 +135,10 @@ def get_peer_by_name(
@acm @acm
async def query_actor( async def query_actor(
name: str, name: str,
regaddr: UnwrappedAddress|None = None, regaddr: AddressTypes|None = None,
) -> AsyncGenerator[ ) -> AsyncGenerator[
UnwrappedAddress|None, AddressTypes|None,
None, None,
]: ]:
''' '''
@ -172,7 +168,7 @@ async def query_actor(
async with get_registry(regaddr) as reg_portal: async with get_registry(regaddr) as reg_portal:
# TODO: return portals to all available actors - for now # TODO: return portals to all available actors - for now
# just the last one that registered # just the last one that registered
addr: UnwrappedAddress = await reg_portal.run_from_ns( addr: AddressTypes = await reg_portal.run_from_ns(
'self', 'self',
'find_actor', 'find_actor',
name=name, name=name,
@ -182,7 +178,7 @@ async def query_actor(
@acm @acm
async def maybe_open_portal( async def maybe_open_portal(
addr: UnwrappedAddress, addr: AddressTypes,
name: str, name: str,
): ):
async with query_actor( async with query_actor(
@ -202,7 +198,7 @@ async def maybe_open_portal(
@acm @acm
async def find_actor( async def find_actor(
name: str, name: str,
registry_addrs: list[UnwrappedAddress]|None = None, registry_addrs: list[AddressTypes]|None = None,
enable_transports: list[str] = [preferred_transport], enable_transports: list[str] = [preferred_transport],
only_first: bool = True, only_first: bool = True,
@ -238,7 +234,7 @@ async def find_actor(
) )
maybe_portals: list[ maybe_portals: list[
AsyncContextManager[UnwrappedAddress] AsyncContextManager[AddressTypes]
] = list( ] = list(
maybe_open_portal( maybe_open_portal(
addr=addr, addr=addr,
@ -280,7 +276,7 @@ async def find_actor(
@acm @acm
async def wait_for_actor( async def wait_for_actor(
name: str, name: str,
registry_addr: UnwrappedAddress | None = None, registry_addr: AddressTypes | None = None,
) -> AsyncGenerator[Portal, None]: ) -> AsyncGenerator[Portal, None]:
''' '''
@ -297,7 +293,7 @@ async def wait_for_actor(
yield peer_portal yield peer_portal
return return
regaddr: UnwrappedAddress = ( regaddr: AddressTypes = (
registry_addr registry_addr
or or
actor.reg_addrs[0] actor.reg_addrs[0]
@ -314,7 +310,7 @@ async def wait_for_actor(
# get latest registered addr by default? # get latest registered addr by default?
# TODO: offer multi-portal yields in multi-homed case? # TODO: offer multi-portal yields in multi-homed case?
addr: UnwrappedAddress = addrs[-1] addr: AddressTypes = addrs[-1]
async with _connect_chan(addr) as chan: async with _connect_chan(addr) as chan:
async with open_portal(chan) as portal: async with open_portal(chan) as portal:

View File

@ -37,7 +37,7 @@ from .log import (
from . import _state from . import _state
from .devx import _debug from .devx import _debug
from .to_asyncio import run_as_asyncio_guest from .to_asyncio import run_as_asyncio_guest
from ._addr import UnwrappedAddress from ._addr import AddressTypes
from ._runtime import ( from ._runtime import (
async_main, async_main,
Actor, Actor,
@ -53,10 +53,10 @@ log = get_logger(__name__)
def _mp_main( def _mp_main(
actor: Actor, actor: Actor,
accept_addrs: list[UnwrappedAddress], accept_addrs: list[AddressTypes],
forkserver_info: tuple[Any, Any, Any, Any, Any], forkserver_info: tuple[Any, Any, Any, Any, Any],
start_method: SpawnMethodKey, start_method: SpawnMethodKey,
parent_addr: UnwrappedAddress | None = None, parent_addr: AddressTypes | None = None,
infect_asyncio: bool = False, infect_asyncio: bool = False,
) -> None: ) -> None:
@ -207,7 +207,7 @@ def nest_from_op(
def _trio_main( def _trio_main(
actor: Actor, actor: Actor,
*, *,
parent_addr: UnwrappedAddress|None = None, parent_addr: AddressTypes | None = None,
infect_asyncio: bool = False, infect_asyncio: bool = False,
) -> None: ) -> None:

View File

@ -47,11 +47,10 @@ from .ipc import (
_connect_chan, _connect_chan,
) )
from ._addr import ( from ._addr import (
UnwrappedAddress, AddressTypes,
default_lo_addrs,
mk_uuid,
preferred_transport,
wrap_address, wrap_address,
preferred_transport,
default_lo_addrs
) )
from ._exceptions import is_multi_cancelled from ._exceptions import is_multi_cancelled
@ -64,10 +63,10 @@ async def open_root_actor(
*, *,
# defaults are above # defaults are above
registry_addrs: list[UnwrappedAddress]|None = None, registry_addrs: list[AddressTypes]|None = None,
# defaults are above # defaults are above
arbiter_addr: tuple[UnwrappedAddress]|None = None, arbiter_addr: tuple[AddressTypes]|None = None,
enable_transports: list[str] = [preferred_transport], enable_transports: list[str] = [preferred_transport],
@ -196,9 +195,7 @@ async def open_root_actor(
registry_addrs = [arbiter_addr] registry_addrs = [arbiter_addr]
if not registry_addrs: if not registry_addrs:
registry_addrs: list[UnwrappedAddress] = default_lo_addrs( registry_addrs: list[AddressTypes] = default_lo_addrs(enable_transports)
enable_transports
)
assert registry_addrs assert registry_addrs
@ -248,10 +245,10 @@ async def open_root_actor(
enable_stack_on_sig() enable_stack_on_sig()
# closed into below ping task-func # closed into below ping task-func
ponged_addrs: list[UnwrappedAddress] = [] ponged_addrs: list[AddressTypes] = []
async def ping_tpt_socket( async def ping_tpt_socket(
addr: UnwrappedAddress, addr: AddressTypes,
timeout: float = 1, timeout: float = 1,
) -> None: ) -> None:
''' '''
@ -287,7 +284,7 @@ async def open_root_actor(
addr, addr,
) )
trans_bind_addrs: list[UnwrappedAddress] = [] trans_bind_addrs: list[AddressTypes] = []
# Create a new local root-actor instance which IS NOT THE # Create a new local root-actor instance which IS NOT THE
# REGISTRAR # REGISTRAR
@ -305,7 +302,6 @@ async def open_root_actor(
actor = Actor( actor = Actor(
name=name or 'anonymous', name=name or 'anonymous',
uuid=mk_uuid(),
registry_addrs=ponged_addrs, registry_addrs=ponged_addrs,
loglevel=loglevel, loglevel=loglevel,
enable_modules=enable_modules, enable_modules=enable_modules,
@ -340,8 +336,7 @@ async def open_root_actor(
# https://github.com/goodboy/tractor/issues/296 # https://github.com/goodboy/tractor/issues/296
actor = Arbiter( actor = Arbiter(
name=name or 'registrar', name or 'registrar',
uuid=mk_uuid(),
registry_addrs=registry_addrs, registry_addrs=registry_addrs,
loglevel=loglevel, loglevel=loglevel,
enable_modules=enable_modules, enable_modules=enable_modules,
@ -467,7 +462,7 @@ def run_daemon(
# runtime kwargs # runtime kwargs
name: str | None = 'root', name: str | None = 'root',
registry_addrs: list[UnwrappedAddress]|None = None, registry_addrs: list[AddressTypes]|None = None,
start_method: str | None = None, start_method: str | None = None,
debug_mode: bool = False, debug_mode: bool = False,

View File

@ -52,7 +52,6 @@ import sys
from typing import ( from typing import (
Any, Any,
Callable, Callable,
Type,
TYPE_CHECKING, TYPE_CHECKING,
) )
import uuid import uuid
@ -76,12 +75,11 @@ from tractor.msg import (
) )
from .ipc import Channel from .ipc import Channel
from ._addr import ( from ._addr import (
UnwrappedAddress, AddressTypes,
Address, Address,
default_lo_addrs,
get_address_cls,
preferred_transport,
wrap_address, wrap_address,
preferred_transport,
default_lo_addrs
) )
from ._context import ( from ._context import (
mk_context, mk_context,
@ -184,15 +182,15 @@ class Actor:
def __init__( def __init__(
self, self,
name: str, name: str,
uuid: str,
*, *,
enable_modules: list[str] = [], enable_modules: list[str] = [],
uid: str|None = None,
loglevel: str|None = None, loglevel: str|None = None,
registry_addrs: list[UnwrappedAddress]|None = None, registry_addrs: list[AddressTypes]|None = None,
spawn_method: str|None = None, spawn_method: str|None = None,
# TODO: remove! # TODO: remove!
arbiter_addr: UnwrappedAddress|None = None, arbiter_addr: AddressTypes|None = None,
) -> None: ) -> None:
''' '''
@ -201,7 +199,10 @@ class Actor:
''' '''
self.name = name self.name = name
self.uid = (name, uuid) self.uid = (
name,
uid or str(uuid.uuid4())
)
self._cancel_complete = trio.Event() self._cancel_complete = trio.Event()
self._cancel_called_by_remote: tuple[str, tuple]|None = None self._cancel_called_by_remote: tuple[str, tuple]|None = None
@ -229,7 +230,7 @@ class Actor:
DeprecationWarning, DeprecationWarning,
stacklevel=2, stacklevel=2,
) )
registry_addrs: list[UnwrappedAddress] = [arbiter_addr] registry_addrs: list[AddressTypes] = [arbiter_addr]
# marked by the process spawning backend at startup # marked by the process spawning backend at startup
# will be None for the parent most process started manually # will be None for the parent most process started manually
@ -276,13 +277,13 @@ class Actor:
# when provided, init the registry addresses property from # when provided, init the registry addresses property from
# input via the validator. # input via the validator.
self._reg_addrs: list[UnwrappedAddress] = [] self._reg_addrs: list[AddressTypes] = []
if registry_addrs: if registry_addrs:
self.reg_addrs: list[UnwrappedAddress] = registry_addrs self.reg_addrs: list[AddressTypes] = registry_addrs
_state._runtime_vars['_registry_addrs'] = registry_addrs _state._runtime_vars['_registry_addrs'] = registry_addrs
@property @property
def reg_addrs(self) -> list[UnwrappedAddress]: def reg_addrs(self) -> list[AddressTypes]:
''' '''
List of (socket) addresses for all known (and contactable) List of (socket) addresses for all known (and contactable)
registry actors. registry actors.
@ -293,7 +294,7 @@ class Actor:
@reg_addrs.setter @reg_addrs.setter
def reg_addrs( def reg_addrs(
self, self,
addrs: list[UnwrappedAddress], addrs: list[AddressTypes],
) -> None: ) -> None:
if not addrs: if not addrs:
log.warning( log.warning(
@ -1022,12 +1023,11 @@ class Actor:
async def _from_parent( async def _from_parent(
self, self,
parent_addr: UnwrappedAddress|None, parent_addr: AddressTypes|None,
) -> tuple[ ) -> tuple[
Channel, Channel,
list[UnwrappedAddress]|None, list[AddressTypes]|None,
list[str]|None, # preferred tpts
]: ]:
''' '''
Bootstrap this local actor's runtime config from its parent by Bootstrap this local actor's runtime config from its parent by
@ -1039,58 +1039,30 @@ class Actor:
# Connect back to the parent actor and conduct initial # Connect back to the parent actor and conduct initial
# handshake. From this point on if we error, we # handshake. From this point on if we error, we
# attempt to ship the exception back to the parent. # attempt to ship the exception back to the parent.
chan = await Channel.from_addr( chan = await Channel.from_addr(wrap_address(parent_addr))
addr=wrap_address(parent_addr)
)
assert isinstance(chan, Channel)
# TODO: move this into a `Channel.handshake()`? # TODO: move this into a `Channel.handshake()`?
# Initial handshake: swap names. # Initial handshake: swap names.
await self._do_handshake(chan) await self._do_handshake(chan)
accept_addrs: list[UnwrappedAddress]|None = None accept_addrs: list[AddressTypes]|None = None
if self._spawn_method == "trio": if self._spawn_method == "trio":
# Receive post-spawn runtime state from our parent. # Receive post-spawn runtime state from our parent.
spawnspec: msgtypes.SpawnSpec = await chan.recv() spawnspec: msgtypes.SpawnSpec = await chan.recv()
match spawnspec: self._spawn_spec = spawnspec
case MsgTypeError():
raise spawnspec
case msgtypes.SpawnSpec():
self._spawn_spec = spawnspec
log.runtime(
'Received runtime spec from parent:\n\n'
# TODO: eventually all these msgs as log.runtime(
# `msgspec.Struct` with a special mode that 'Received runtime spec from parent:\n\n'
# pformats them in multi-line mode, BUT only
# if "trace"/"util" mode is enabled?
f'{pretty_struct.pformat(spawnspec)}\n'
)
case _: # TODO: eventually all these msgs as
raise InternalError( # `msgspec.Struct` with a special mode that
f'Received invalid non-`SpawnSpec` payload !?\n' # pformats them in multi-line mode, BUT only
f'{spawnspec}\n' # if "trace"/"util" mode is enabled?
) f'{pretty_struct.pformat(spawnspec)}\n'
)
# ^^TODO XXX!! when the `SpawnSpec` fails to decode accept_addrs: list[AddressTypes] = spawnspec.bind_addrs
# the above will raise a `MsgTypeError` which if we
# do NOT ALSO RAISE it will tried to be pprinted in
# the log.runtime() below..
#
# SO we gotta look at how other `chan.recv()` calls
# are wrapped and do the same for this spec receive!
# -[ ] see `._rpc` likely has the answer?
#
# XXX NOTE, can't be called here in subactor
# bc we haven't yet received the
# `SpawnSpec._runtime_vars: dict` which would
# declare whether `debug_mode` is set!
# breakpoint()
# import pdbp; pdbp.set_trace()
accept_addrs: list[UnwrappedAddress] = spawnspec.bind_addrs
# TODO: another `Struct` for rtvs.. # TODO: another `Struct` for rtvs..
rvs: dict[str, Any] = spawnspec._runtime_vars rvs: dict[str, Any] = spawnspec._runtime_vars
@ -1182,9 +1154,6 @@ class Actor:
return ( return (
chan, chan,
accept_addrs, accept_addrs,
None,
# ^TODO, preferred tpts list from rent!
# -[ ] need to extend the `SpawnSpec` tho!
) )
except OSError: # failed to connect except OSError: # failed to connect
@ -1200,7 +1169,7 @@ class Actor:
self, self,
handler_nursery: Nursery, handler_nursery: Nursery,
*, *,
listen_addrs: list[UnwrappedAddress]|None = None, listen_addrs: list[AddressTypes]|None = None,
task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
@ -1609,7 +1578,7 @@ class Actor:
return False return False
@property @property
def accept_addrs(self) -> list[UnwrappedAddress]: def accept_addrs(self) -> list[AddressTypes]:
''' '''
All addresses to which the transport-channel server binds All addresses to which the transport-channel server binds
and listens for new connections. and listens for new connections.
@ -1618,7 +1587,7 @@ class Actor:
return [a.unwrap() for a in self._listen_addrs] return [a.unwrap() for a in self._listen_addrs]
@property @property
def accept_addr(self) -> UnwrappedAddress: def accept_addr(self) -> AddressTypes:
''' '''
Primary address to which the IPC transport server is Primary address to which the IPC transport server is
bound and listening for new connections. bound and listening for new connections.
@ -1670,6 +1639,8 @@ class Actor:
chan.aid = aid chan.aid = aid
uid: tuple[str, str] = ( uid: tuple[str, str] = (
# str(value[0]),
# str(value[1])
aid.name, aid.name,
aid.uuid, aid.uuid,
) )
@ -1693,7 +1664,7 @@ class Actor:
async def async_main( async def async_main(
actor: Actor, actor: Actor,
accept_addrs: UnwrappedAddress|None = None, accept_addrs: AddressTypes|None = None,
# XXX: currently ``parent_addr`` is only needed for the # XXX: currently ``parent_addr`` is only needed for the
# ``multiprocessing`` backend (which pickles state sent to # ``multiprocessing`` backend (which pickles state sent to
@ -1702,7 +1673,7 @@ async def async_main(
# change this to a simple ``is_subactor: bool`` which will # change this to a simple ``is_subactor: bool`` which will
# be False when running as root actor and True when as # be False when running as root actor and True when as
# a subactor. # a subactor.
parent_addr: UnwrappedAddress|None = None, parent_addr: AddressTypes|None = None,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
@ -1731,31 +1702,16 @@ async def async_main(
( (
actor._parent_chan, actor._parent_chan,
set_accept_addr_says_rent, set_accept_addr_says_rent,
maybe_preferred_transports_says_rent,
) = await actor._from_parent(parent_addr) ) = await actor._from_parent(parent_addr)
# either it's passed in because we're not a child or # either it's passed in because we're not a child or
# because we're running in mp mode # because we're running in mp mode
accept_addrs: list[UnwrappedAddress] = []
if ( if (
set_accept_addr_says_rent set_accept_addr_says_rent
and and
set_accept_addr_says_rent is not None set_accept_addr_says_rent is not None
): ):
accept_addrs = set_accept_addr_says_rent accept_addrs = set_accept_addr_says_rent
else:
enable_transports: list[str] = (
maybe_preferred_transports_says_rent
or
[preferred_transport]
)
for transport_key in enable_transports:
transport_cls: Type[Address] = get_address_cls(
transport_key
)
addr: Address = transport_cls.get_random()
accept_addrs.append(addr.unwrap())
# The "root" nursery ensures the channel with the immediate # The "root" nursery ensures the channel with the immediate
# parent is kept alive as a resilient service until # parent is kept alive as a resilient service until
@ -1823,7 +1779,7 @@ async def async_main(
raise raise
accept_addrs: list[UnwrappedAddress] = actor.accept_addrs accept_addrs: list[AddressTypes] = actor.accept_addrs
# NOTE: only set the loopback addr for the # NOTE: only set the loopback addr for the
# process-tree-global "root" mailbox since # process-tree-global "root" mailbox since
@ -2072,7 +2028,7 @@ class Arbiter(Actor):
self._registry: dict[ self._registry: dict[
tuple[str, str], tuple[str, str],
UnwrappedAddress, AddressTypes,
] = {} ] = {}
self._waiters: dict[ self._waiters: dict[
str, str,
@ -2088,7 +2044,7 @@ class Arbiter(Actor):
self, self,
name: str, name: str,
) -> UnwrappedAddress|None: ) -> AddressTypes|None:
for uid, addr in self._registry.items(): for uid, addr in self._registry.items():
if name in uid: if name in uid:
@ -2099,7 +2055,7 @@ class Arbiter(Actor):
async def get_registry( async def get_registry(
self self
) -> dict[str, UnwrappedAddress]: ) -> dict[str, AddressTypes]:
''' '''
Return current name registry. Return current name registry.
@ -2119,7 +2075,7 @@ class Arbiter(Actor):
self, self,
name: str, name: str,
) -> list[UnwrappedAddress]: ) -> list[AddressTypes]:
''' '''
Wait for a particular actor to register. Wait for a particular actor to register.
@ -2127,8 +2083,8 @@ class Arbiter(Actor):
registered. registered.
''' '''
addrs: list[UnwrappedAddress] = [] addrs: list[AddressTypes] = []
addr: UnwrappedAddress addr: AddressTypes
mailbox_info: str = 'Actor registry contact infos:\n' mailbox_info: str = 'Actor registry contact infos:\n'
for uid, addr in self._registry.items(): for uid, addr in self._registry.items():
@ -2154,7 +2110,7 @@ class Arbiter(Actor):
async def register_actor( async def register_actor(
self, self,
uid: tuple[str, str], uid: tuple[str, str],
addr: UnwrappedAddress addr: AddressTypes
) -> None: ) -> None:
uid = name, hash = (str(uid[0]), str(uid[1])) uid = name, hash = (str(uid[0]), str(uid[1]))
waddr: Address = wrap_address(addr) waddr: Address = wrap_address(addr)

View File

@ -46,7 +46,7 @@ from tractor._state import (
_runtime_vars, _runtime_vars,
) )
from tractor.log import get_logger from tractor.log import get_logger
from tractor._addr import UnwrappedAddress from tractor._addr import AddressTypes
from tractor._portal import Portal from tractor._portal import Portal
from tractor._runtime import Actor from tractor._runtime import Actor
from tractor._entry import _mp_main from tractor._entry import _mp_main
@ -393,8 +393,8 @@ async def new_proc(
errors: dict[tuple[str, str], Exception], errors: dict[tuple[str, str], Exception],
# passed through to actor main # passed through to actor main
bind_addrs: list[UnwrappedAddress], bind_addrs: list[AddressTypes],
parent_addr: UnwrappedAddress, parent_addr: AddressTypes,
_runtime_vars: dict[str, Any], # serialized and sent to _child _runtime_vars: dict[str, Any], # serialized and sent to _child
*, *,
@ -432,8 +432,8 @@ async def trio_proc(
errors: dict[tuple[str, str], Exception], errors: dict[tuple[str, str], Exception],
# passed through to actor main # passed through to actor main
bind_addrs: list[UnwrappedAddress], bind_addrs: list[AddressTypes],
parent_addr: UnwrappedAddress, parent_addr: AddressTypes,
_runtime_vars: dict[str, Any], # serialized and sent to _child _runtime_vars: dict[str, Any], # serialized and sent to _child
*, *,
infect_asyncio: bool = False, infect_asyncio: bool = False,
@ -639,8 +639,8 @@ async def mp_proc(
subactor: Actor, subactor: Actor,
errors: dict[tuple[str, str], Exception], errors: dict[tuple[str, str], Exception],
# passed through to actor main # passed through to actor main
bind_addrs: list[UnwrappedAddress], bind_addrs: list[AddressTypes],
parent_addr: UnwrappedAddress, parent_addr: AddressTypes,
_runtime_vars: dict[str, Any], # serialized and sent to _child _runtime_vars: dict[str, Any], # serialized and sent to _child
*, *,
infect_asyncio: bool = False, infect_asyncio: bool = False,

View File

@ -22,9 +22,7 @@ from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
import inspect import inspect
from pprint import pformat from pprint import pformat
from typing import ( from typing import TYPE_CHECKING
TYPE_CHECKING,
)
import typing import typing
import warnings import warnings
@ -33,9 +31,9 @@ import trio
from .devx._debug import maybe_wait_for_debugger from .devx._debug import maybe_wait_for_debugger
from ._addr import ( from ._addr import (
UnwrappedAddress, AddressTypes,
preferred_transport, preferred_transport,
mk_uuid, get_address_cls
) )
from ._state import current_actor, is_main_process from ._state import current_actor, is_main_process
from .log import get_logger, get_loglevel from .log import get_logger, get_loglevel
@ -136,7 +134,7 @@ class ActorNursery:
*, *,
bind_addrs: list[UnwrappedAddress]|None = None, bind_addrs: list[AddressTypes]|None = None,
rpc_module_paths: list[str]|None = None, rpc_module_paths: list[str]|None = None,
enable_transports: list[str] = [preferred_transport], enable_transports: list[str] = [preferred_transport],
enable_modules: list[str]|None = None, enable_modules: list[str]|None = None,
@ -163,6 +161,12 @@ class ActorNursery:
or get_loglevel() or get_loglevel()
) )
if not bind_addrs:
bind_addrs: list[AddressTypes] = [
get_address_cls(transport).get_random().unwrap()
for transport in enable_transports
]
# configure and pass runtime state # configure and pass runtime state
_rtv = _state._runtime_vars.copy() _rtv = _state._runtime_vars.copy()
_rtv['_is_root'] = False _rtv['_is_root'] = False
@ -185,9 +189,7 @@ class ActorNursery:
enable_modules.extend(rpc_module_paths) enable_modules.extend(rpc_module_paths)
subactor = Actor( subactor = Actor(
name=name, name,
uuid=mk_uuid(),
# modules allowed to invoked funcs from # modules allowed to invoked funcs from
enable_modules=enable_modules, enable_modules=enable_modules,
loglevel=loglevel, loglevel=loglevel,
@ -195,7 +197,7 @@ class ActorNursery:
# verbatim relay this actor's registrar addresses # verbatim relay this actor's registrar addresses
registry_addrs=current_actor().reg_addrs, registry_addrs=current_actor().reg_addrs,
) )
parent_addr: UnwrappedAddress = self._actor.accept_addr parent_addr = self._actor.accept_addr
assert parent_addr assert parent_addr
# start a task to spawn a process # start a task to spawn a process
@ -233,7 +235,7 @@ class ActorNursery:
*, *,
name: str | None = None, name: str | None = None,
bind_addrs: UnwrappedAddress|None = None, bind_addrs: AddressTypes|None = None,
rpc_module_paths: list[str] | None = None, rpc_module_paths: list[str] | None = None,
enable_modules: list[str] | None = None, enable_modules: list[str] | None = None,
loglevel: str | None = None, # set log level per subactor loglevel: str | None = None, # set log level per subactor

View File

@ -42,15 +42,24 @@ class MsgpackTCPStream(MsgpackTransport):
address_type = TCPAddress address_type = TCPAddress
layer_key: int = 4 layer_key: int = 4
# def __init__(
# self,
# stream: trio.SocketStream,
# prefix_size: int = 4,
# codec: CodecType = None,
# ) -> None:
# super().__init__(
# stream,
# prefix_size=prefix_size,
# codec=codec
# )
@property @property
def maddr(self) -> str: def maddr(self) -> str:
host, port = self.raddr.unwrap() host, port = self.raddr.unwrap()
return ( return (
# TODO, use `ipaddress` from stdlib to handle
# first detecting which of `ipv4/6` before
# choosing the routing prefix part.
f'/ipv4/{host}' f'/ipv4/{host}'
f'/{self.address_type.name_key}/{port}' f'/{self.address_type.name_key}/{port}'
# f'/{self.chan.uid[0]}' # f'/{self.chan.uid[0]}'
# f'/{self.cid}' # f'/{self.cid}'
@ -85,15 +94,12 @@ class MsgpackTCPStream(MsgpackTransport):
cls, cls,
stream: trio.SocketStream stream: trio.SocketStream
) -> tuple[ ) -> tuple[
TCPAddress, tuple[str, int],
TCPAddress, tuple[str, int]
]: ]:
# TODO, what types are these?
lsockname = stream.socket.getsockname() lsockname = stream.socket.getsockname()
l_sockaddr: tuple[str, int] = tuple(lsockname[:2])
rsockname = stream.socket.getpeername() rsockname = stream.socket.getpeername()
r_sockaddr: tuple[str, int] = tuple(rsockname[:2])
return ( return (
TCPAddress.from_addr(l_sockaddr), TCPAddress.from_addr(tuple(lsockname[:2])),
TCPAddress.from_addr(r_sockaddr), TCPAddress.from_addr(tuple(rsockname[:2])),
) )

View File

@ -18,23 +18,8 @@ Unix Domain Socket implementation of tractor.ipc._transport.MsgTransport protoco
''' '''
from __future__ import annotations from __future__ import annotations
from pathlib import Path
import os
from socket import (
# socket,
AF_UNIX,
SOCK_STREAM,
SO_PASSCRED,
SO_PEERCRED,
SOL_SOCKET,
)
import struct
import trio import trio
from trio._highlevel_open_unix_stream import (
close_on_error,
has_unix,
)
from tractor.msg import MsgCodec from tractor.msg import MsgCodec
from tractor.log import get_logger from tractor.log import get_logger
@ -45,80 +30,33 @@ from tractor.ipc._transport import MsgpackTransport
log = get_logger(__name__) log = get_logger(__name__)
async def open_unix_socket_w_passcred(
filename: str|bytes|os.PathLike[str]|os.PathLike[bytes],
) -> trio.SocketStream:
'''
Literally the exact same as `trio.open_unix_socket()` except we set the additiona
`socket.SO_PASSCRED` option to ensure the server side (the process calling `accept()`)
can extract the connecting peer's credentials, namely OS specific process
related IDs.
See this SO for "why" the extra opts,
- https://stackoverflow.com/a/7982749
'''
if not has_unix:
raise RuntimeError("Unix sockets are not supported on this platform")
# much more simplified logic vs tcp sockets - one socket type and only one
# possible location to connect to
sock = trio.socket.socket(AF_UNIX, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_PASSCRED, 1)
with close_on_error(sock):
await sock.connect(os.fspath(filename))
return trio.SocketStream(sock)
def get_peer_info(sock: trio.socket.socket) -> tuple[
int, # pid
int, # uid
int, # guid
]:
'''
Deliver the connecting peer's "credentials"-info as defined in
a very Linux specific way..
For more deats see,
- `man accept`,
- `man unix`,
this great online guide to all things sockets,
- https://beej.us/guide/bgnet/html/split-wide/man-pages.html#setsockoptman
AND this **wonderful SO answer**
- https://stackoverflow.com/a/7982749
'''
creds: bytes = sock.getsockopt(
SOL_SOCKET,
SO_PEERCRED,
struct.calcsize('3i')
)
# i.e a tuple of the fields,
# pid: int, "process"
# uid: int, "user"
# gid: int, "group"
return struct.unpack('3i', creds)
class MsgpackUDSStream(MsgpackTransport): class MsgpackUDSStream(MsgpackTransport):
''' '''
A `trio.SocketStream` around a Unix-Domain-Socket transport A ``trio.SocketStream`` delivering ``msgpack`` formatted data
delivering `msgpack` encoded msgs using the `msgspec` codec lib. using the ``msgspec`` codec lib.
''' '''
address_type = UDSAddress address_type = UDSAddress
layer_key: int = 4 layer_key: int = 7
# def __init__(
# self,
# stream: trio.SocketStream,
# prefix_size: int = 4,
# codec: CodecType = None,
# ) -> None:
# super().__init__(
# stream,
# prefix_size=prefix_size,
# codec=codec
# )
@property @property
def maddr(self) -> str: def maddr(self) -> str:
if not self.raddr: filepath = self.raddr.unwrap()
return '<unknown-peer>'
filepath: Path = Path(self.raddr.unwrap()[0])
return ( return (
f'/ipv4/localhost'
f'/{self.address_type.name_key}/{filepath}' f'/{self.address_type.name_key}/{filepath}'
# f'/{self.chan.uid[0]}' # f'/{self.chan.uid[0]}'
# f'/{self.cid}' # f'/{self.cid}'
@ -138,72 +76,22 @@ class MsgpackUDSStream(MsgpackTransport):
codec: MsgCodec|None = None, codec: MsgCodec|None = None,
**kwargs **kwargs
) -> MsgpackUDSStream: ) -> MsgpackUDSStream:
stream = await trio.open_unix_socket(
filepath: Path addr.unwrap(),
pid: int
(
filepath,
pid,
) = addr.unwrap()
# XXX NOTE, we don't need to provide the `.pid` part from
# the addr since the OS does this implicitly! .. lel
# stream = await trio.open_unix_socket(
stream = await open_unix_socket_w_passcred(
str(filepath),
**kwargs **kwargs
) )
stream = MsgpackUDSStream( return MsgpackUDSStream(
stream, stream,
prefix_size=prefix_size, prefix_size=prefix_size,
codec=codec codec=codec
) )
stream._raddr = addr
return stream
@classmethod @classmethod
def get_stream_addrs( def get_stream_addrs(
cls, cls,
stream: trio.SocketStream stream: trio.SocketStream
) -> tuple[ ) -> tuple[UDSAddress, UDSAddress]:
Path, return (
int, UDSAddress.from_addr(stream.socket.getsockname()),
]: UDSAddress.from_addr(stream.socket.getsockname()),
sock: trio.socket.socket = stream.socket
# NOTE XXX, it's unclear why one or the other ends up being
# `bytes` versus the socket-file-path, i presume it's
# something to do with who is the server (called `.listen()`)?
# maybe could be better implemented using another info-query
# on the socket like,
# https://beej.us/guide/bgnet/html/split-wide/system-calls-or-bust.html#gethostnamewho-am-i
sockname: str|bytes = sock.getsockname()
# https://beej.us/guide/bgnet/html/split-wide/system-calls-or-bust.html#getpeernamewho-are-you
peername: str|bytes = sock.getpeername()
match (peername, sockname):
case (str(), bytes()):
sock_path: Path = Path(peername)
case (bytes(), str()):
sock_path: Path = Path(sockname)
(
pid,
uid,
gid,
) = get_peer_info(sock)
log.info(
f'UDS connection from process {pid!r}\n'
f'>[\n'
f'|_{sock_path}\n'
f' |_pid: {pid}\n'
f' |_uid: {uid}\n'
f' |_gid: {gid}\n'
) )
laddr = UDSAddress.from_addr((
sock_path,
os.getpid(),
))
raddr = UDSAddress.from_addr((
sock_path,
pid
))
return (laddr, raddr)

View File

@ -31,7 +31,6 @@ from typing import (
Type, Type,
TypeVar, TypeVar,
TypeAlias, TypeAlias,
# TYPE_CHECKING,
Union, Union,
) )
@ -48,7 +47,7 @@ from tractor.msg import (
pretty_struct, pretty_struct,
) )
from tractor.log import get_logger from tractor.log import get_logger
from tractor._addr import UnwrappedAddress from tractor._addr import AddressTypes
log = get_logger('tractor.msgspec') log = get_logger('tractor.msgspec')
@ -143,15 +142,9 @@ class Aid(
''' '''
name: str name: str
uuid: str uuid: str
# TODO: use built-in support for UUIDs?
# TODO? can/should we extend this field set? # -[ ] `uuid.UUID` which has multi-protocol support
# -[ ] use built-in support for UUIDs? `uuid.UUID` which has # https://jcristharif.com/msgspec/supported-types.html#uuid
# multi-protocol support
# https://jcristharif.com/msgspec/supported-types.html#uuid
#
# -[ ] as per the `.ipc._uds` / `._addr` comments, maybe we
# should also include at least `.pid` (equiv to port for tcp)
# and/or host-part always?
class SpawnSpec( class SpawnSpec(
@ -175,8 +168,8 @@ class SpawnSpec(
# TODO: not just sockaddr pairs? # TODO: not just sockaddr pairs?
# -[ ] abstract into a `TransportAddr` type? # -[ ] abstract into a `TransportAddr` type?
reg_addrs: list[UnwrappedAddress] reg_addrs: list[AddressTypes]
bind_addrs: list[UnwrappedAddress]|None bind_addrs: list[AddressTypes]
# TODO: caps based RPC support in the payload? # TODO: caps based RPC support in the payload?