piker/piker/_daemon.py

592 lines
16 KiB
Python

# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Structured, daemon tree service management.
"""
from typing import Optional, Union, Callable, Any
from contextlib import asynccontextmanager as acm
from collections import defaultdict
from msgspec import Struct
import tractor
import trio
from trio_typing import TaskStatus
from .log import get_logger, get_console_log
from .brokers import get_brokermod
log = get_logger(__name__)
_root_dname = 'pikerd'
_default_registry_host: str = '127.0.0.1'
_default_registry_port: int = 6116
_default_reg_addr: tuple[str, int] = (
_default_registry_host,
_default_registry_port,
)
# NOTE: this value is set as an actor-global once the first endpoint
# who is capable, spawns a `pikerd` service tree.
_registry_addr: tuple[str, int] | None = None
_tractor_kwargs: dict[str, Any] = {
# use a different registry addr then tractor's default
'arbiter_addr': _registry_addr
}
_root_modules = [
__name__,
'piker.clearing._ems',
'piker.clearing._client',
]
class Services(Struct):
actor_n: tractor._supervise.ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}
async def start_service_task(
self,
name: str,
portal: tractor.Portal,
target: Callable,
**kwargs,
) -> (trio.CancelScope, tractor.Context):
'''
Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` teardown.
This allows for allocating long-running sub-services in our main
daemon and explicitly controlling their lifetimes.
'''
async def open_context_in_task(
task_status: TaskStatus[
trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> Any:
with trio.CancelScope() as cs:
async with portal.open_context(
target,
**kwargs,
) as (ctx, first):
# unblock once the remote context has started
task_status.started((cs, first))
log.info(
f'`pikerd` service {name} started with value {first}'
)
try:
# wait on any context's return value
ctx_res = await ctx.result()
except tractor.ContextCancelled:
return await self.cancel_service(name)
else:
# wait on any error from the sub-actor
# NOTE: this will block indefinitely until
# cancelled either by error from the target
# context function or by being cancelled here by
# the surrounding cancel scope
return (await portal.result(), ctx_res)
cs, first = await self.service_n.start(open_context_in_task)
# store the cancel scope and portal for later cancellation or
# retstart if needed.
self.service_tasks[name] = (cs, portal)
return cs, first
# TODO: per service cancellation by scope, we aren't using this
# anywhere right?
async def cancel_service(
self,
name: str,
) -> Any:
log.info(f'Cancelling `pikerd` service {name}')
cs, portal = self.service_tasks[name]
# XXX: not entirely sure why this is required,
# and should probably be better fine tuned in
# ``tractor``?
cs.cancel()
return await portal.cancel_actor()
_services: Optional[Services] = None
@acm
async def open_pikerd(
start_method: str = 'trio',
loglevel: Optional[str] = None,
# XXX: you should pretty much never want debug mode
# for data daemons when running in production.
debug_mode: bool = False,
registry_addr: None | tuple[str, int] = None,
) -> Optional[tractor._portal.Portal]:
'''
Start a root piker daemon who's lifetime extends indefinitely
until cancelled.
A root actor nursery is created which can be used to create and keep
alive underling services (see below).
'''
global _services
global _registry_addr
if (
_registry_addr is None
or registry_addr
):
_registry_addr = registry_addr or _default_reg_addr
# XXX: this may open a root actor as well
async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
arbiter_addr=_registry_addr,
name=_root_dname,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=_root_modules,
) as _,
tractor.open_nursery() as actor_nursery,
):
async with trio.open_nursery() as service_nursery:
# # setup service mngr singleton instance
# async with AsyncExitStack() as stack:
# assign globally for future daemon/task creation
_services = Services(
actor_n=actor_nursery,
service_n=service_nursery,
debug_mode=debug_mode,
)
yield _services
@acm
async def open_piker_runtime(
name: str,
enable_modules: list[str] = [],
start_method: str = 'trio',
loglevel: Optional[str] = None,
# XXX: you should pretty much never want debug mode
# for data daemons when running in production.
debug_mode: bool = False,
registry_addr: None | tuple[str, int] = None,
) -> tractor.Actor:
'''
Start a piker actor who's runtime will automatically sync with
existing piker actors on the local link based on configuration.
'''
global _services
global _registry_addr
if (
_registry_addr is None
or registry_addr
):
_registry_addr = registry_addr or _default_reg_addr
# XXX: this may open a root actor as well
async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
arbiter_addr=_registry_addr,
name=name,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=_root_modules + enable_modules,
) as _,
):
yield tractor.current_actor()
@acm
async def maybe_open_runtime(
loglevel: Optional[str] = None,
**kwargs,
) -> None:
"""
Start the ``tractor`` runtime (a root actor) if none exists.
"""
settings = _tractor_kwargs
settings.update(kwargs)
if not tractor.current_actor(err_on_no_runtime=False):
async with tractor.open_root_actor(
loglevel=loglevel,
**settings,
):
yield
else:
yield
@acm
async def maybe_open_pikerd(
loglevel: Optional[str] = None,
registry_addr: None | tuple = None,
**kwargs,
) -> Union[tractor._portal.Portal, Services]:
"""If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self
though).
"""
if loglevel:
get_console_log(loglevel)
# subtle, we must have the runtime up here or portal lookup will fail
async with (
maybe_open_runtime(loglevel, **kwargs),
tractor.find_actor(_root_dname) as portal
):
# connect to any existing daemon presuming
# its registry socket was selected.
if (
portal is not None
and (
registry_addr is None
or portal.channel.raddr == registry_addr
)
):
yield portal
return
# presume pikerd role since no daemon could be found at
# configured address
async with open_pikerd(
loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False),
registry_addr=registry_addr,
) as _:
# in the case where we're starting up the
# tractor-piker runtime stack in **this** process
# we return no portal to self.
yield None
# brokerd enabled modules
_data_mods = [
'piker.brokers.core',
'piker.brokers.data',
'piker.data',
'piker.data.feed',
'piker.data._sampling'
]
class Brokerd:
locks = defaultdict(trio.Lock)
@acm
async def find_service(
service_name: str,
) -> Optional[tractor.Portal]:
log.info(f'Scanning for service `{service_name}`')
# attach to existing daemon by name if possible
async with tractor.find_actor(
service_name,
arbiter_sockaddr=_registry_addr,
) as maybe_portal:
yield maybe_portal
async def check_for_service(
service_name: str,
) -> bool:
'''
Service daemon "liveness" predicate.
'''
async with tractor.query_actor(
service_name,
arbiter_sockaddr=_registry_addr,
) as sockaddr:
return sockaddr
@acm
async def maybe_spawn_daemon(
service_name: str,
service_task_target: Callable,
spawn_args: dict[str, Any],
loglevel: Optional[str] = None,
**kwargs,
) -> tractor.Portal:
'''
If no ``service_name`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it.
If this function is called from a non-pikerd actor, the
spawned service will persist as long as pikerd does or
it is requested to be cancelled.
This can be seen as a service starting api for remote-actor
clients.
'''
if loglevel:
get_console_log(loglevel)
# serialize access to this section to avoid
# 2 or more tasks racing to create a daemon
lock = Brokerd.locks[service_name]
await lock.acquire()
async with find_service(service_name) as portal:
if portal is not None:
lock.release()
yield portal
return
log.warning(f"Couldn't find any existing {service_name}")
# ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the
# process tree
async with maybe_open_pikerd(
loglevel=loglevel,
**kwargs,
) as pikerd_portal:
if pikerd_portal is None:
# we are the root and thus are `pikerd`
# so spawn the target service directly by calling
# the provided target routine.
# XXX: this assumes that the target is well formed and will
# do the right things to setup both a sub-actor **and** call
# the ``_Services`` api from above to start the top level
# service task for that actor.
await service_task_target(**spawn_args)
else:
# tell the remote `pikerd` to start the target,
# the target can't return a non-serializable value
# since it is expected that service startingn is
# non-blocking and the target task will persist running
# on `pikerd` after the client requesting it's start
# disconnects.
await pikerd_portal.run(
service_task_target,
**spawn_args,
)
async with tractor.wait_for_actor(service_name) as portal:
lock.release()
yield portal
await portal.cancel_actor()
async def spawn_brokerd(
brokername: str,
loglevel: Optional[str] = None,
**tractor_kwargs,
) -> bool:
log.info(f'Spawning {brokername} broker daemon')
brokermod = get_brokermod(brokername)
dname = f'brokerd.{brokername}'
extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {})
tractor_kwargs.update(extra_tractor_kwargs)
global _services
assert _services
# ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery
modpath = brokermod.__name__
broker_enable = [modpath]
for submodname in getattr(
brokermod,
'__enable_modules__',
[],
):
subpath = f'{modpath}.{submodname}'
broker_enable.append(subpath)
portal = await _services.actor_n.start_actor(
dname,
enable_modules=_data_mods + broker_enable,
loglevel=loglevel,
debug_mode=_services.debug_mode,
**tractor_kwargs
)
# non-blocking setup of brokerd service nursery
from .data import _setup_persistent_brokerd
await _services.start_service_task(
dname,
portal,
_setup_persistent_brokerd,
brokername=brokername,
)
return True
@acm
async def maybe_spawn_brokerd(
brokername: str,
loglevel: Optional[str] = None,
**kwargs,
) -> tractor.Portal:
'''
Helper to spawn a brokerd service *from* a client
who wishes to use the sub-actor-daemon.
'''
async with maybe_spawn_daemon(
f'brokerd.{brokername}',
service_task_target=spawn_brokerd,
spawn_args={'brokername': brokername, 'loglevel': loglevel},
loglevel=loglevel,
**kwargs,
) as portal:
yield portal
async def spawn_emsd(
loglevel: Optional[str] = None,
**extra_tractor_kwargs
) -> bool:
"""
Start the clearing engine under ``pikerd``.
"""
log.info('Spawning emsd')
global _services
assert _services
portal = await _services.actor_n.start_actor(
'emsd',
enable_modules=[
'piker.clearing._ems',
'piker.clearing._client',
],
loglevel=loglevel,
debug_mode=_services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs
)
# non-blocking setup of clearing service
from .clearing._ems import _setup_persistent_emsd
await _services.start_service_task(
'emsd',
portal,
_setup_persistent_emsd,
)
return True
@acm
async def maybe_open_emsd(
brokername: str,
loglevel: Optional[str] = None,
**kwargs,
) -> tractor._portal.Portal: # noqa
async with maybe_spawn_daemon(
'emsd',
service_task_target=spawn_emsd,
spawn_args={'loglevel': loglevel},
loglevel=loglevel,
**kwargs,
) as portal:
yield portal
# TODO: ideally we can start the tsdb "on demand" but it's
# probably going to require "rootless" docker, at least if we don't
# want to expect the user to start ``pikerd`` with root perms all the
# time.
# async def maybe_open_marketstored(
# loglevel: Optional[str] = None,
# **kwargs,
# ) -> tractor._portal.Portal: # noqa
# async with maybe_spawn_daemon(
# 'marketstored',
# service_task_target=spawn_emsd,
# spawn_args={'loglevel': loglevel},
# loglevel=loglevel,
# **kwargs,
# ) as portal:
# yield portal