Support multi-homed service actors and multiaddrs

This commit requires an equivalent commit in `tractor` which adds
multi-homed transport server support to the runtime and thus the ability
ability to listen on multiple (embedded protocol) addrs / networks as
well as exposing registry actors similarly. Multiple bind addresses can
now be (bare bones) specified either in the `conf.toml:[network]`
section, or passed on the `pikerd` CLI.

This patch specifically requires the ability to pass a `registry_addrs:
list[tuple]` into `tractor.open_root_actor()` as well as adjusts all
internal runtime routines to do the same, mostly inside the `.service`
pkg.

Further details include:
- adding a new `.service._multiaddr` parser module (which will likely be
  moved into `tractor`'s core) which supports loading lib2p2 style
  "multiaddresses" both from the `conf.toml` and the `pikerd` CLI as
  per,
- reworking the `pikerd` cmd to accept a new `--maddr`/`-m` param that
  accepts multiaddresses.
- adjust the actor-registry subsys to support multi-homing by also
  accepting a list of addrs to its top level API eps.
- various internal name changes to reflect the multi-address interface
  changes throughout.
- non-working CLI tweaks to `piker chart` (ui-client cmds) to begin
  accepting maddrs.
- dropping all elasticsearch and marketstore flags / usage from `pikerd`
  for now since we're planning to drop mkts and elasticsearch will be an
  optional dep in the future.
distribute_dis
Tyler Goodlet 2023-09-28 12:13:34 -04:00
parent f94244aad4
commit 57010d479d
6 changed files with 375 additions and 120 deletions

View File

@ -19,7 +19,7 @@ CLI commons.
''' '''
import os import os
from contextlib import AsyncExitStack # from contextlib import AsyncExitStack
from types import ModuleType from types import ModuleType
import click import click
@ -43,34 +43,52 @@ log = get_logger('piker.cli')
@click.command() @click.command()
@click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--pdb', is_flag=True, help='Enable tractor debug mode')
@click.option('--host', '-h', default=None, help='Host addr to bind')
@click.option('--port', '-p', default=None, help='Port number to bind')
@click.option( @click.option(
'--tsdb', '--loglevel',
is_flag=True, '-l',
help='Enable local ``marketstore`` instance' default='warning',
help='Logging level',
) )
@click.option( @click.option(
'--es', '--tl',
is_flag=True, is_flag=True,
help='Enable local ``elasticsearch`` instance' help='Enable tractor-runtime logs',
) )
@click.option(
'--pdb',
is_flag=True,
help='Enable tractor debug mode',
)
@click.option(
'--maddr',
'-m',
default=None,
help='Multiaddrs to bind or contact',
)
# @click.option(
# '--tsdb',
# is_flag=True,
# help='Enable local ``marketstore`` instance'
# )
# @click.option(
# '--es',
# is_flag=True,
# help='Enable local ``elasticsearch`` instance'
# )
def pikerd( def pikerd(
maddr: str | None,
loglevel: str, loglevel: str,
host: str,
port: int,
tl: bool, tl: bool,
pdb: bool, pdb: bool,
tsdb: bool, # tsdb: bool,
es: bool, # es: bool,
): ):
''' '''
Spawn the piker broker-daemon. Spawn the piker broker-daemon.
''' '''
from cornerboi._debug import open_crash_handler
with open_crash_handler():
log = get_console_log(loglevel, name='cli') log = get_console_log(loglevel, name='cli')
if pdb: if pdb:
@ -82,45 +100,98 @@ def pikerd(
"\n" "\n"
)) ))
reg_addr: None | tuple[str, int] = None # service-actor registry endpoint socket-address
if host or port: regaddrs: list[tuple[str, int]] | None = None
reg_addr = (
host or _default_registry_host, conf, _ = config.load(
int(port) or _default_registry_port, conf_name='conf',
) )
network: dict = conf.get('network')
if network is None:
regaddrs = [(
_default_registry_host,
_default_registry_port,
)]
from .. import service from .. import service
from ..service._multiaddr import parse_addr
# transport-oriented endpoint multi-addresses
eps: dict[
str, # service name, eg. `pikerd`, `emsd`..
# libp2p style multi-addresses parsed into prot layers
list[dict[str, str | int]]
] = {}
if (
not maddr
and network
):
# load network section and (attempt to) connect all endpoints
# which are reachable B)
for key, maddrs in network.items():
match key:
# TODO: resolve table across multiple discov
# prots Bo
case 'resolv':
pass
case 'pikerd':
dname: str = key
for maddr in maddrs:
layers: dict = parse_addr(maddr)
eps.setdefault(
dname,
[],
).append(layers)
else:
# presume user is manually specifying the root actor ep.
eps['pikerd'] = [parse_addr(maddr)]
regaddrs: list[tuple[str, int]] = []
for layers in eps['pikerd']:
regaddrs.append((
layers['ipv4']['addr'],
layers['tcp']['port'],
))
async def main(): async def main():
service_mngr: service.Services service_mngr: service.Services
async with ( async with (
service.open_pikerd( service.open_pikerd(
registry_addrs=regaddrs,
loglevel=loglevel, loglevel=loglevel,
debug_mode=pdb, debug_mode=pdb,
registry_addr=reg_addr,
) as service_mngr, # normally delivers a ``Services`` handle ) as service_mngr, # normally delivers a ``Services`` handle
AsyncExitStack() as stack, # AsyncExitStack() as stack,
): ):
if tsdb: # TODO: spawn all other sub-actor daemons according to
dname, conf = await stack.enter_async_context( # multiaddress endpoint spec defined by user config
service.marketstore.start_ahab_daemon( assert service_mngr
service_mngr,
loglevel=loglevel,
)
)
log.info(f'TSDB `{dname}` up with conf:\n{conf}')
if es: # if tsdb:
dname, conf = await stack.enter_async_context( # dname, conf = await stack.enter_async_context(
service.elastic.start_ahab_daemon( # service.marketstore.start_ahab_daemon(
service_mngr, # service_mngr,
loglevel=loglevel, # loglevel=loglevel,
) # )
) # )
log.info(f'DB `{dname}` up with conf:\n{conf}') # log.info(f'TSDB `{dname}` up with conf:\n{conf}')
# if es:
# dname, conf = await stack.enter_async_context(
# service.elastic.start_ahab_daemon(
# service_mngr,
# loglevel=loglevel,
# )
# )
# log.info(f'DB `{dname}` up with conf:\n{conf}')
await trio.sleep_forever() await trio.sleep_forever()
@ -137,8 +208,8 @@ def pikerd(
@click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--tl', is_flag=True, help='Enable tractor logging') @click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--configdir', '-c', help='Configuration directory') @click.option('--configdir', '-c', help='Configuration directory')
@click.option('--host', '-h', default=None, help='Host addr to bind') @click.option('--maddr', '-m', default=None, help='Multiaddr to bind')
@click.option('--port', '-p', default=None, help='Port number to bind') @click.option('--raddr', '-r', default=None, help='Registrar addr to contact')
@click.pass_context @click.pass_context
def cli( def cli(
ctx: click.Context, ctx: click.Context,
@ -146,8 +217,10 @@ def cli(
loglevel: str, loglevel: str,
tl: bool, tl: bool,
configdir: str, configdir: str,
host: str,
port: int, # TODO: make these list[str] with multiple -m maddr0 -m maddr1
maddr: str,
raddr: str,
) -> None: ) -> None:
if configdir is not None: if configdir is not None:
@ -168,11 +241,9 @@ def cli(
} }
assert brokermods assert brokermods
reg_addr: None | tuple[str, int] = None regaddr: tuple[str, int] = (
if host or port: _default_registry_host,
reg_addr = ( _default_registry_port,
host or _default_registry_host,
int(port) or _default_registry_port,
) )
ctx.obj.update({ ctx.obj.update({
@ -183,7 +254,7 @@ def cli(
'log': get_console_log(loglevel), 'log': get_console_log(loglevel),
'confdir': config._config_dir, 'confdir': config._config_dir,
'wl_path': config._watchlists_data_path, 'wl_path': config._watchlists_data_path,
'registry_addr': reg_addr, 'registry_addr': regaddr,
}) })
# allow enabling same loglevel in ``tractor`` machinery # allow enabling same loglevel in ``tractor`` machinery
@ -230,7 +301,7 @@ def services(config, tl, ports):
def _load_clis() -> None: def _load_clis() -> None:
from ..service import elastic # noqa # from ..service import elastic # noqa
from ..brokers import cli # noqa from ..brokers import cli # noqa
from ..ui import cli # noqa from ..ui import cli # noqa
from ..watchlists import cli # noqa from ..watchlists import cli # noqa

View File

@ -39,7 +39,7 @@ from ._actor_runtime import (
open_piker_runtime, open_piker_runtime,
maybe_open_pikerd, maybe_open_pikerd,
open_pikerd, open_pikerd,
get_tractor_runtime_kwargs, get_runtime_vars,
) )
from ..brokers._daemon import ( from ..brokers._daemon import (
spawn_brokerd, spawn_brokerd,
@ -58,5 +58,5 @@ __all__ = [
'open_piker_runtime', 'open_piker_runtime',
'maybe_open_pikerd', 'maybe_open_pikerd',
'open_pikerd', 'open_pikerd',
'get_tractor_runtime_kwargs', 'get_runtime_vars',
] ]

View File

@ -45,7 +45,7 @@ from ._registry import ( # noqa
) )
def get_tractor_runtime_kwargs() -> dict[str, Any]: def get_runtime_vars() -> dict[str, Any]:
''' '''
Deliver ``tractor`` related runtime variables in a `dict`. Deliver ``tractor`` related runtime variables in a `dict`.
@ -56,6 +56,8 @@ def get_tractor_runtime_kwargs() -> dict[str, Any]:
@acm @acm
async def open_piker_runtime( async def open_piker_runtime(
name: str, name: str,
registry_addrs: list[tuple[str, int]],
enable_modules: list[str] = [], enable_modules: list[str] = [],
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
@ -63,8 +65,6 @@ async def open_piker_runtime(
# for data daemons when running in production. # for data daemons when running in production.
debug_mode: bool = False, debug_mode: bool = False,
registry_addr: None | tuple[str, int] = None,
# TODO: once we have `rsyscall` support we will read a config # TODO: once we have `rsyscall` support we will read a config
# and spawn the service tree distributed per that. # and spawn the service tree distributed per that.
start_method: str = 'trio', start_method: str = 'trio',
@ -74,7 +74,7 @@ async def open_piker_runtime(
) -> tuple[ ) -> tuple[
tractor.Actor, tractor.Actor,
tuple[str, int], list[tuple[str, int]],
]: ]:
''' '''
Start a piker actor who's runtime will automatically sync with Start a piker actor who's runtime will automatically sync with
@ -90,15 +90,19 @@ async def open_piker_runtime(
except tractor._exceptions.NoRuntime: except tractor._exceptions.NoRuntime:
tractor._state._runtime_vars[ tractor._state._runtime_vars[
'piker_vars'] = tractor_runtime_overrides 'piker_vars'
] = tractor_runtime_overrides
registry_addr = registry_addr or _default_reg_addr registry_addrs = (
registry_addrs
or [_default_reg_addr]
)
async with ( async with (
tractor.open_root_actor( tractor.open_root_actor(
# passed through to ``open_root_actor`` # passed through to ``open_root_actor``
arbiter_addr=registry_addr, registry_addrs=registry_addrs,
name=name, name=name,
loglevel=loglevel, loglevel=loglevel,
debug_mode=debug_mode, debug_mode=debug_mode,
@ -112,22 +116,27 @@ async def open_piker_runtime(
**tractor_kwargs, **tractor_kwargs,
) as _, ) as _,
open_registry(registry_addr, ensure_exists=False) as addr, open_registry(
registry_addrs,
ensure_exists=False,
) as addrs,
): ):
yield ( yield (
tractor.current_actor(), tractor.current_actor(),
addr, addrs,
) )
else: else:
async with open_registry(registry_addr) as addr: async with open_registry(
registry_addrs
) as addrs:
yield ( yield (
actor, actor,
addr, addrs,
) )
_root_dname = 'pikerd' _root_dname: str = 'pikerd'
_root_modules = [ _root_modules: list[str] = [
__name__, __name__,
'piker.service._daemon', 'piker.service._daemon',
'piker.brokers._daemon', 'piker.brokers._daemon',
@ -141,13 +150,13 @@ _root_modules = [
@acm @acm
async def open_pikerd( async def open_pikerd(
registry_addrs: list[tuple[str, int]],
loglevel: str | None = None, loglevel: str | None = None,
# XXX: you should pretty much never want debug mode # XXX: you should pretty much never want debug mode
# for data daemons when running in production. # for data daemons when running in production.
debug_mode: bool = False, debug_mode: bool = False,
registry_addr: None | tuple[str, int] = None,
**kwargs, **kwargs,
@ -169,17 +178,21 @@ async def open_pikerd(
enable_modules=_root_modules, enable_modules=_root_modules,
loglevel=loglevel, loglevel=loglevel,
debug_mode=debug_mode, debug_mode=debug_mode,
registry_addr=registry_addr, registry_addrs=registry_addrs,
**kwargs, **kwargs,
) as (root_actor, reg_addr), ) as (
root_actor,
reg_addrs,
),
tractor.open_nursery() as actor_nursery, tractor.open_nursery() as actor_nursery,
trio.open_nursery() as service_nursery, trio.open_nursery() as service_nursery,
): ):
if root_actor.accept_addr != reg_addr: for addr in reg_addrs:
if addr not in root_actor.accept_addrs:
raise RuntimeError( raise RuntimeError(
f'`pikerd` failed to bind on {reg_addr}!\n' f'`pikerd` failed to bind on {addr}!\n'
'Maybe you have another daemon already running?' 'Maybe you have another daemon already running?'
) )
@ -225,9 +238,9 @@ async def open_pikerd(
@acm @acm
async def maybe_open_pikerd( async def maybe_open_pikerd(
loglevel: Optional[str] = None, registry_addrs: list[tuple[str, int]] | None = None,
registry_addr: None | tuple = None,
loglevel: str | None = None,
**kwargs, **kwargs,
) -> tractor._portal.Portal | ClassVar[Services]: ) -> tractor._portal.Portal | ClassVar[Services]:
@ -253,17 +266,20 @@ async def maybe_open_pikerd(
# async with open_portal(chan) as arb_portal: # async with open_portal(chan) as arb_portal:
# yield arb_portal # yield arb_portal
registry_addrs = registry_addrs or [_default_reg_addr]
async with ( async with (
open_piker_runtime( open_piker_runtime(
name=query_name, name=query_name,
registry_addr=registry_addr, registry_addrs=registry_addrs,
loglevel=loglevel, loglevel=loglevel,
**kwargs, **kwargs,
) as _, ) as _,
tractor.find_actor( tractor.find_actor(
_root_dname, _root_dname,
arbiter_sockaddr=registry_addr, registry_addrs=registry_addrs,
only_first=True,
) as portal ) as portal
): ):
# connect to any existing daemon presuming # connect to any existing daemon presuming
@ -278,7 +294,7 @@ async def maybe_open_pikerd(
# configured address # configured address
async with open_pikerd( async with open_pikerd(
loglevel=loglevel, loglevel=loglevel,
registry_addr=registry_addr, registry_addrs=registry_addrs,
# passthrough to ``tractor`` init # passthrough to ``tractor`` init
**kwargs, **kwargs,

View File

@ -70,7 +70,10 @@ async def maybe_spawn_daemon(
lock = Services.locks[service_name] lock = Services.locks[service_name]
await lock.acquire() await lock.acquire()
async with find_service(service_name) as portal: async with find_service(
service_name,
registry_addrs=[('127.0.0.1', 6116)],
) as portal:
if portal is not None: if portal is not None:
lock.release() lock.release()
yield portal yield portal

View File

@ -0,0 +1,142 @@
# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Multiaddress parser and utils according the spec(s) defined by
`libp2p` and used in dependent project such as `ipfs`:
- https://docs.libp2p.io/concepts/fundamentals/addressing/
- https://github.com/libp2p/specs/blob/master/addressing/README.md
'''
from typing import Iterator
from bidict import bidict
# TODO: see if we can leverage libp2p ecosys projects instead of
# rolling our own (parser) impls of the above addressing specs:
# - https://github.com/libp2p/py-libp2p
# - https://docs.libp2p.io/concepts/nat/circuit-relay/#relay-addresses
# prots: bidict[int, str] = bidict({
prots: bidict[int, str] = {
'ipv4': 3,
'ipv6': 3,
'wg': 3,
'tcp': 4,
'udp': 4,
# TODO: support the next-gen shite Bo
# 'quic': 4,
# 'ssh': 7, # via rsyscall bootstrapping
}
prot_params: dict[str, tuple[str]] = {
'ipv4': ('addr',),
'ipv6': ('addr',),
'wg': ('addr', 'port', 'pubkey'),
'tcp': ('port',),
'udp': ('port',),
# 'quic': ('port',),
# 'ssh': ('port',),
}
def iter_prot_layers(
multiaddr: str,
) -> Iterator[
tuple[
int,
list[str]
]
]:
'''
Unpack a libp2p style "multiaddress" into multiple "segments"
for each "layer" of the protocoll stack (in OSI terms).
'''
tokens: list[str] = multiaddr.split('/')
root, tokens = tokens[0], tokens[1:]
assert not root # there is a root '/' on LHS
itokens = iter(tokens)
prot: str | None = None
params: list[str] = []
for token in itokens:
# every prot path should start with a known
# key-str.
if token in prots:
if prot is None:
prot: str = token
else:
yield prot, params
prot = token
params = []
elif token not in prots:
params.append(token)
else:
yield prot, params
def parse_addr(
multiaddr: str,
) -> dict[str, str | int | dict]:
'''
Parse a libp2p style "multiaddress" into it's distinct protocol
segments where each segment:
`../<protocol>/<param0>/<param1>/../<paramN>`
is loaded into a layers `dict[str, dict[str, Any]` which holds
each prot segment of the path as a separate entry sortable by
it's approx OSI "layer number".
Any `paramN` in the path must be distinctly defined in order
according to the (global) `prot_params` table in this module.
'''
layers: dict[str, str | int | dict] = {}
for (
prot_key,
params,
) in iter_prot_layers(multiaddr):
layer: int = prots[prot_key] # OSI layer used for sorting
ep: dict[str, int | str] = {'layer': layer}
layers[prot_key] = ep
# TODO; validation and resolving of names:
# - each param via a validator provided as part of the
# prot_params def? (also see `"port"` case below..)
# - do a resolv step that will check addrs against
# any loaded network.resolv: dict[str, str]
rparams: list = list(reversed(params))
for key in prot_params[prot_key]:
val: str | int = rparams.pop()
# TODO: UGHH, dunno what we should do for validation
# here, put it in the params spec somehow?
if key == 'port':
val = int(val)
ep[key] = val
return layers

View File

@ -46,7 +46,9 @@ _registry: Registry | None = None
class Registry: class Registry:
addr: None | tuple[str, int] = None # TODO: should this be a set or should we complain
# on duplicates?
addrs: list[tuple[str, int]] = []
# TODO: table of uids to sockaddrs # TODO: table of uids to sockaddrs
peers: dict[ peers: dict[
@ -60,69 +62,90 @@ _tractor_kwargs: dict[str, Any] = {}
@acm @acm
async def open_registry( async def open_registry(
addr: None | tuple[str, int] = None, addrs: list[tuple[str, int]],
ensure_exists: bool = True, ensure_exists: bool = True,
) -> tuple[str, int]: ) -> list[tuple[str, int]]:
global _tractor_kwargs global _tractor_kwargs
actor = tractor.current_actor() actor = tractor.current_actor()
uid = actor.uid uid = actor.uid
preset_reg_addrs: list[tuple[str, int]] = Registry.addrs
if ( if (
Registry.addr is not None preset_reg_addrs
and addr and addrs
): ):
if preset_reg_addrs != addrs:
raise RuntimeError( raise RuntimeError(
f'`{uid}` registry addr already bound @ {_registry.sockaddr}' f'`{uid}` has non-matching registrar addresses?\n'
f'request: {addrs}\n'
f'already set: {preset_reg_addrs}'
) )
was_set: bool = False was_set: bool = False
if ( if (
not tractor.is_root_process() not tractor.is_root_process()
and Registry.addr is None and not Registry.addrs
): ):
Registry.addr = actor._arb_addr Registry.addrs.extend(actor._reg_addrs)
if ( if (
ensure_exists ensure_exists
and Registry.addr is None and not Registry.addrs
): ):
raise RuntimeError( raise RuntimeError(
f"`{uid}` registry should already exist bug doesn't?" f"`{uid}` registry should already exist but doesn't?"
) )
if ( if (
Registry.addr is None not Registry.addrs
): ):
was_set = True was_set = True
Registry.addr = addr or _default_reg_addr Registry.addrs = addrs or [_default_reg_addr]
_tractor_kwargs['arbiter_addr'] = Registry.addr # NOTE: only spot this seems currently used is inside
# `.ui._exec` which is the (eventual qtloops) bootstrapping
# with guest mode.
_tractor_kwargs['registry_addrs'] = Registry.addrs
try: try:
yield Registry.addr yield Registry.addrs
finally: finally:
# XXX: always clear the global addr if we set it so that the # XXX: always clear the global addr if we set it so that the
# next (set of) calls will apply whatever new one is passed # next (set of) calls will apply whatever new one is passed
# in. # in.
if was_set: if was_set:
Registry.addr = None Registry.addrs = None
@acm @acm
async def find_service( async def find_service(
service_name: str, service_name: str,
registry_addrs: list[tuple[str, int]],
first_only: bool = True,
) -> tractor.Portal | None: ) -> tractor.Portal | None:
async with open_registry() as reg_addr: reg_addrs: list[tuple[str, int]]
async with open_registry(
addrs=registry_addrs,
) as reg_addrs:
log.info(f'Scanning for service `{service_name}`') log.info(f'Scanning for service `{service_name}`')
# attach to existing daemon by name if possible # attach to existing daemon by name if possible
async with tractor.find_actor( async with tractor.find_actor(
service_name, service_name,
arbiter_sockaddr=reg_addr, registry_addrs=reg_addrs,
) as maybe_portal: ) as maybe_portals:
yield maybe_portal if not maybe_portals:
yield None
return
if first_only:
yield maybe_portals[0]
else:
yield maybe_portals[0]
async def check_for_service( async def check_for_service(