Make `._daemon.Services` for use as singleton

Drop the `_services` module level ref and adjust all client code to
match. Drop struct inheritance and convert all methods to class level.
Move `Brokerd.locks` -> `Services.locks` and add sampling mod to pikerd
enabled set.
epoch_index_backup
Tyler Goodlet 2023-01-04 22:01:28 -05:00
parent 33e7e204d8
commit 09b53b133b
1 changed files with 45 additions and 52 deletions

View File

@ -22,7 +22,6 @@ from typing import Optional, Union, Callable, Any
from contextlib import asynccontextmanager as acm
from collections import defaultdict
from msgspec import Struct
import tractor
import trio
from trio_typing import TaskStatus
@ -54,16 +53,19 @@ _root_modules = [
__name__,
'piker.clearing._ems',
'piker.clearing._client',
'piker.data._sampling',
]
class Services(Struct):
class Services:
actor_n: tractor._supervise.ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}
locks = defaultdict(trio.Lock)
@classmethod
async def start_service_task(
self,
name: str,
@ -119,11 +121,11 @@ class Services(Struct):
return cs, first
# TODO: per service cancellation by scope, we aren't using this
# anywhere right?
@classmethod
async def cancel_service(
self,
name: str,
) -> Any:
log.info(f'Cancelling `pikerd` service {name}')
cs, portal = self.service_tasks[name]
@ -134,29 +136,25 @@ class Services(Struct):
return await portal.cancel_actor()
_services: Optional[Services] = None
@acm
async def open_pikerd(
start_method: str = 'trio',
loglevel: Optional[str] = None,
loglevel: str | None = None,
# XXX: you should pretty much never want debug mode
# for data daemons when running in production.
debug_mode: bool = False,
registry_addr: None | tuple[str, int] = None,
) -> Optional[tractor._portal.Portal]:
) -> None:
'''
Start a root piker daemon who's lifetime extends indefinitely
until cancelled.
Start a root piker daemon who's lifetime extends indefinitely until
cancelled.
A root actor nursery is created which can be used to create and keep
alive underling services (see below).
'''
global _services
global _registry_addr
if (
@ -186,17 +184,11 @@ async def open_pikerd(
):
async with trio.open_nursery() as service_nursery:
# # setup service mngr singleton instance
# async with AsyncExitStack() as stack:
# assign globally for future daemon/task creation
_services = Services(
actor_n=actor_nursery,
service_n=service_nursery,
debug_mode=debug_mode,
)
yield _services
Services.actor_n = actor_nursery
Services.service_n = service_nursery
Services.debug_mode = debug_mode
yield
@acm
@ -217,7 +209,6 @@ async def open_piker_runtime(
existing piker actors on the local link based on configuration.
'''
global _services
global _registry_addr
if (
@ -276,11 +267,12 @@ async def maybe_open_pikerd(
**kwargs,
) -> Union[tractor._portal.Portal, Services]:
"""If no ``pikerd`` daemon-root-actor can be found start it and
'''
If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self
though).
"""
'''
if loglevel:
get_console_log(loglevel)
@ -316,7 +308,9 @@ async def maybe_open_pikerd(
yield None
# brokerd enabled modules
# `brokerd` enabled modules
# NOTE: keeping this list as small as possible is part of our caps-sec
# model and should be treated with utmost care!
_data_mods = [
'piker.brokers.core',
'piker.brokers.data',
@ -326,10 +320,6 @@ _data_mods = [
]
class Brokerd:
locks = defaultdict(trio.Lock)
@acm
async def find_service(
service_name: str,
@ -366,6 +356,8 @@ async def maybe_spawn_daemon(
service_task_target: Callable,
spawn_args: dict[str, Any],
loglevel: Optional[str] = None,
singleton: bool = False,
**kwargs,
) -> tractor.Portal:
@ -386,7 +378,7 @@ async def maybe_spawn_daemon(
# serialize access to this section to avoid
# 2 or more tasks racing to create a daemon
lock = Brokerd.locks[service_name]
lock = Services.locks[service_name]
await lock.acquire()
async with find_service(service_name) as portal:
@ -397,6 +389,9 @@ async def maybe_spawn_daemon(
log.warning(f"Couldn't find any existing {service_name}")
# TODO: really shouldn't the actor spawning be part of the service
# starting method `Services.start_service()` ?
# ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the
# process tree
@ -407,7 +402,6 @@ async def maybe_spawn_daemon(
) as pikerd_portal:
if pikerd_portal is None:
# we are the root and thus are `pikerd`
# so spawn the target service directly by calling
# the provided target routine.
@ -415,7 +409,9 @@ async def maybe_spawn_daemon(
# do the right things to setup both a sub-actor **and** call
# the ``_Services`` api from above to start the top level
# service task for that actor.
await service_task_target(**spawn_args)
started: bool
if pikerd_portal is None:
started = await service_task_target(**spawn_args)
else:
# tell the remote `pikerd` to start the target,
@ -424,11 +420,14 @@ async def maybe_spawn_daemon(
# non-blocking and the target task will persist running
# on `pikerd` after the client requesting it's start
# disconnects.
await pikerd_portal.run(
started = await pikerd_portal.run(
service_task_target,
**spawn_args,
)
if started:
log.info(f'Service {service_name} started!')
async with tractor.wait_for_actor(service_name) as portal:
lock.release()
yield portal
@ -451,9 +450,6 @@ async def spawn_brokerd(
extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {})
tractor_kwargs.update(extra_tractor_kwargs)
global _services
assert _services
# ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery
modpath = brokermod.__name__
@ -466,18 +462,18 @@ async def spawn_brokerd(
subpath = f'{modpath}.{submodname}'
broker_enable.append(subpath)
portal = await _services.actor_n.start_actor(
portal = await Services.actor_n.start_actor(
dname,
enable_modules=_data_mods + broker_enable,
loglevel=loglevel,
debug_mode=_services.debug_mode,
debug_mode=Services.debug_mode,
**tractor_kwargs
)
# non-blocking setup of brokerd service nursery
from .data import _setup_persistent_brokerd
await _services.start_service_task(
await Services.start_service_task(
dname,
portal,
_setup_persistent_brokerd,
@ -523,24 +519,21 @@ async def spawn_emsd(
"""
log.info('Spawning emsd')
global _services
assert _services
portal = await _services.actor_n.start_actor(
portal = await Services.actor_n.start_actor(
'emsd',
enable_modules=[
'piker.clearing._ems',
'piker.clearing._client',
],
loglevel=loglevel,
debug_mode=_services.debug_mode, # set by pikerd flag
debug_mode=Services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs
)
# non-blocking setup of clearing service
from .clearing._ems import _setup_persistent_emsd
await _services.start_service_task(
await Services.start_service_task(
'emsd',
portal,
_setup_persistent_emsd,