Registry subsys rework

More or less a revamp (and possibly first draft for something similar in
`tractor` core) which ensures all actor trees attempt to discover the
`pikerd` registry actor.

Implementation improvements include:
- new `Registry` singleton which houses the `pikerd` discovery
  socket-address `Registry.addr` + a `open_registry()` manager which
  provides bootstrapped actor-local access.
- refine `open_piker_runtime()` to do the work of opening a root actor
  and call the new `open_registry()` depending on whether a runtime has
  yet been bootstrapped.
- rejig `[maybe_]open_pikerd()` in terms of the above.
samplerd_service
Tyler Goodlet 2023-01-11 20:51:21 -05:00
parent 71ca4c8e1f
commit 09711750bf
1 changed files with 230 additions and 121 deletions

View File

@ -18,10 +18,16 @@
Structured, daemon tree service management.
"""
from typing import Optional, Union, Callable, Any
from __future__ import annotations
import os
from typing import (
Optional,
Union,
Callable,
Any,
)
from contextlib import (
asynccontextmanager as acm,
contextmanager as cm,
)
from collections import defaultdict
@ -29,7 +35,10 @@ import tractor
import trio
from trio_typing import TaskStatus
from .log import get_logger, get_console_log
from .log import (
get_logger,
get_console_log,
)
from .brokers import get_brokermod
@ -44,14 +53,86 @@ _default_reg_addr: tuple[str, int] = (
_default_registry_port,
)
# NOTE: this value is set as an actor-global once the first endpoint
# who is capable, spawns a `pikerd` service tree.
_registry_addr: tuple[str, int] | None = None
_registry: Registry | None = None
class Registry:
addr: None | tuple[str, int] = None
# TODO: table of uids to sockaddrs
peers: dict[
tuple[str, str],
tuple[str, int],
] = {}
# _registry_addr: None | tuple[str, int] = None
_tractor_kwargs: dict[str, Any] = {}
@acm
async def open_registry(
addr: None | tuple[str, int] = None,
ensure_exists: bool = True,
) -> tuple[str, int]:
global _tractor_kwargs
actor = tractor.current_actor()
uid = actor.uid
if (
Registry.addr is not None
and addr
):
raise RuntimeError(
f'`{uid}` registry addr already bound @ {_registry.sockaddr}'
)
was_set: bool = False
if (
not tractor.is_root_process()
and Registry.addr is None
):
Registry.addr = actor._arb_addr
if (
ensure_exists
and Registry.addr is None
):
raise RuntimeError(
f"`{uid}` registry should already exist bug doesn't?"
)
if (
Registry.addr is None
):
was_set = True
Registry.addr = addr or _default_reg_addr
_tractor_kwargs['arbiter_addr'] = Registry.addr
try:
yield Registry.addr
finally:
# XXX: always clear the global addr if we set it so that the
# next (set of) calls will apply whatever new one is passed
# in.
if was_set:
Registry.addr = None
def get_tractor_runtime_kwargs() -> dict[str, Any]:
'''
Deliver ``tractor`` related runtime variables in a `dict`.
'''
return _tractor_kwargs
_tractor_kwargs: dict[str, Any] = {
# use a different registry addr then tractor's default
'arbiter_addr': _registry_addr
}
_root_modules = [
__name__,
'piker.clearing._ems',
@ -161,31 +242,78 @@ class Services:
f'Serice task for {name} not terminated?'
@cm
def maybe_set_global_registry_sockaddr(
registry_addr: None | tuple[str, int] = None,
) -> None:
@acm
async def open_piker_runtime(
name: str,
enable_modules: list[str] = [],
loglevel: Optional[str] = None,
global _registry_addr
was_set: bool = False
if (
_registry_addr is None
or registry_addr
):
_registry_addr = registry_addr or _default_reg_addr
# XXX NOTE XXX: you should pretty much never want debug mode
# for data daemons when running in production.
debug_mode: bool = False,
registry_addr: None | tuple[str, int] = None,
# TODO: once we have `rsyscall` support we will read a config
# and spawn the service tree distributed per that.
start_method: str = 'trio',
tractor_kwargs: dict = {},
) -> tuple[
tractor.Actor,
tuple[str, int],
]:
'''
Start a piker actor who's runtime will automatically sync with
existing piker actors on the local link based on configuration.
Can be called from a subactor or any program that needs to start
a root actor.
'''
try:
yield _registry_addr
finally:
# XXX: always clear the global addr if we set it so that the
# next (set of) calls will apply whatever new one is passed
# in.
if was_set:
_registry_addr = None
# check for existing runtime
actor = tractor.current_actor().uid
except tractor._exceptions.NoRuntime:
registry_addr = registry_addr or _default_reg_addr
async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
arbiter_addr=registry_addr,
name=name,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=enable_modules,
**tractor_kwargs,
) as _,
open_registry(registry_addr, ensure_exists=False) as addr,
):
yield (
tractor.current_actor(),
addr,
)
else:
async with open_registry(registry_addr) as addr:
yield (
actor,
addr,
)
@acm
async def open_pikerd(
start_method: str = 'trio',
loglevel: str | None = None,
# XXX: you should pretty much never want debug mode
@ -203,26 +331,24 @@ async def open_pikerd(
'''
with maybe_set_global_registry_sockaddr(registry_addr) as reg_addr:
async with (
tractor.open_root_actor(
open_piker_runtime(
# passed through to ``open_root_actor``
arbiter_addr=reg_addr,
name=_root_dname,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=_root_modules,
) as _,
loglevel=loglevel,
debug_mode=debug_mode,
registry_addr=registry_addr,
) as (root_actor, reg_addr),
tractor.open_nursery() as actor_nursery,
trio.open_nursery() as service_nursery,
):
async with trio.open_nursery() as service_nursery:
assert root_actor.accept_addr == reg_addr
# assign globally for future daemon/task creation
Services.actor_n = actor_nursery
@ -236,44 +362,6 @@ async def open_pikerd(
service_nursery.cancel_scope.cancel()
@acm
async def open_piker_runtime(
name: str,
enable_modules: list[str] = [],
start_method: str = 'trio',
loglevel: Optional[str] = None,
# XXX: you should pretty much never want debug mode
# for data daemons when running in production.
debug_mode: bool = False,
registry_addr: None | tuple[str, int] = None,
) -> tractor.Actor:
'''
Start a piker actor who's runtime will automatically sync with
existing piker actors on the local link based on configuration.
'''
with maybe_set_global_registry_sockaddr(registry_addr) as reg_addr:
async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
arbiter_addr=reg_addr,
name=name,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=_root_modules + enable_modules,
) as _,
):
yield tractor.current_actor()
@acm
async def maybe_open_runtime(
loglevel: Optional[str] = None,
@ -284,23 +372,30 @@ async def maybe_open_runtime(
Start the ``tractor`` runtime (a root actor) if none exists.
"""
settings = _tractor_kwargs
settings.update(kwargs)
# settings = get_tractor_runtime_kwargs()
# settings.update(kwargs)
name = kwargs.pop('name')
if not tractor.current_actor(err_on_no_runtime=False):
async with tractor.open_root_actor(
async with open_piker_runtime(
name,
loglevel=loglevel,
**settings,
):
yield
**kwargs,
# registry
# tractor_kwargs=kwargs,
) as (_, addr):
yield addr,
else:
yield
async with open_registry() as addr:
yield addr
@acm
async def maybe_open_pikerd(
loglevel: Optional[str] = None,
registry_addr: None | tuple = None,
**kwargs,
) -> Union[tractor._portal.Portal, Services]:
@ -314,18 +409,31 @@ async def maybe_open_pikerd(
get_console_log(loglevel)
# subtle, we must have the runtime up here or portal lookup will fail
query_name = kwargs.pop('name', f'piker_query_{os.getpid()}')
# TODO: if we need to make the query part faster we could not init
# an actor runtime and instead just hit the socket?
# from tractor._ipc import _connect_chan, Channel
# async with _connect_chan(host, port) as chan:
# async with open_portal(chan) as arb_portal:
# yield arb_portal
async with (
maybe_open_runtime(loglevel, **kwargs),
tractor.find_actor(_root_dname) as portal
open_piker_runtime(
name=query_name,
registry_addr=registry_addr,
loglevel=loglevel,
**kwargs,
) as _,
tractor.find_actor(
_root_dname,
arbiter_sockaddr=registry_addr,
) as portal
):
# connect to any existing daemon presuming
# its registry socket was selected.
if (
portal is not None
and (
registry_addr is None
or portal.channel.raddr == registry_addr
)
):
yield portal
return
@ -333,7 +441,6 @@ async def maybe_open_pikerd(
# presume pikerd role since no daemon could be found at
# configured address
async with open_pikerd(
loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False),
registry_addr=registry_addr,
@ -361,13 +468,14 @@ _data_mods = [
@acm
async def find_service(
service_name: str,
) -> Optional[tractor.Portal]:
) -> tractor.Portal | None:
async with open_registry() as reg_addr:
log.info(f'Scanning for service `{service_name}`')
# attach to existing daemon by name if possible
async with tractor.find_actor(
service_name,
arbiter_sockaddr=_registry_addr or _default_reg_addr,
arbiter_sockaddr=reg_addr,
) as maybe_portal:
yield maybe_portal
@ -375,14 +483,15 @@ async def find_service(
async def check_for_service(
service_name: str,
) -> bool:
) -> None | tuple[str, int]:
'''
Service daemon "liveness" predicate.
'''
async with open_registry(ensure_exists=False) as reg_addr:
async with tractor.query_actor(
service_name,
arbiter_sockaddr=_registry_addr or _default_reg_addr,
arbiter_sockaddr=reg_addr,
) as sockaddr:
return sockaddr