Allocate bind-addrs in subactors
Previously whenever an `ActorNursery.start_actor()` call did not receive a `bind_addrs` arg we would allocate the default `(localhost, 0)` pairs in the parent, for UDS this obviously won't work nor is it ideal bc it's nicer to have the actor to be a socket server (who calls `Address.open_listener()`) define the socket-file-name containing their unique ID info such as pid, actor-uuid etc. As such this moves "random" generation of server addresses to the child-side of a subactor's spawn-sequence when it's sin-`bind_addrs`; i.e. we do the allocation of the `Address.get_random()` addrs inside `._runtime.async_main()` instead of `Portal.start_actor()` and **only when** `accept_addrs`/`bind_addrs` was **not provided by the spawning parent**. Further this patch get's way more rigorous about the `SpawnSpec` processing in the child inside `Actor._from_parent()` such that we handle any invalid msgs **very loudly and pedantically!** Impl deats, - do the "random addr generation" in an explicit `for` loop (instead of prior comprehension) to allow for more detailed typing of the layered calls to the new `._addr` mod. - use a `match:/case:` for process any invalid `SpawnSpec` payload case where we can instead receive a `MsgTypeError` from the `chan.recv()` call in `Actor._from_parent()` to raise it immediately instead of triggering downstream type-errors XD |_ as per the big `#TODO` we prolly want to take from other callers of `Channel.recv()` (like in the `._rpc.process_messages()` loop). |_ always raise `InternalError` on non-match/fall-through case! |_ add a note about not being able to use `breakpoint()` in this section due to causality of `SpawnSpec._runtime_vars` not having been processed yet.. |_ always return a third element from `._from_rent()` eventually to be the `preferred_transports: list[str]` from the spawning rent. - use new `._addr.mk_uuid()` and pass to new `Actor.__init__(uuid: str)` for all actor creation (including in all the mods tweaked here). - Move to new type-alias-name `UnwrappedAddress` throughout.leslies_extra_appendix
parent
23acd0f4cb
commit
6a5ccc2425
|
@ -47,10 +47,11 @@ from .ipc import (
|
|||
_connect_chan,
|
||||
)
|
||||
from ._addr import (
|
||||
AddressTypes,
|
||||
wrap_address,
|
||||
UnwrappedAddress,
|
||||
default_lo_addrs,
|
||||
mk_uuid,
|
||||
preferred_transport,
|
||||
default_lo_addrs
|
||||
wrap_address,
|
||||
)
|
||||
from ._exceptions import is_multi_cancelled
|
||||
|
||||
|
@ -63,10 +64,10 @@ async def open_root_actor(
|
|||
|
||||
*,
|
||||
# defaults are above
|
||||
registry_addrs: list[AddressTypes]|None = None,
|
||||
registry_addrs: list[UnwrappedAddress]|None = None,
|
||||
|
||||
# defaults are above
|
||||
arbiter_addr: tuple[AddressTypes]|None = None,
|
||||
arbiter_addr: tuple[UnwrappedAddress]|None = None,
|
||||
|
||||
enable_transports: list[str] = [preferred_transport],
|
||||
|
||||
|
@ -195,7 +196,9 @@ async def open_root_actor(
|
|||
registry_addrs = [arbiter_addr]
|
||||
|
||||
if not registry_addrs:
|
||||
registry_addrs: list[AddressTypes] = default_lo_addrs(enable_transports)
|
||||
registry_addrs: list[UnwrappedAddress] = default_lo_addrs(
|
||||
enable_transports
|
||||
)
|
||||
|
||||
assert registry_addrs
|
||||
|
||||
|
@ -245,10 +248,10 @@ async def open_root_actor(
|
|||
enable_stack_on_sig()
|
||||
|
||||
# closed into below ping task-func
|
||||
ponged_addrs: list[AddressTypes] = []
|
||||
ponged_addrs: list[UnwrappedAddress] = []
|
||||
|
||||
async def ping_tpt_socket(
|
||||
addr: AddressTypes,
|
||||
addr: UnwrappedAddress,
|
||||
timeout: float = 1,
|
||||
) -> None:
|
||||
'''
|
||||
|
@ -284,7 +287,7 @@ async def open_root_actor(
|
|||
addr,
|
||||
)
|
||||
|
||||
trans_bind_addrs: list[AddressTypes] = []
|
||||
trans_bind_addrs: list[UnwrappedAddress] = []
|
||||
|
||||
# Create a new local root-actor instance which IS NOT THE
|
||||
# REGISTRAR
|
||||
|
@ -302,6 +305,7 @@ async def open_root_actor(
|
|||
|
||||
actor = Actor(
|
||||
name=name or 'anonymous',
|
||||
uuid=mk_uuid(),
|
||||
registry_addrs=ponged_addrs,
|
||||
loglevel=loglevel,
|
||||
enable_modules=enable_modules,
|
||||
|
@ -336,7 +340,8 @@ async def open_root_actor(
|
|||
# https://github.com/goodboy/tractor/issues/296
|
||||
|
||||
actor = Arbiter(
|
||||
name or 'registrar',
|
||||
name=name or 'registrar',
|
||||
uuid=mk_uuid(),
|
||||
registry_addrs=registry_addrs,
|
||||
loglevel=loglevel,
|
||||
enable_modules=enable_modules,
|
||||
|
@ -462,7 +467,7 @@ def run_daemon(
|
|||
|
||||
# runtime kwargs
|
||||
name: str | None = 'root',
|
||||
registry_addrs: list[AddressTypes]|None = None,
|
||||
registry_addrs: list[UnwrappedAddress]|None = None,
|
||||
|
||||
start_method: str | None = None,
|
||||
debug_mode: bool = False,
|
||||
|
|
|
@ -52,6 +52,7 @@ import sys
|
|||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
Type,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
import uuid
|
||||
|
@ -75,11 +76,12 @@ from tractor.msg import (
|
|||
)
|
||||
from .ipc import Channel
|
||||
from ._addr import (
|
||||
AddressTypes,
|
||||
UnwrappedAddress,
|
||||
Address,
|
||||
wrap_address,
|
||||
default_lo_addrs,
|
||||
get_address_cls,
|
||||
preferred_transport,
|
||||
default_lo_addrs
|
||||
wrap_address,
|
||||
)
|
||||
from ._context import (
|
||||
mk_context,
|
||||
|
@ -182,15 +184,15 @@ class Actor:
|
|||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
uuid: str,
|
||||
*,
|
||||
enable_modules: list[str] = [],
|
||||
uid: str|None = None,
|
||||
loglevel: str|None = None,
|
||||
registry_addrs: list[AddressTypes]|None = None,
|
||||
registry_addrs: list[UnwrappedAddress]|None = None,
|
||||
spawn_method: str|None = None,
|
||||
|
||||
# TODO: remove!
|
||||
arbiter_addr: AddressTypes|None = None,
|
||||
arbiter_addr: UnwrappedAddress|None = None,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
|
@ -199,10 +201,7 @@ class Actor:
|
|||
|
||||
'''
|
||||
self.name = name
|
||||
self.uid = (
|
||||
name,
|
||||
uid or str(uuid.uuid4())
|
||||
)
|
||||
self.uid = (name, uuid)
|
||||
|
||||
self._cancel_complete = trio.Event()
|
||||
self._cancel_called_by_remote: tuple[str, tuple]|None = None
|
||||
|
@ -230,7 +229,7 @@ class Actor:
|
|||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
registry_addrs: list[AddressTypes] = [arbiter_addr]
|
||||
registry_addrs: list[UnwrappedAddress] = [arbiter_addr]
|
||||
|
||||
# marked by the process spawning backend at startup
|
||||
# will be None for the parent most process started manually
|
||||
|
@ -277,13 +276,13 @@ class Actor:
|
|||
|
||||
# when provided, init the registry addresses property from
|
||||
# input via the validator.
|
||||
self._reg_addrs: list[AddressTypes] = []
|
||||
self._reg_addrs: list[UnwrappedAddress] = []
|
||||
if registry_addrs:
|
||||
self.reg_addrs: list[AddressTypes] = registry_addrs
|
||||
self.reg_addrs: list[UnwrappedAddress] = registry_addrs
|
||||
_state._runtime_vars['_registry_addrs'] = registry_addrs
|
||||
|
||||
@property
|
||||
def reg_addrs(self) -> list[AddressTypes]:
|
||||
def reg_addrs(self) -> list[UnwrappedAddress]:
|
||||
'''
|
||||
List of (socket) addresses for all known (and contactable)
|
||||
registry actors.
|
||||
|
@ -294,7 +293,7 @@ class Actor:
|
|||
@reg_addrs.setter
|
||||
def reg_addrs(
|
||||
self,
|
||||
addrs: list[AddressTypes],
|
||||
addrs: list[UnwrappedAddress],
|
||||
) -> None:
|
||||
if not addrs:
|
||||
log.warning(
|
||||
|
@ -1023,11 +1022,12 @@ class Actor:
|
|||
|
||||
async def _from_parent(
|
||||
self,
|
||||
parent_addr: AddressTypes|None,
|
||||
parent_addr: UnwrappedAddress|None,
|
||||
|
||||
) -> tuple[
|
||||
Channel,
|
||||
list[AddressTypes]|None,
|
||||
list[UnwrappedAddress]|None,
|
||||
list[str]|None, # preferred tpts
|
||||
]:
|
||||
'''
|
||||
Bootstrap this local actor's runtime config from its parent by
|
||||
|
@ -1039,30 +1039,58 @@ class Actor:
|
|||
# Connect back to the parent actor and conduct initial
|
||||
# handshake. From this point on if we error, we
|
||||
# attempt to ship the exception back to the parent.
|
||||
chan = await Channel.from_addr(wrap_address(parent_addr))
|
||||
chan = await Channel.from_addr(
|
||||
addr=wrap_address(parent_addr)
|
||||
)
|
||||
assert isinstance(chan, Channel)
|
||||
|
||||
# TODO: move this into a `Channel.handshake()`?
|
||||
# Initial handshake: swap names.
|
||||
await self._do_handshake(chan)
|
||||
|
||||
accept_addrs: list[AddressTypes]|None = None
|
||||
accept_addrs: list[UnwrappedAddress]|None = None
|
||||
|
||||
if self._spawn_method == "trio":
|
||||
|
||||
# Receive post-spawn runtime state from our parent.
|
||||
spawnspec: msgtypes.SpawnSpec = await chan.recv()
|
||||
self._spawn_spec = spawnspec
|
||||
match spawnspec:
|
||||
case MsgTypeError():
|
||||
raise spawnspec
|
||||
case msgtypes.SpawnSpec():
|
||||
self._spawn_spec = spawnspec
|
||||
log.runtime(
|
||||
'Received runtime spec from parent:\n\n'
|
||||
|
||||
log.runtime(
|
||||
'Received runtime spec from parent:\n\n'
|
||||
# TODO: eventually all these msgs as
|
||||
# `msgspec.Struct` with a special mode that
|
||||
# pformats them in multi-line mode, BUT only
|
||||
# if "trace"/"util" mode is enabled?
|
||||
f'{pretty_struct.pformat(spawnspec)}\n'
|
||||
)
|
||||
|
||||
# TODO: eventually all these msgs as
|
||||
# `msgspec.Struct` with a special mode that
|
||||
# pformats them in multi-line mode, BUT only
|
||||
# if "trace"/"util" mode is enabled?
|
||||
f'{pretty_struct.pformat(spawnspec)}\n'
|
||||
)
|
||||
accept_addrs: list[AddressTypes] = spawnspec.bind_addrs
|
||||
case _:
|
||||
raise InternalError(
|
||||
f'Received invalid non-`SpawnSpec` payload !?\n'
|
||||
f'{spawnspec}\n'
|
||||
)
|
||||
|
||||
# ^^TODO XXX!! when the `SpawnSpec` fails to decode
|
||||
# the above will raise a `MsgTypeError` which if we
|
||||
# do NOT ALSO RAISE it will tried to be pprinted in
|
||||
# the log.runtime() below..
|
||||
#
|
||||
# SO we gotta look at how other `chan.recv()` calls
|
||||
# are wrapped and do the same for this spec receive!
|
||||
# -[ ] see `._rpc` likely has the answer?
|
||||
#
|
||||
# XXX NOTE, can't be called here in subactor
|
||||
# bc we haven't yet received the
|
||||
# `SpawnSpec._runtime_vars: dict` which would
|
||||
# declare whether `debug_mode` is set!
|
||||
# breakpoint()
|
||||
# import pdbp; pdbp.set_trace()
|
||||
accept_addrs: list[UnwrappedAddress] = spawnspec.bind_addrs
|
||||
|
||||
# TODO: another `Struct` for rtvs..
|
||||
rvs: dict[str, Any] = spawnspec._runtime_vars
|
||||
|
@ -1154,6 +1182,9 @@ class Actor:
|
|||
return (
|
||||
chan,
|
||||
accept_addrs,
|
||||
None,
|
||||
# ^TODO, preferred tpts list from rent!
|
||||
# -[ ] need to extend the `SpawnSpec` tho!
|
||||
)
|
||||
|
||||
except OSError: # failed to connect
|
||||
|
@ -1169,7 +1200,7 @@ class Actor:
|
|||
self,
|
||||
handler_nursery: Nursery,
|
||||
*,
|
||||
listen_addrs: list[AddressTypes]|None = None,
|
||||
listen_addrs: list[UnwrappedAddress]|None = None,
|
||||
|
||||
task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
|
||||
) -> None:
|
||||
|
@ -1578,7 +1609,7 @@ class Actor:
|
|||
return False
|
||||
|
||||
@property
|
||||
def accept_addrs(self) -> list[AddressTypes]:
|
||||
def accept_addrs(self) -> list[UnwrappedAddress]:
|
||||
'''
|
||||
All addresses to which the transport-channel server binds
|
||||
and listens for new connections.
|
||||
|
@ -1587,7 +1618,7 @@ class Actor:
|
|||
return [a.unwrap() for a in self._listen_addrs]
|
||||
|
||||
@property
|
||||
def accept_addr(self) -> AddressTypes:
|
||||
def accept_addr(self) -> UnwrappedAddress:
|
||||
'''
|
||||
Primary address to which the IPC transport server is
|
||||
bound and listening for new connections.
|
||||
|
@ -1639,8 +1670,6 @@ class Actor:
|
|||
chan.aid = aid
|
||||
|
||||
uid: tuple[str, str] = (
|
||||
# str(value[0]),
|
||||
# str(value[1])
|
||||
aid.name,
|
||||
aid.uuid,
|
||||
)
|
||||
|
@ -1664,7 +1693,7 @@ class Actor:
|
|||
|
||||
async def async_main(
|
||||
actor: Actor,
|
||||
accept_addrs: AddressTypes|None = None,
|
||||
accept_addrs: UnwrappedAddress|None = None,
|
||||
|
||||
# XXX: currently ``parent_addr`` is only needed for the
|
||||
# ``multiprocessing`` backend (which pickles state sent to
|
||||
|
@ -1673,7 +1702,7 @@ async def async_main(
|
|||
# change this to a simple ``is_subactor: bool`` which will
|
||||
# be False when running as root actor and True when as
|
||||
# a subactor.
|
||||
parent_addr: AddressTypes|None = None,
|
||||
parent_addr: UnwrappedAddress|None = None,
|
||||
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> None:
|
||||
|
@ -1702,16 +1731,31 @@ async def async_main(
|
|||
(
|
||||
actor._parent_chan,
|
||||
set_accept_addr_says_rent,
|
||||
maybe_preferred_transports_says_rent,
|
||||
) = await actor._from_parent(parent_addr)
|
||||
|
||||
|
||||
# either it's passed in because we're not a child or
|
||||
# because we're running in mp mode
|
||||
accept_addrs: list[UnwrappedAddress] = []
|
||||
if (
|
||||
set_accept_addr_says_rent
|
||||
and
|
||||
set_accept_addr_says_rent is not None
|
||||
):
|
||||
accept_addrs = set_accept_addr_says_rent
|
||||
else:
|
||||
enable_transports: list[str] = (
|
||||
maybe_preferred_transports_says_rent
|
||||
or
|
||||
[preferred_transport]
|
||||
)
|
||||
for transport_key in enable_transports:
|
||||
transport_cls: Type[Address] = get_address_cls(
|
||||
transport_key
|
||||
)
|
||||
addr: Address = transport_cls.get_random()
|
||||
accept_addrs.append(addr.unwrap())
|
||||
|
||||
# The "root" nursery ensures the channel with the immediate
|
||||
# parent is kept alive as a resilient service until
|
||||
|
@ -1779,7 +1823,7 @@ async def async_main(
|
|||
|
||||
raise
|
||||
|
||||
accept_addrs: list[AddressTypes] = actor.accept_addrs
|
||||
accept_addrs: list[UnwrappedAddress] = actor.accept_addrs
|
||||
|
||||
# NOTE: only set the loopback addr for the
|
||||
# process-tree-global "root" mailbox since
|
||||
|
@ -2028,7 +2072,7 @@ class Arbiter(Actor):
|
|||
|
||||
self._registry: dict[
|
||||
tuple[str, str],
|
||||
AddressTypes,
|
||||
UnwrappedAddress,
|
||||
] = {}
|
||||
self._waiters: dict[
|
||||
str,
|
||||
|
@ -2044,7 +2088,7 @@ class Arbiter(Actor):
|
|||
self,
|
||||
name: str,
|
||||
|
||||
) -> AddressTypes|None:
|
||||
) -> UnwrappedAddress|None:
|
||||
|
||||
for uid, addr in self._registry.items():
|
||||
if name in uid:
|
||||
|
@ -2055,7 +2099,7 @@ class Arbiter(Actor):
|
|||
async def get_registry(
|
||||
self
|
||||
|
||||
) -> dict[str, AddressTypes]:
|
||||
) -> dict[str, UnwrappedAddress]:
|
||||
'''
|
||||
Return current name registry.
|
||||
|
||||
|
@ -2075,7 +2119,7 @@ class Arbiter(Actor):
|
|||
self,
|
||||
name: str,
|
||||
|
||||
) -> list[AddressTypes]:
|
||||
) -> list[UnwrappedAddress]:
|
||||
'''
|
||||
Wait for a particular actor to register.
|
||||
|
||||
|
@ -2083,8 +2127,8 @@ class Arbiter(Actor):
|
|||
registered.
|
||||
|
||||
'''
|
||||
addrs: list[AddressTypes] = []
|
||||
addr: AddressTypes
|
||||
addrs: list[UnwrappedAddress] = []
|
||||
addr: UnwrappedAddress
|
||||
|
||||
mailbox_info: str = 'Actor registry contact infos:\n'
|
||||
for uid, addr in self._registry.items():
|
||||
|
@ -2110,7 +2154,7 @@ class Arbiter(Actor):
|
|||
async def register_actor(
|
||||
self,
|
||||
uid: tuple[str, str],
|
||||
addr: AddressTypes
|
||||
addr: UnwrappedAddress
|
||||
) -> None:
|
||||
uid = name, hash = (str(uid[0]), str(uid[1]))
|
||||
waddr: Address = wrap_address(addr)
|
||||
|
|
|
@ -22,7 +22,9 @@ from contextlib import asynccontextmanager as acm
|
|||
from functools import partial
|
||||
import inspect
|
||||
from pprint import pformat
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
import typing
|
||||
import warnings
|
||||
|
||||
|
@ -31,9 +33,9 @@ import trio
|
|||
|
||||
from .devx._debug import maybe_wait_for_debugger
|
||||
from ._addr import (
|
||||
AddressTypes,
|
||||
UnwrappedAddress,
|
||||
preferred_transport,
|
||||
get_address_cls
|
||||
mk_uuid,
|
||||
)
|
||||
from ._state import current_actor, is_main_process
|
||||
from .log import get_logger, get_loglevel
|
||||
|
@ -134,7 +136,7 @@ class ActorNursery:
|
|||
|
||||
*,
|
||||
|
||||
bind_addrs: list[AddressTypes]|None = None,
|
||||
bind_addrs: list[UnwrappedAddress]|None = None,
|
||||
rpc_module_paths: list[str]|None = None,
|
||||
enable_transports: list[str] = [preferred_transport],
|
||||
enable_modules: list[str]|None = None,
|
||||
|
@ -161,12 +163,6 @@ class ActorNursery:
|
|||
or get_loglevel()
|
||||
)
|
||||
|
||||
if not bind_addrs:
|
||||
bind_addrs: list[AddressTypes] = [
|
||||
get_address_cls(transport).get_random().unwrap()
|
||||
for transport in enable_transports
|
||||
]
|
||||
|
||||
# configure and pass runtime state
|
||||
_rtv = _state._runtime_vars.copy()
|
||||
_rtv['_is_root'] = False
|
||||
|
@ -189,7 +185,9 @@ class ActorNursery:
|
|||
enable_modules.extend(rpc_module_paths)
|
||||
|
||||
subactor = Actor(
|
||||
name,
|
||||
name=name,
|
||||
uuid=mk_uuid(),
|
||||
|
||||
# modules allowed to invoked funcs from
|
||||
enable_modules=enable_modules,
|
||||
loglevel=loglevel,
|
||||
|
@ -197,7 +195,7 @@ class ActorNursery:
|
|||
# verbatim relay this actor's registrar addresses
|
||||
registry_addrs=current_actor().reg_addrs,
|
||||
)
|
||||
parent_addr = self._actor.accept_addr
|
||||
parent_addr: UnwrappedAddress = self._actor.accept_addr
|
||||
assert parent_addr
|
||||
|
||||
# start a task to spawn a process
|
||||
|
@ -235,7 +233,7 @@ class ActorNursery:
|
|||
*,
|
||||
|
||||
name: str | None = None,
|
||||
bind_addrs: AddressTypes|None = None,
|
||||
bind_addrs: UnwrappedAddress|None = None,
|
||||
rpc_module_paths: list[str] | None = None,
|
||||
enable_modules: list[str] | None = None,
|
||||
loglevel: str | None = None, # set log level per subactor
|
||||
|
|
Loading…
Reference in New Issue