Compare commits
3 Commits
9de192390a
...
6a5ccc2425
Author | SHA1 | Date |
---|---|---|
|
6a5ccc2425 | |
|
23acd0f4cb | |
|
2c11d1d44a |
|
@ -31,7 +31,7 @@ from tractor.log import get_logger
|
||||||
from .trionics import gather_contexts
|
from .trionics import gather_contexts
|
||||||
from .ipc import _connect_chan, Channel
|
from .ipc import _connect_chan, Channel
|
||||||
from ._addr import (
|
from ._addr import (
|
||||||
AddressTypes,
|
UnwrappedAddress,
|
||||||
Address,
|
Address,
|
||||||
preferred_transport,
|
preferred_transport,
|
||||||
wrap_address
|
wrap_address
|
||||||
|
@ -54,7 +54,9 @@ log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def get_registry(addr: AddressTypes | None = None) -> AsyncGenerator[
|
async def get_registry(
|
||||||
|
addr: UnwrappedAddress|None = None,
|
||||||
|
) -> AsyncGenerator[
|
||||||
Portal | LocalPortal | None,
|
Portal | LocalPortal | None,
|
||||||
None,
|
None,
|
||||||
]:
|
]:
|
||||||
|
@ -71,7 +73,9 @@ async def get_registry(addr: AddressTypes | None = None) -> AsyncGenerator[
|
||||||
# (likely a re-entrant call from the arbiter actor)
|
# (likely a re-entrant call from the arbiter actor)
|
||||||
yield LocalPortal(
|
yield LocalPortal(
|
||||||
actor,
|
actor,
|
||||||
await Channel.from_addr(addr)
|
Channel(transport=None)
|
||||||
|
# ^XXX, we DO NOT actually provide nor connect an
|
||||||
|
# underlying transport since this is merely an API shim.
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# TODO: try to look pre-existing connection from
|
# TODO: try to look pre-existing connection from
|
||||||
|
@ -135,10 +139,10 @@ def get_peer_by_name(
|
||||||
@acm
|
@acm
|
||||||
async def query_actor(
|
async def query_actor(
|
||||||
name: str,
|
name: str,
|
||||||
regaddr: AddressTypes|None = None,
|
regaddr: UnwrappedAddress|None = None,
|
||||||
|
|
||||||
) -> AsyncGenerator[
|
) -> AsyncGenerator[
|
||||||
AddressTypes|None,
|
UnwrappedAddress|None,
|
||||||
None,
|
None,
|
||||||
]:
|
]:
|
||||||
'''
|
'''
|
||||||
|
@ -168,7 +172,7 @@ async def query_actor(
|
||||||
async with get_registry(regaddr) as reg_portal:
|
async with get_registry(regaddr) as reg_portal:
|
||||||
# TODO: return portals to all available actors - for now
|
# TODO: return portals to all available actors - for now
|
||||||
# just the last one that registered
|
# just the last one that registered
|
||||||
addr: AddressTypes = await reg_portal.run_from_ns(
|
addr: UnwrappedAddress = await reg_portal.run_from_ns(
|
||||||
'self',
|
'self',
|
||||||
'find_actor',
|
'find_actor',
|
||||||
name=name,
|
name=name,
|
||||||
|
@ -178,7 +182,7 @@ async def query_actor(
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def maybe_open_portal(
|
async def maybe_open_portal(
|
||||||
addr: AddressTypes,
|
addr: UnwrappedAddress,
|
||||||
name: str,
|
name: str,
|
||||||
):
|
):
|
||||||
async with query_actor(
|
async with query_actor(
|
||||||
|
@ -198,7 +202,7 @@ async def maybe_open_portal(
|
||||||
@acm
|
@acm
|
||||||
async def find_actor(
|
async def find_actor(
|
||||||
name: str,
|
name: str,
|
||||||
registry_addrs: list[AddressTypes]|None = None,
|
registry_addrs: list[UnwrappedAddress]|None = None,
|
||||||
enable_transports: list[str] = [preferred_transport],
|
enable_transports: list[str] = [preferred_transport],
|
||||||
|
|
||||||
only_first: bool = True,
|
only_first: bool = True,
|
||||||
|
@ -234,7 +238,7 @@ async def find_actor(
|
||||||
)
|
)
|
||||||
|
|
||||||
maybe_portals: list[
|
maybe_portals: list[
|
||||||
AsyncContextManager[AddressTypes]
|
AsyncContextManager[UnwrappedAddress]
|
||||||
] = list(
|
] = list(
|
||||||
maybe_open_portal(
|
maybe_open_portal(
|
||||||
addr=addr,
|
addr=addr,
|
||||||
|
@ -276,7 +280,7 @@ async def find_actor(
|
||||||
@acm
|
@acm
|
||||||
async def wait_for_actor(
|
async def wait_for_actor(
|
||||||
name: str,
|
name: str,
|
||||||
registry_addr: AddressTypes | None = None,
|
registry_addr: UnwrappedAddress | None = None,
|
||||||
|
|
||||||
) -> AsyncGenerator[Portal, None]:
|
) -> AsyncGenerator[Portal, None]:
|
||||||
'''
|
'''
|
||||||
|
@ -293,7 +297,7 @@ async def wait_for_actor(
|
||||||
yield peer_portal
|
yield peer_portal
|
||||||
return
|
return
|
||||||
|
|
||||||
regaddr: AddressTypes = (
|
regaddr: UnwrappedAddress = (
|
||||||
registry_addr
|
registry_addr
|
||||||
or
|
or
|
||||||
actor.reg_addrs[0]
|
actor.reg_addrs[0]
|
||||||
|
@ -310,7 +314,7 @@ async def wait_for_actor(
|
||||||
|
|
||||||
# get latest registered addr by default?
|
# get latest registered addr by default?
|
||||||
# TODO: offer multi-portal yields in multi-homed case?
|
# TODO: offer multi-portal yields in multi-homed case?
|
||||||
addr: AddressTypes = addrs[-1]
|
addr: UnwrappedAddress = addrs[-1]
|
||||||
|
|
||||||
async with _connect_chan(addr) as chan:
|
async with _connect_chan(addr) as chan:
|
||||||
async with open_portal(chan) as portal:
|
async with open_portal(chan) as portal:
|
||||||
|
|
|
@ -37,7 +37,7 @@ from .log import (
|
||||||
from . import _state
|
from . import _state
|
||||||
from .devx import _debug
|
from .devx import _debug
|
||||||
from .to_asyncio import run_as_asyncio_guest
|
from .to_asyncio import run_as_asyncio_guest
|
||||||
from ._addr import AddressTypes
|
from ._addr import UnwrappedAddress
|
||||||
from ._runtime import (
|
from ._runtime import (
|
||||||
async_main,
|
async_main,
|
||||||
Actor,
|
Actor,
|
||||||
|
@ -53,10 +53,10 @@ log = get_logger(__name__)
|
||||||
def _mp_main(
|
def _mp_main(
|
||||||
|
|
||||||
actor: Actor,
|
actor: Actor,
|
||||||
accept_addrs: list[AddressTypes],
|
accept_addrs: list[UnwrappedAddress],
|
||||||
forkserver_info: tuple[Any, Any, Any, Any, Any],
|
forkserver_info: tuple[Any, Any, Any, Any, Any],
|
||||||
start_method: SpawnMethodKey,
|
start_method: SpawnMethodKey,
|
||||||
parent_addr: AddressTypes | None = None,
|
parent_addr: UnwrappedAddress | None = None,
|
||||||
infect_asyncio: bool = False,
|
infect_asyncio: bool = False,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -207,7 +207,7 @@ def nest_from_op(
|
||||||
def _trio_main(
|
def _trio_main(
|
||||||
actor: Actor,
|
actor: Actor,
|
||||||
*,
|
*,
|
||||||
parent_addr: AddressTypes | None = None,
|
parent_addr: UnwrappedAddress|None = None,
|
||||||
infect_asyncio: bool = False,
|
infect_asyncio: bool = False,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
|
@ -47,10 +47,11 @@ from .ipc import (
|
||||||
_connect_chan,
|
_connect_chan,
|
||||||
)
|
)
|
||||||
from ._addr import (
|
from ._addr import (
|
||||||
AddressTypes,
|
UnwrappedAddress,
|
||||||
wrap_address,
|
default_lo_addrs,
|
||||||
|
mk_uuid,
|
||||||
preferred_transport,
|
preferred_transport,
|
||||||
default_lo_addrs
|
wrap_address,
|
||||||
)
|
)
|
||||||
from ._exceptions import is_multi_cancelled
|
from ._exceptions import is_multi_cancelled
|
||||||
|
|
||||||
|
@ -63,10 +64,10 @@ async def open_root_actor(
|
||||||
|
|
||||||
*,
|
*,
|
||||||
# defaults are above
|
# defaults are above
|
||||||
registry_addrs: list[AddressTypes]|None = None,
|
registry_addrs: list[UnwrappedAddress]|None = None,
|
||||||
|
|
||||||
# defaults are above
|
# defaults are above
|
||||||
arbiter_addr: tuple[AddressTypes]|None = None,
|
arbiter_addr: tuple[UnwrappedAddress]|None = None,
|
||||||
|
|
||||||
enable_transports: list[str] = [preferred_transport],
|
enable_transports: list[str] = [preferred_transport],
|
||||||
|
|
||||||
|
@ -195,7 +196,9 @@ async def open_root_actor(
|
||||||
registry_addrs = [arbiter_addr]
|
registry_addrs = [arbiter_addr]
|
||||||
|
|
||||||
if not registry_addrs:
|
if not registry_addrs:
|
||||||
registry_addrs: list[AddressTypes] = default_lo_addrs(enable_transports)
|
registry_addrs: list[UnwrappedAddress] = default_lo_addrs(
|
||||||
|
enable_transports
|
||||||
|
)
|
||||||
|
|
||||||
assert registry_addrs
|
assert registry_addrs
|
||||||
|
|
||||||
|
@ -245,10 +248,10 @@ async def open_root_actor(
|
||||||
enable_stack_on_sig()
|
enable_stack_on_sig()
|
||||||
|
|
||||||
# closed into below ping task-func
|
# closed into below ping task-func
|
||||||
ponged_addrs: list[AddressTypes] = []
|
ponged_addrs: list[UnwrappedAddress] = []
|
||||||
|
|
||||||
async def ping_tpt_socket(
|
async def ping_tpt_socket(
|
||||||
addr: AddressTypes,
|
addr: UnwrappedAddress,
|
||||||
timeout: float = 1,
|
timeout: float = 1,
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -284,7 +287,7 @@ async def open_root_actor(
|
||||||
addr,
|
addr,
|
||||||
)
|
)
|
||||||
|
|
||||||
trans_bind_addrs: list[AddressTypes] = []
|
trans_bind_addrs: list[UnwrappedAddress] = []
|
||||||
|
|
||||||
# Create a new local root-actor instance which IS NOT THE
|
# Create a new local root-actor instance which IS NOT THE
|
||||||
# REGISTRAR
|
# REGISTRAR
|
||||||
|
@ -302,6 +305,7 @@ async def open_root_actor(
|
||||||
|
|
||||||
actor = Actor(
|
actor = Actor(
|
||||||
name=name or 'anonymous',
|
name=name or 'anonymous',
|
||||||
|
uuid=mk_uuid(),
|
||||||
registry_addrs=ponged_addrs,
|
registry_addrs=ponged_addrs,
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
enable_modules=enable_modules,
|
enable_modules=enable_modules,
|
||||||
|
@ -336,7 +340,8 @@ async def open_root_actor(
|
||||||
# https://github.com/goodboy/tractor/issues/296
|
# https://github.com/goodboy/tractor/issues/296
|
||||||
|
|
||||||
actor = Arbiter(
|
actor = Arbiter(
|
||||||
name or 'registrar',
|
name=name or 'registrar',
|
||||||
|
uuid=mk_uuid(),
|
||||||
registry_addrs=registry_addrs,
|
registry_addrs=registry_addrs,
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
enable_modules=enable_modules,
|
enable_modules=enable_modules,
|
||||||
|
@ -462,7 +467,7 @@ def run_daemon(
|
||||||
|
|
||||||
# runtime kwargs
|
# runtime kwargs
|
||||||
name: str | None = 'root',
|
name: str | None = 'root',
|
||||||
registry_addrs: list[AddressTypes]|None = None,
|
registry_addrs: list[UnwrappedAddress]|None = None,
|
||||||
|
|
||||||
start_method: str | None = None,
|
start_method: str | None = None,
|
||||||
debug_mode: bool = False,
|
debug_mode: bool = False,
|
||||||
|
|
|
@ -52,6 +52,7 @@ import sys
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
Callable,
|
Callable,
|
||||||
|
Type,
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
import uuid
|
import uuid
|
||||||
|
@ -75,11 +76,12 @@ from tractor.msg import (
|
||||||
)
|
)
|
||||||
from .ipc import Channel
|
from .ipc import Channel
|
||||||
from ._addr import (
|
from ._addr import (
|
||||||
AddressTypes,
|
UnwrappedAddress,
|
||||||
Address,
|
Address,
|
||||||
wrap_address,
|
default_lo_addrs,
|
||||||
|
get_address_cls,
|
||||||
preferred_transport,
|
preferred_transport,
|
||||||
default_lo_addrs
|
wrap_address,
|
||||||
)
|
)
|
||||||
from ._context import (
|
from ._context import (
|
||||||
mk_context,
|
mk_context,
|
||||||
|
@ -182,15 +184,15 @@ class Actor:
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
|
uuid: str,
|
||||||
*,
|
*,
|
||||||
enable_modules: list[str] = [],
|
enable_modules: list[str] = [],
|
||||||
uid: str|None = None,
|
|
||||||
loglevel: str|None = None,
|
loglevel: str|None = None,
|
||||||
registry_addrs: list[AddressTypes]|None = None,
|
registry_addrs: list[UnwrappedAddress]|None = None,
|
||||||
spawn_method: str|None = None,
|
spawn_method: str|None = None,
|
||||||
|
|
||||||
# TODO: remove!
|
# TODO: remove!
|
||||||
arbiter_addr: AddressTypes|None = None,
|
arbiter_addr: UnwrappedAddress|None = None,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -199,10 +201,7 @@ class Actor:
|
||||||
|
|
||||||
'''
|
'''
|
||||||
self.name = name
|
self.name = name
|
||||||
self.uid = (
|
self.uid = (name, uuid)
|
||||||
name,
|
|
||||||
uid or str(uuid.uuid4())
|
|
||||||
)
|
|
||||||
|
|
||||||
self._cancel_complete = trio.Event()
|
self._cancel_complete = trio.Event()
|
||||||
self._cancel_called_by_remote: tuple[str, tuple]|None = None
|
self._cancel_called_by_remote: tuple[str, tuple]|None = None
|
||||||
|
@ -230,7 +229,7 @@ class Actor:
|
||||||
DeprecationWarning,
|
DeprecationWarning,
|
||||||
stacklevel=2,
|
stacklevel=2,
|
||||||
)
|
)
|
||||||
registry_addrs: list[AddressTypes] = [arbiter_addr]
|
registry_addrs: list[UnwrappedAddress] = [arbiter_addr]
|
||||||
|
|
||||||
# marked by the process spawning backend at startup
|
# marked by the process spawning backend at startup
|
||||||
# will be None for the parent most process started manually
|
# will be None for the parent most process started manually
|
||||||
|
@ -277,13 +276,13 @@ class Actor:
|
||||||
|
|
||||||
# when provided, init the registry addresses property from
|
# when provided, init the registry addresses property from
|
||||||
# input via the validator.
|
# input via the validator.
|
||||||
self._reg_addrs: list[AddressTypes] = []
|
self._reg_addrs: list[UnwrappedAddress] = []
|
||||||
if registry_addrs:
|
if registry_addrs:
|
||||||
self.reg_addrs: list[AddressTypes] = registry_addrs
|
self.reg_addrs: list[UnwrappedAddress] = registry_addrs
|
||||||
_state._runtime_vars['_registry_addrs'] = registry_addrs
|
_state._runtime_vars['_registry_addrs'] = registry_addrs
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def reg_addrs(self) -> list[AddressTypes]:
|
def reg_addrs(self) -> list[UnwrappedAddress]:
|
||||||
'''
|
'''
|
||||||
List of (socket) addresses for all known (and contactable)
|
List of (socket) addresses for all known (and contactable)
|
||||||
registry actors.
|
registry actors.
|
||||||
|
@ -294,7 +293,7 @@ class Actor:
|
||||||
@reg_addrs.setter
|
@reg_addrs.setter
|
||||||
def reg_addrs(
|
def reg_addrs(
|
||||||
self,
|
self,
|
||||||
addrs: list[AddressTypes],
|
addrs: list[UnwrappedAddress],
|
||||||
) -> None:
|
) -> None:
|
||||||
if not addrs:
|
if not addrs:
|
||||||
log.warning(
|
log.warning(
|
||||||
|
@ -1023,11 +1022,12 @@ class Actor:
|
||||||
|
|
||||||
async def _from_parent(
|
async def _from_parent(
|
||||||
self,
|
self,
|
||||||
parent_addr: AddressTypes|None,
|
parent_addr: UnwrappedAddress|None,
|
||||||
|
|
||||||
) -> tuple[
|
) -> tuple[
|
||||||
Channel,
|
Channel,
|
||||||
list[AddressTypes]|None,
|
list[UnwrappedAddress]|None,
|
||||||
|
list[str]|None, # preferred tpts
|
||||||
]:
|
]:
|
||||||
'''
|
'''
|
||||||
Bootstrap this local actor's runtime config from its parent by
|
Bootstrap this local actor's runtime config from its parent by
|
||||||
|
@ -1039,20 +1039,26 @@ class Actor:
|
||||||
# Connect back to the parent actor and conduct initial
|
# Connect back to the parent actor and conduct initial
|
||||||
# handshake. From this point on if we error, we
|
# handshake. From this point on if we error, we
|
||||||
# attempt to ship the exception back to the parent.
|
# attempt to ship the exception back to the parent.
|
||||||
chan = await Channel.from_addr(wrap_address(parent_addr))
|
chan = await Channel.from_addr(
|
||||||
|
addr=wrap_address(parent_addr)
|
||||||
|
)
|
||||||
|
assert isinstance(chan, Channel)
|
||||||
|
|
||||||
# TODO: move this into a `Channel.handshake()`?
|
# TODO: move this into a `Channel.handshake()`?
|
||||||
# Initial handshake: swap names.
|
# Initial handshake: swap names.
|
||||||
await self._do_handshake(chan)
|
await self._do_handshake(chan)
|
||||||
|
|
||||||
accept_addrs: list[AddressTypes]|None = None
|
accept_addrs: list[UnwrappedAddress]|None = None
|
||||||
|
|
||||||
if self._spawn_method == "trio":
|
if self._spawn_method == "trio":
|
||||||
|
|
||||||
# Receive post-spawn runtime state from our parent.
|
# Receive post-spawn runtime state from our parent.
|
||||||
spawnspec: msgtypes.SpawnSpec = await chan.recv()
|
spawnspec: msgtypes.SpawnSpec = await chan.recv()
|
||||||
|
match spawnspec:
|
||||||
|
case MsgTypeError():
|
||||||
|
raise spawnspec
|
||||||
|
case msgtypes.SpawnSpec():
|
||||||
self._spawn_spec = spawnspec
|
self._spawn_spec = spawnspec
|
||||||
|
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Received runtime spec from parent:\n\n'
|
'Received runtime spec from parent:\n\n'
|
||||||
|
|
||||||
|
@ -1062,7 +1068,29 @@ class Actor:
|
||||||
# if "trace"/"util" mode is enabled?
|
# if "trace"/"util" mode is enabled?
|
||||||
f'{pretty_struct.pformat(spawnspec)}\n'
|
f'{pretty_struct.pformat(spawnspec)}\n'
|
||||||
)
|
)
|
||||||
accept_addrs: list[AddressTypes] = spawnspec.bind_addrs
|
|
||||||
|
case _:
|
||||||
|
raise InternalError(
|
||||||
|
f'Received invalid non-`SpawnSpec` payload !?\n'
|
||||||
|
f'{spawnspec}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
# ^^TODO XXX!! when the `SpawnSpec` fails to decode
|
||||||
|
# the above will raise a `MsgTypeError` which if we
|
||||||
|
# do NOT ALSO RAISE it will tried to be pprinted in
|
||||||
|
# the log.runtime() below..
|
||||||
|
#
|
||||||
|
# SO we gotta look at how other `chan.recv()` calls
|
||||||
|
# are wrapped and do the same for this spec receive!
|
||||||
|
# -[ ] see `._rpc` likely has the answer?
|
||||||
|
#
|
||||||
|
# XXX NOTE, can't be called here in subactor
|
||||||
|
# bc we haven't yet received the
|
||||||
|
# `SpawnSpec._runtime_vars: dict` which would
|
||||||
|
# declare whether `debug_mode` is set!
|
||||||
|
# breakpoint()
|
||||||
|
# import pdbp; pdbp.set_trace()
|
||||||
|
accept_addrs: list[UnwrappedAddress] = spawnspec.bind_addrs
|
||||||
|
|
||||||
# TODO: another `Struct` for rtvs..
|
# TODO: another `Struct` for rtvs..
|
||||||
rvs: dict[str, Any] = spawnspec._runtime_vars
|
rvs: dict[str, Any] = spawnspec._runtime_vars
|
||||||
|
@ -1154,6 +1182,9 @@ class Actor:
|
||||||
return (
|
return (
|
||||||
chan,
|
chan,
|
||||||
accept_addrs,
|
accept_addrs,
|
||||||
|
None,
|
||||||
|
# ^TODO, preferred tpts list from rent!
|
||||||
|
# -[ ] need to extend the `SpawnSpec` tho!
|
||||||
)
|
)
|
||||||
|
|
||||||
except OSError: # failed to connect
|
except OSError: # failed to connect
|
||||||
|
@ -1169,7 +1200,7 @@ class Actor:
|
||||||
self,
|
self,
|
||||||
handler_nursery: Nursery,
|
handler_nursery: Nursery,
|
||||||
*,
|
*,
|
||||||
listen_addrs: list[AddressTypes]|None = None,
|
listen_addrs: list[UnwrappedAddress]|None = None,
|
||||||
|
|
||||||
task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -1578,7 +1609,7 @@ class Actor:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def accept_addrs(self) -> list[AddressTypes]:
|
def accept_addrs(self) -> list[UnwrappedAddress]:
|
||||||
'''
|
'''
|
||||||
All addresses to which the transport-channel server binds
|
All addresses to which the transport-channel server binds
|
||||||
and listens for new connections.
|
and listens for new connections.
|
||||||
|
@ -1587,7 +1618,7 @@ class Actor:
|
||||||
return [a.unwrap() for a in self._listen_addrs]
|
return [a.unwrap() for a in self._listen_addrs]
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def accept_addr(self) -> AddressTypes:
|
def accept_addr(self) -> UnwrappedAddress:
|
||||||
'''
|
'''
|
||||||
Primary address to which the IPC transport server is
|
Primary address to which the IPC transport server is
|
||||||
bound and listening for new connections.
|
bound and listening for new connections.
|
||||||
|
@ -1639,8 +1670,6 @@ class Actor:
|
||||||
chan.aid = aid
|
chan.aid = aid
|
||||||
|
|
||||||
uid: tuple[str, str] = (
|
uid: tuple[str, str] = (
|
||||||
# str(value[0]),
|
|
||||||
# str(value[1])
|
|
||||||
aid.name,
|
aid.name,
|
||||||
aid.uuid,
|
aid.uuid,
|
||||||
)
|
)
|
||||||
|
@ -1664,7 +1693,7 @@ class Actor:
|
||||||
|
|
||||||
async def async_main(
|
async def async_main(
|
||||||
actor: Actor,
|
actor: Actor,
|
||||||
accept_addrs: AddressTypes|None = None,
|
accept_addrs: UnwrappedAddress|None = None,
|
||||||
|
|
||||||
# XXX: currently ``parent_addr`` is only needed for the
|
# XXX: currently ``parent_addr`` is only needed for the
|
||||||
# ``multiprocessing`` backend (which pickles state sent to
|
# ``multiprocessing`` backend (which pickles state sent to
|
||||||
|
@ -1673,7 +1702,7 @@ async def async_main(
|
||||||
# change this to a simple ``is_subactor: bool`` which will
|
# change this to a simple ``is_subactor: bool`` which will
|
||||||
# be False when running as root actor and True when as
|
# be False when running as root actor and True when as
|
||||||
# a subactor.
|
# a subactor.
|
||||||
parent_addr: AddressTypes|None = None,
|
parent_addr: UnwrappedAddress|None = None,
|
||||||
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -1702,16 +1731,31 @@ async def async_main(
|
||||||
(
|
(
|
||||||
actor._parent_chan,
|
actor._parent_chan,
|
||||||
set_accept_addr_says_rent,
|
set_accept_addr_says_rent,
|
||||||
|
maybe_preferred_transports_says_rent,
|
||||||
) = await actor._from_parent(parent_addr)
|
) = await actor._from_parent(parent_addr)
|
||||||
|
|
||||||
|
|
||||||
# either it's passed in because we're not a child or
|
# either it's passed in because we're not a child or
|
||||||
# because we're running in mp mode
|
# because we're running in mp mode
|
||||||
|
accept_addrs: list[UnwrappedAddress] = []
|
||||||
if (
|
if (
|
||||||
set_accept_addr_says_rent
|
set_accept_addr_says_rent
|
||||||
and
|
and
|
||||||
set_accept_addr_says_rent is not None
|
set_accept_addr_says_rent is not None
|
||||||
):
|
):
|
||||||
accept_addrs = set_accept_addr_says_rent
|
accept_addrs = set_accept_addr_says_rent
|
||||||
|
else:
|
||||||
|
enable_transports: list[str] = (
|
||||||
|
maybe_preferred_transports_says_rent
|
||||||
|
or
|
||||||
|
[preferred_transport]
|
||||||
|
)
|
||||||
|
for transport_key in enable_transports:
|
||||||
|
transport_cls: Type[Address] = get_address_cls(
|
||||||
|
transport_key
|
||||||
|
)
|
||||||
|
addr: Address = transport_cls.get_random()
|
||||||
|
accept_addrs.append(addr.unwrap())
|
||||||
|
|
||||||
# The "root" nursery ensures the channel with the immediate
|
# The "root" nursery ensures the channel with the immediate
|
||||||
# parent is kept alive as a resilient service until
|
# parent is kept alive as a resilient service until
|
||||||
|
@ -1779,7 +1823,7 @@ async def async_main(
|
||||||
|
|
||||||
raise
|
raise
|
||||||
|
|
||||||
accept_addrs: list[AddressTypes] = actor.accept_addrs
|
accept_addrs: list[UnwrappedAddress] = actor.accept_addrs
|
||||||
|
|
||||||
# NOTE: only set the loopback addr for the
|
# NOTE: only set the loopback addr for the
|
||||||
# process-tree-global "root" mailbox since
|
# process-tree-global "root" mailbox since
|
||||||
|
@ -2028,7 +2072,7 @@ class Arbiter(Actor):
|
||||||
|
|
||||||
self._registry: dict[
|
self._registry: dict[
|
||||||
tuple[str, str],
|
tuple[str, str],
|
||||||
AddressTypes,
|
UnwrappedAddress,
|
||||||
] = {}
|
] = {}
|
||||||
self._waiters: dict[
|
self._waiters: dict[
|
||||||
str,
|
str,
|
||||||
|
@ -2044,7 +2088,7 @@ class Arbiter(Actor):
|
||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
|
|
||||||
) -> AddressTypes|None:
|
) -> UnwrappedAddress|None:
|
||||||
|
|
||||||
for uid, addr in self._registry.items():
|
for uid, addr in self._registry.items():
|
||||||
if name in uid:
|
if name in uid:
|
||||||
|
@ -2055,7 +2099,7 @@ class Arbiter(Actor):
|
||||||
async def get_registry(
|
async def get_registry(
|
||||||
self
|
self
|
||||||
|
|
||||||
) -> dict[str, AddressTypes]:
|
) -> dict[str, UnwrappedAddress]:
|
||||||
'''
|
'''
|
||||||
Return current name registry.
|
Return current name registry.
|
||||||
|
|
||||||
|
@ -2075,7 +2119,7 @@ class Arbiter(Actor):
|
||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
|
|
||||||
) -> list[AddressTypes]:
|
) -> list[UnwrappedAddress]:
|
||||||
'''
|
'''
|
||||||
Wait for a particular actor to register.
|
Wait for a particular actor to register.
|
||||||
|
|
||||||
|
@ -2083,8 +2127,8 @@ class Arbiter(Actor):
|
||||||
registered.
|
registered.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
addrs: list[AddressTypes] = []
|
addrs: list[UnwrappedAddress] = []
|
||||||
addr: AddressTypes
|
addr: UnwrappedAddress
|
||||||
|
|
||||||
mailbox_info: str = 'Actor registry contact infos:\n'
|
mailbox_info: str = 'Actor registry contact infos:\n'
|
||||||
for uid, addr in self._registry.items():
|
for uid, addr in self._registry.items():
|
||||||
|
@ -2110,7 +2154,7 @@ class Arbiter(Actor):
|
||||||
async def register_actor(
|
async def register_actor(
|
||||||
self,
|
self,
|
||||||
uid: tuple[str, str],
|
uid: tuple[str, str],
|
||||||
addr: AddressTypes
|
addr: UnwrappedAddress
|
||||||
) -> None:
|
) -> None:
|
||||||
uid = name, hash = (str(uid[0]), str(uid[1]))
|
uid = name, hash = (str(uid[0]), str(uid[1]))
|
||||||
waddr: Address = wrap_address(addr)
|
waddr: Address = wrap_address(addr)
|
||||||
|
|
|
@ -46,7 +46,7 @@ from tractor._state import (
|
||||||
_runtime_vars,
|
_runtime_vars,
|
||||||
)
|
)
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
from tractor._addr import AddressTypes
|
from tractor._addr import UnwrappedAddress
|
||||||
from tractor._portal import Portal
|
from tractor._portal import Portal
|
||||||
from tractor._runtime import Actor
|
from tractor._runtime import Actor
|
||||||
from tractor._entry import _mp_main
|
from tractor._entry import _mp_main
|
||||||
|
@ -393,8 +393,8 @@ async def new_proc(
|
||||||
errors: dict[tuple[str, str], Exception],
|
errors: dict[tuple[str, str], Exception],
|
||||||
|
|
||||||
# passed through to actor main
|
# passed through to actor main
|
||||||
bind_addrs: list[AddressTypes],
|
bind_addrs: list[UnwrappedAddress],
|
||||||
parent_addr: AddressTypes,
|
parent_addr: UnwrappedAddress,
|
||||||
_runtime_vars: dict[str, Any], # serialized and sent to _child
|
_runtime_vars: dict[str, Any], # serialized and sent to _child
|
||||||
|
|
||||||
*,
|
*,
|
||||||
|
@ -432,8 +432,8 @@ async def trio_proc(
|
||||||
errors: dict[tuple[str, str], Exception],
|
errors: dict[tuple[str, str], Exception],
|
||||||
|
|
||||||
# passed through to actor main
|
# passed through to actor main
|
||||||
bind_addrs: list[AddressTypes],
|
bind_addrs: list[UnwrappedAddress],
|
||||||
parent_addr: AddressTypes,
|
parent_addr: UnwrappedAddress,
|
||||||
_runtime_vars: dict[str, Any], # serialized and sent to _child
|
_runtime_vars: dict[str, Any], # serialized and sent to _child
|
||||||
*,
|
*,
|
||||||
infect_asyncio: bool = False,
|
infect_asyncio: bool = False,
|
||||||
|
@ -639,8 +639,8 @@ async def mp_proc(
|
||||||
subactor: Actor,
|
subactor: Actor,
|
||||||
errors: dict[tuple[str, str], Exception],
|
errors: dict[tuple[str, str], Exception],
|
||||||
# passed through to actor main
|
# passed through to actor main
|
||||||
bind_addrs: list[AddressTypes],
|
bind_addrs: list[UnwrappedAddress],
|
||||||
parent_addr: AddressTypes,
|
parent_addr: UnwrappedAddress,
|
||||||
_runtime_vars: dict[str, Any], # serialized and sent to _child
|
_runtime_vars: dict[str, Any], # serialized and sent to _child
|
||||||
*,
|
*,
|
||||||
infect_asyncio: bool = False,
|
infect_asyncio: bool = False,
|
||||||
|
|
|
@ -22,7 +22,9 @@ from contextlib import asynccontextmanager as acm
|
||||||
from functools import partial
|
from functools import partial
|
||||||
import inspect
|
import inspect
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
from typing import TYPE_CHECKING
|
from typing import (
|
||||||
|
TYPE_CHECKING,
|
||||||
|
)
|
||||||
import typing
|
import typing
|
||||||
import warnings
|
import warnings
|
||||||
|
|
||||||
|
@ -31,9 +33,9 @@ import trio
|
||||||
|
|
||||||
from .devx._debug import maybe_wait_for_debugger
|
from .devx._debug import maybe_wait_for_debugger
|
||||||
from ._addr import (
|
from ._addr import (
|
||||||
AddressTypes,
|
UnwrappedAddress,
|
||||||
preferred_transport,
|
preferred_transport,
|
||||||
get_address_cls
|
mk_uuid,
|
||||||
)
|
)
|
||||||
from ._state import current_actor, is_main_process
|
from ._state import current_actor, is_main_process
|
||||||
from .log import get_logger, get_loglevel
|
from .log import get_logger, get_loglevel
|
||||||
|
@ -134,7 +136,7 @@ class ActorNursery:
|
||||||
|
|
||||||
*,
|
*,
|
||||||
|
|
||||||
bind_addrs: list[AddressTypes]|None = None,
|
bind_addrs: list[UnwrappedAddress]|None = None,
|
||||||
rpc_module_paths: list[str]|None = None,
|
rpc_module_paths: list[str]|None = None,
|
||||||
enable_transports: list[str] = [preferred_transport],
|
enable_transports: list[str] = [preferred_transport],
|
||||||
enable_modules: list[str]|None = None,
|
enable_modules: list[str]|None = None,
|
||||||
|
@ -161,12 +163,6 @@ class ActorNursery:
|
||||||
or get_loglevel()
|
or get_loglevel()
|
||||||
)
|
)
|
||||||
|
|
||||||
if not bind_addrs:
|
|
||||||
bind_addrs: list[AddressTypes] = [
|
|
||||||
get_address_cls(transport).get_random().unwrap()
|
|
||||||
for transport in enable_transports
|
|
||||||
]
|
|
||||||
|
|
||||||
# configure and pass runtime state
|
# configure and pass runtime state
|
||||||
_rtv = _state._runtime_vars.copy()
|
_rtv = _state._runtime_vars.copy()
|
||||||
_rtv['_is_root'] = False
|
_rtv['_is_root'] = False
|
||||||
|
@ -189,7 +185,9 @@ class ActorNursery:
|
||||||
enable_modules.extend(rpc_module_paths)
|
enable_modules.extend(rpc_module_paths)
|
||||||
|
|
||||||
subactor = Actor(
|
subactor = Actor(
|
||||||
name,
|
name=name,
|
||||||
|
uuid=mk_uuid(),
|
||||||
|
|
||||||
# modules allowed to invoked funcs from
|
# modules allowed to invoked funcs from
|
||||||
enable_modules=enable_modules,
|
enable_modules=enable_modules,
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
|
@ -197,7 +195,7 @@ class ActorNursery:
|
||||||
# verbatim relay this actor's registrar addresses
|
# verbatim relay this actor's registrar addresses
|
||||||
registry_addrs=current_actor().reg_addrs,
|
registry_addrs=current_actor().reg_addrs,
|
||||||
)
|
)
|
||||||
parent_addr = self._actor.accept_addr
|
parent_addr: UnwrappedAddress = self._actor.accept_addr
|
||||||
assert parent_addr
|
assert parent_addr
|
||||||
|
|
||||||
# start a task to spawn a process
|
# start a task to spawn a process
|
||||||
|
@ -235,7 +233,7 @@ class ActorNursery:
|
||||||
*,
|
*,
|
||||||
|
|
||||||
name: str | None = None,
|
name: str | None = None,
|
||||||
bind_addrs: AddressTypes|None = None,
|
bind_addrs: UnwrappedAddress|None = None,
|
||||||
rpc_module_paths: list[str] | None = None,
|
rpc_module_paths: list[str] | None = None,
|
||||||
enable_modules: list[str] | None = None,
|
enable_modules: list[str] | None = None,
|
||||||
loglevel: str | None = None, # set log level per subactor
|
loglevel: str | None = None, # set log level per subactor
|
||||||
|
|
|
@ -42,24 +42,15 @@ class MsgpackTCPStream(MsgpackTransport):
|
||||||
address_type = TCPAddress
|
address_type = TCPAddress
|
||||||
layer_key: int = 4
|
layer_key: int = 4
|
||||||
|
|
||||||
# def __init__(
|
|
||||||
# self,
|
|
||||||
# stream: trio.SocketStream,
|
|
||||||
# prefix_size: int = 4,
|
|
||||||
# codec: CodecType = None,
|
|
||||||
|
|
||||||
# ) -> None:
|
|
||||||
# super().__init__(
|
|
||||||
# stream,
|
|
||||||
# prefix_size=prefix_size,
|
|
||||||
# codec=codec
|
|
||||||
# )
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def maddr(self) -> str:
|
def maddr(self) -> str:
|
||||||
host, port = self.raddr.unwrap()
|
host, port = self.raddr.unwrap()
|
||||||
return (
|
return (
|
||||||
|
# TODO, use `ipaddress` from stdlib to handle
|
||||||
|
# first detecting which of `ipv4/6` before
|
||||||
|
# choosing the routing prefix part.
|
||||||
f'/ipv4/{host}'
|
f'/ipv4/{host}'
|
||||||
|
|
||||||
f'/{self.address_type.name_key}/{port}'
|
f'/{self.address_type.name_key}/{port}'
|
||||||
# f'/{self.chan.uid[0]}'
|
# f'/{self.chan.uid[0]}'
|
||||||
# f'/{self.cid}'
|
# f'/{self.cid}'
|
||||||
|
@ -94,12 +85,15 @@ class MsgpackTCPStream(MsgpackTransport):
|
||||||
cls,
|
cls,
|
||||||
stream: trio.SocketStream
|
stream: trio.SocketStream
|
||||||
) -> tuple[
|
) -> tuple[
|
||||||
tuple[str, int],
|
TCPAddress,
|
||||||
tuple[str, int]
|
TCPAddress,
|
||||||
]:
|
]:
|
||||||
|
# TODO, what types are these?
|
||||||
lsockname = stream.socket.getsockname()
|
lsockname = stream.socket.getsockname()
|
||||||
|
l_sockaddr: tuple[str, int] = tuple(lsockname[:2])
|
||||||
rsockname = stream.socket.getpeername()
|
rsockname = stream.socket.getpeername()
|
||||||
|
r_sockaddr: tuple[str, int] = tuple(rsockname[:2])
|
||||||
return (
|
return (
|
||||||
TCPAddress.from_addr(tuple(lsockname[:2])),
|
TCPAddress.from_addr(l_sockaddr),
|
||||||
TCPAddress.from_addr(tuple(rsockname[:2])),
|
TCPAddress.from_addr(r_sockaddr),
|
||||||
)
|
)
|
||||||
|
|
|
@ -18,8 +18,23 @@ Unix Domain Socket implementation of tractor.ipc._transport.MsgTransport protoco
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
from pathlib import Path
|
||||||
|
import os
|
||||||
|
from socket import (
|
||||||
|
# socket,
|
||||||
|
AF_UNIX,
|
||||||
|
SOCK_STREAM,
|
||||||
|
SO_PASSCRED,
|
||||||
|
SO_PEERCRED,
|
||||||
|
SOL_SOCKET,
|
||||||
|
)
|
||||||
|
import struct
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
|
from trio._highlevel_open_unix_stream import (
|
||||||
|
close_on_error,
|
||||||
|
has_unix,
|
||||||
|
)
|
||||||
|
|
||||||
from tractor.msg import MsgCodec
|
from tractor.msg import MsgCodec
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
|
@ -30,33 +45,80 @@ from tractor.ipc._transport import MsgpackTransport
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
async def open_unix_socket_w_passcred(
|
||||||
|
filename: str|bytes|os.PathLike[str]|os.PathLike[bytes],
|
||||||
|
) -> trio.SocketStream:
|
||||||
|
'''
|
||||||
|
Literally the exact same as `trio.open_unix_socket()` except we set the additiona
|
||||||
|
`socket.SO_PASSCRED` option to ensure the server side (the process calling `accept()`)
|
||||||
|
can extract the connecting peer's credentials, namely OS specific process
|
||||||
|
related IDs.
|
||||||
|
|
||||||
|
See this SO for "why" the extra opts,
|
||||||
|
- https://stackoverflow.com/a/7982749
|
||||||
|
|
||||||
|
'''
|
||||||
|
if not has_unix:
|
||||||
|
raise RuntimeError("Unix sockets are not supported on this platform")
|
||||||
|
|
||||||
|
# much more simplified logic vs tcp sockets - one socket type and only one
|
||||||
|
# possible location to connect to
|
||||||
|
sock = trio.socket.socket(AF_UNIX, SOCK_STREAM)
|
||||||
|
sock.setsockopt(SOL_SOCKET, SO_PASSCRED, 1)
|
||||||
|
with close_on_error(sock):
|
||||||
|
await sock.connect(os.fspath(filename))
|
||||||
|
|
||||||
|
return trio.SocketStream(sock)
|
||||||
|
|
||||||
|
|
||||||
|
def get_peer_info(sock: trio.socket.socket) -> tuple[
|
||||||
|
int, # pid
|
||||||
|
int, # uid
|
||||||
|
int, # guid
|
||||||
|
]:
|
||||||
|
'''
|
||||||
|
Deliver the connecting peer's "credentials"-info as defined in
|
||||||
|
a very Linux specific way..
|
||||||
|
|
||||||
|
For more deats see,
|
||||||
|
- `man accept`,
|
||||||
|
- `man unix`,
|
||||||
|
|
||||||
|
this great online guide to all things sockets,
|
||||||
|
- https://beej.us/guide/bgnet/html/split-wide/man-pages.html#setsockoptman
|
||||||
|
|
||||||
|
AND this **wonderful SO answer**
|
||||||
|
- https://stackoverflow.com/a/7982749
|
||||||
|
|
||||||
|
'''
|
||||||
|
creds: bytes = sock.getsockopt(
|
||||||
|
SOL_SOCKET,
|
||||||
|
SO_PEERCRED,
|
||||||
|
struct.calcsize('3i')
|
||||||
|
)
|
||||||
|
# i.e a tuple of the fields,
|
||||||
|
# pid: int, "process"
|
||||||
|
# uid: int, "user"
|
||||||
|
# gid: int, "group"
|
||||||
|
return struct.unpack('3i', creds)
|
||||||
|
|
||||||
|
|
||||||
class MsgpackUDSStream(MsgpackTransport):
|
class MsgpackUDSStream(MsgpackTransport):
|
||||||
'''
|
'''
|
||||||
A ``trio.SocketStream`` delivering ``msgpack`` formatted data
|
A `trio.SocketStream` around a Unix-Domain-Socket transport
|
||||||
using the ``msgspec`` codec lib.
|
delivering `msgpack` encoded msgs using the `msgspec` codec lib.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
address_type = UDSAddress
|
address_type = UDSAddress
|
||||||
layer_key: int = 7
|
layer_key: int = 4
|
||||||
|
|
||||||
# def __init__(
|
|
||||||
# self,
|
|
||||||
# stream: trio.SocketStream,
|
|
||||||
# prefix_size: int = 4,
|
|
||||||
# codec: CodecType = None,
|
|
||||||
|
|
||||||
# ) -> None:
|
|
||||||
# super().__init__(
|
|
||||||
# stream,
|
|
||||||
# prefix_size=prefix_size,
|
|
||||||
# codec=codec
|
|
||||||
# )
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def maddr(self) -> str:
|
def maddr(self) -> str:
|
||||||
filepath = self.raddr.unwrap()
|
if not self.raddr:
|
||||||
|
return '<unknown-peer>'
|
||||||
|
|
||||||
|
filepath: Path = Path(self.raddr.unwrap()[0])
|
||||||
return (
|
return (
|
||||||
f'/ipv4/localhost'
|
|
||||||
f'/{self.address_type.name_key}/{filepath}'
|
f'/{self.address_type.name_key}/{filepath}'
|
||||||
# f'/{self.chan.uid[0]}'
|
# f'/{self.chan.uid[0]}'
|
||||||
# f'/{self.cid}'
|
# f'/{self.cid}'
|
||||||
|
@ -76,22 +138,72 @@ class MsgpackUDSStream(MsgpackTransport):
|
||||||
codec: MsgCodec|None = None,
|
codec: MsgCodec|None = None,
|
||||||
**kwargs
|
**kwargs
|
||||||
) -> MsgpackUDSStream:
|
) -> MsgpackUDSStream:
|
||||||
stream = await trio.open_unix_socket(
|
|
||||||
addr.unwrap(),
|
filepath: Path
|
||||||
|
pid: int
|
||||||
|
(
|
||||||
|
filepath,
|
||||||
|
pid,
|
||||||
|
) = addr.unwrap()
|
||||||
|
|
||||||
|
# XXX NOTE, we don't need to provide the `.pid` part from
|
||||||
|
# the addr since the OS does this implicitly! .. lel
|
||||||
|
# stream = await trio.open_unix_socket(
|
||||||
|
stream = await open_unix_socket_w_passcred(
|
||||||
|
str(filepath),
|
||||||
**kwargs
|
**kwargs
|
||||||
)
|
)
|
||||||
return MsgpackUDSStream(
|
stream = MsgpackUDSStream(
|
||||||
stream,
|
stream,
|
||||||
prefix_size=prefix_size,
|
prefix_size=prefix_size,
|
||||||
codec=codec
|
codec=codec
|
||||||
)
|
)
|
||||||
|
stream._raddr = addr
|
||||||
|
return stream
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_stream_addrs(
|
def get_stream_addrs(
|
||||||
cls,
|
cls,
|
||||||
stream: trio.SocketStream
|
stream: trio.SocketStream
|
||||||
) -> tuple[UDSAddress, UDSAddress]:
|
) -> tuple[
|
||||||
return (
|
Path,
|
||||||
UDSAddress.from_addr(stream.socket.getsockname()),
|
int,
|
||||||
UDSAddress.from_addr(stream.socket.getsockname()),
|
]:
|
||||||
|
sock: trio.socket.socket = stream.socket
|
||||||
|
|
||||||
|
# NOTE XXX, it's unclear why one or the other ends up being
|
||||||
|
# `bytes` versus the socket-file-path, i presume it's
|
||||||
|
# something to do with who is the server (called `.listen()`)?
|
||||||
|
# maybe could be better implemented using another info-query
|
||||||
|
# on the socket like,
|
||||||
|
# https://beej.us/guide/bgnet/html/split-wide/system-calls-or-bust.html#gethostnamewho-am-i
|
||||||
|
sockname: str|bytes = sock.getsockname()
|
||||||
|
# https://beej.us/guide/bgnet/html/split-wide/system-calls-or-bust.html#getpeernamewho-are-you
|
||||||
|
peername: str|bytes = sock.getpeername()
|
||||||
|
match (peername, sockname):
|
||||||
|
case (str(), bytes()):
|
||||||
|
sock_path: Path = Path(peername)
|
||||||
|
case (bytes(), str()):
|
||||||
|
sock_path: Path = Path(sockname)
|
||||||
|
(
|
||||||
|
pid,
|
||||||
|
uid,
|
||||||
|
gid,
|
||||||
|
) = get_peer_info(sock)
|
||||||
|
log.info(
|
||||||
|
f'UDS connection from process {pid!r}\n'
|
||||||
|
f'>[\n'
|
||||||
|
f'|_{sock_path}\n'
|
||||||
|
f' |_pid: {pid}\n'
|
||||||
|
f' |_uid: {uid}\n'
|
||||||
|
f' |_gid: {gid}\n'
|
||||||
)
|
)
|
||||||
|
laddr = UDSAddress.from_addr((
|
||||||
|
sock_path,
|
||||||
|
os.getpid(),
|
||||||
|
))
|
||||||
|
raddr = UDSAddress.from_addr((
|
||||||
|
sock_path,
|
||||||
|
pid
|
||||||
|
))
|
||||||
|
return (laddr, raddr)
|
||||||
|
|
|
@ -31,6 +31,7 @@ from typing import (
|
||||||
Type,
|
Type,
|
||||||
TypeVar,
|
TypeVar,
|
||||||
TypeAlias,
|
TypeAlias,
|
||||||
|
# TYPE_CHECKING,
|
||||||
Union,
|
Union,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -47,7 +48,7 @@ from tractor.msg import (
|
||||||
pretty_struct,
|
pretty_struct,
|
||||||
)
|
)
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
from tractor._addr import AddressTypes
|
from tractor._addr import UnwrappedAddress
|
||||||
|
|
||||||
|
|
||||||
log = get_logger('tractor.msgspec')
|
log = get_logger('tractor.msgspec')
|
||||||
|
@ -142,9 +143,15 @@ class Aid(
|
||||||
'''
|
'''
|
||||||
name: str
|
name: str
|
||||||
uuid: str
|
uuid: str
|
||||||
# TODO: use built-in support for UUIDs?
|
|
||||||
# -[ ] `uuid.UUID` which has multi-protocol support
|
# TODO? can/should we extend this field set?
|
||||||
|
# -[ ] use built-in support for UUIDs? `uuid.UUID` which has
|
||||||
|
# multi-protocol support
|
||||||
# https://jcristharif.com/msgspec/supported-types.html#uuid
|
# https://jcristharif.com/msgspec/supported-types.html#uuid
|
||||||
|
#
|
||||||
|
# -[ ] as per the `.ipc._uds` / `._addr` comments, maybe we
|
||||||
|
# should also include at least `.pid` (equiv to port for tcp)
|
||||||
|
# and/or host-part always?
|
||||||
|
|
||||||
|
|
||||||
class SpawnSpec(
|
class SpawnSpec(
|
||||||
|
@ -168,8 +175,8 @@ class SpawnSpec(
|
||||||
|
|
||||||
# TODO: not just sockaddr pairs?
|
# TODO: not just sockaddr pairs?
|
||||||
# -[ ] abstract into a `TransportAddr` type?
|
# -[ ] abstract into a `TransportAddr` type?
|
||||||
reg_addrs: list[AddressTypes]
|
reg_addrs: list[UnwrappedAddress]
|
||||||
bind_addrs: list[AddressTypes]
|
bind_addrs: list[UnwrappedAddress]|None
|
||||||
|
|
||||||
|
|
||||||
# TODO: caps based RPC support in the payload?
|
# TODO: caps based RPC support in the payload?
|
||||||
|
|
Loading…
Reference in New Issue