Make `._daemon.Services` for use as singleton

Drop the `_services` module level ref and adjust all client code to
match. Drop struct inheritance and convert all methods to class level.
Move `Brokerd.locks` -> `Services.locks` and add sampling mod to pikerd
enabled set.
samplerd_service
Tyler Goodlet 2023-01-04 22:01:28 -05:00
parent 2c76cee928
commit a342f7d2d4
1 changed files with 45 additions and 52 deletions

View File

@ -22,7 +22,6 @@ from typing import Optional, Union, Callable, Any
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from collections import defaultdict from collections import defaultdict
from msgspec import Struct
import tractor import tractor
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
@ -54,16 +53,19 @@ _root_modules = [
__name__, __name__,
'piker.clearing._ems', 'piker.clearing._ems',
'piker.clearing._client', 'piker.clearing._client',
'piker.data._sampling',
] ]
class Services(Struct): class Services:
actor_n: tractor._supervise.ActorNursery actor_n: tractor._supervise.ActorNursery
service_n: trio.Nursery service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {} service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}
locks = defaultdict(trio.Lock)
@classmethod
async def start_service_task( async def start_service_task(
self, self,
name: str, name: str,
@ -119,11 +121,11 @@ class Services(Struct):
return cs, first return cs, first
# TODO: per service cancellation by scope, we aren't using this @classmethod
# anywhere right?
async def cancel_service( async def cancel_service(
self, self,
name: str, name: str,
) -> Any: ) -> Any:
log.info(f'Cancelling `pikerd` service {name}') log.info(f'Cancelling `pikerd` service {name}')
cs, portal = self.service_tasks[name] cs, portal = self.service_tasks[name]
@ -134,29 +136,25 @@ class Services(Struct):
return await portal.cancel_actor() return await portal.cancel_actor()
_services: Optional[Services] = None
@acm @acm
async def open_pikerd( async def open_pikerd(
start_method: str = 'trio', start_method: str = 'trio',
loglevel: Optional[str] = None, loglevel: str | None = None,
# XXX: you should pretty much never want debug mode # XXX: you should pretty much never want debug mode
# for data daemons when running in production. # for data daemons when running in production.
debug_mode: bool = False, debug_mode: bool = False,
registry_addr: None | tuple[str, int] = None, registry_addr: None | tuple[str, int] = None,
) -> Optional[tractor._portal.Portal]: ) -> None:
''' '''
Start a root piker daemon who's lifetime extends indefinitely Start a root piker daemon who's lifetime extends indefinitely until
until cancelled. cancelled.
A root actor nursery is created which can be used to create and keep A root actor nursery is created which can be used to create and keep
alive underling services (see below). alive underling services (see below).
''' '''
global _services
global _registry_addr global _registry_addr
if ( if (
@ -186,17 +184,11 @@ async def open_pikerd(
): ):
async with trio.open_nursery() as service_nursery: async with trio.open_nursery() as service_nursery:
# # setup service mngr singleton instance
# async with AsyncExitStack() as stack:
# assign globally for future daemon/task creation # assign globally for future daemon/task creation
_services = Services( Services.actor_n = actor_nursery
actor_n=actor_nursery, Services.service_n = service_nursery
service_n=service_nursery, Services.debug_mode = debug_mode
debug_mode=debug_mode, yield
)
yield _services
@acm @acm
@ -217,7 +209,6 @@ async def open_piker_runtime(
existing piker actors on the local link based on configuration. existing piker actors on the local link based on configuration.
''' '''
global _services
global _registry_addr global _registry_addr
if ( if (
@ -276,11 +267,12 @@ async def maybe_open_pikerd(
**kwargs, **kwargs,
) -> Union[tractor._portal.Portal, Services]: ) -> Union[tractor._portal.Portal, Services]:
"""If no ``pikerd`` daemon-root-actor can be found start it and '''
If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self yield up (we should probably figure out returning a portal to self
though). though).
""" '''
if loglevel: if loglevel:
get_console_log(loglevel) get_console_log(loglevel)
@ -316,7 +308,9 @@ async def maybe_open_pikerd(
yield None yield None
# brokerd enabled modules # `brokerd` enabled modules
# NOTE: keeping this list as small as possible is part of our caps-sec
# model and should be treated with utmost care!
_data_mods = [ _data_mods = [
'piker.brokers.core', 'piker.brokers.core',
'piker.brokers.data', 'piker.brokers.data',
@ -326,10 +320,6 @@ _data_mods = [
] ]
class Brokerd:
locks = defaultdict(trio.Lock)
@acm @acm
async def find_service( async def find_service(
service_name: str, service_name: str,
@ -366,6 +356,8 @@ async def maybe_spawn_daemon(
service_task_target: Callable, service_task_target: Callable,
spawn_args: dict[str, Any], spawn_args: dict[str, Any],
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
singleton: bool = False,
**kwargs, **kwargs,
) -> tractor.Portal: ) -> tractor.Portal:
@ -386,7 +378,7 @@ async def maybe_spawn_daemon(
# serialize access to this section to avoid # serialize access to this section to avoid
# 2 or more tasks racing to create a daemon # 2 or more tasks racing to create a daemon
lock = Brokerd.locks[service_name] lock = Services.locks[service_name]
await lock.acquire() await lock.acquire()
async with find_service(service_name) as portal: async with find_service(service_name) as portal:
@ -397,6 +389,9 @@ async def maybe_spawn_daemon(
log.warning(f"Couldn't find any existing {service_name}") log.warning(f"Couldn't find any existing {service_name}")
# TODO: really shouldn't the actor spawning be part of the service
# starting method `Services.start_service()` ?
# ask root ``pikerd`` daemon to spawn the daemon we need if # ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the # pikerd is not live we now become the root of the
# process tree # process tree
@ -407,7 +402,6 @@ async def maybe_spawn_daemon(
) as pikerd_portal: ) as pikerd_portal:
if pikerd_portal is None:
# we are the root and thus are `pikerd` # we are the root and thus are `pikerd`
# so spawn the target service directly by calling # so spawn the target service directly by calling
# the provided target routine. # the provided target routine.
@ -415,7 +409,9 @@ async def maybe_spawn_daemon(
# do the right things to setup both a sub-actor **and** call # do the right things to setup both a sub-actor **and** call
# the ``_Services`` api from above to start the top level # the ``_Services`` api from above to start the top level
# service task for that actor. # service task for that actor.
await service_task_target(**spawn_args) started: bool
if pikerd_portal is None:
started = await service_task_target(**spawn_args)
else: else:
# tell the remote `pikerd` to start the target, # tell the remote `pikerd` to start the target,
@ -424,11 +420,14 @@ async def maybe_spawn_daemon(
# non-blocking and the target task will persist running # non-blocking and the target task will persist running
# on `pikerd` after the client requesting it's start # on `pikerd` after the client requesting it's start
# disconnects. # disconnects.
await pikerd_portal.run( started = await pikerd_portal.run(
service_task_target, service_task_target,
**spawn_args, **spawn_args,
) )
if started:
log.info(f'Service {service_name} started!')
async with tractor.wait_for_actor(service_name) as portal: async with tractor.wait_for_actor(service_name) as portal:
lock.release() lock.release()
yield portal yield portal
@ -451,9 +450,6 @@ async def spawn_brokerd(
extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {})
tractor_kwargs.update(extra_tractor_kwargs) tractor_kwargs.update(extra_tractor_kwargs)
global _services
assert _services
# ask `pikerd` to spawn a new sub-actor and manage it under its # ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery # actor nursery
modpath = brokermod.__name__ modpath = brokermod.__name__
@ -466,18 +462,18 @@ async def spawn_brokerd(
subpath = f'{modpath}.{submodname}' subpath = f'{modpath}.{submodname}'
broker_enable.append(subpath) broker_enable.append(subpath)
portal = await _services.actor_n.start_actor( portal = await Services.actor_n.start_actor(
dname, dname,
enable_modules=_data_mods + broker_enable, enable_modules=_data_mods + broker_enable,
loglevel=loglevel, loglevel=loglevel,
debug_mode=_services.debug_mode, debug_mode=Services.debug_mode,
**tractor_kwargs **tractor_kwargs
) )
# non-blocking setup of brokerd service nursery # non-blocking setup of brokerd service nursery
from .data import _setup_persistent_brokerd from .data import _setup_persistent_brokerd
await _services.start_service_task( await Services.start_service_task(
dname, dname,
portal, portal,
_setup_persistent_brokerd, _setup_persistent_brokerd,
@ -523,24 +519,21 @@ async def spawn_emsd(
""" """
log.info('Spawning emsd') log.info('Spawning emsd')
global _services portal = await Services.actor_n.start_actor(
assert _services
portal = await _services.actor_n.start_actor(
'emsd', 'emsd',
enable_modules=[ enable_modules=[
'piker.clearing._ems', 'piker.clearing._ems',
'piker.clearing._client', 'piker.clearing._client',
], ],
loglevel=loglevel, loglevel=loglevel,
debug_mode=_services.debug_mode, # set by pikerd flag debug_mode=Services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs **extra_tractor_kwargs
) )
# non-blocking setup of clearing service # non-blocking setup of clearing service
from .clearing._ems import _setup_persistent_emsd from .clearing._ems import _setup_persistent_emsd
await _services.start_service_task( await Services.start_service_task(
'emsd', 'emsd',
portal, portal,
_setup_persistent_emsd, _setup_persistent_emsd,