diff --git a/piker/_daemon.py b/piker/_daemon.py index d4ca7f21..4e13e1ec 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -22,7 +22,6 @@ from typing import Optional, Union, Callable, Any from contextlib import asynccontextmanager as acm from collections import defaultdict -from msgspec import Struct import tractor import trio from trio_typing import TaskStatus @@ -54,16 +53,19 @@ _root_modules = [ __name__, 'piker.clearing._ems', 'piker.clearing._client', + 'piker.data._sampling', ] -class Services(Struct): +class Services: actor_n: tractor._supervise.ActorNursery service_n: trio.Nursery debug_mode: bool # tractor sub-actor debug mode flag service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {} + locks = defaultdict(trio.Lock) + @classmethod async def start_service_task( self, name: str, @@ -119,11 +121,11 @@ class Services(Struct): return cs, first - # TODO: per service cancellation by scope, we aren't using this - # anywhere right? + @classmethod async def cancel_service( self, name: str, + ) -> Any: log.info(f'Cancelling `pikerd` service {name}') cs, portal = self.service_tasks[name] @@ -134,29 +136,25 @@ class Services(Struct): return await portal.cancel_actor() -_services: Optional[Services] = None - - @acm async def open_pikerd( start_method: str = 'trio', - loglevel: Optional[str] = None, + loglevel: str | None = None, # XXX: you should pretty much never want debug mode # for data daemons when running in production. debug_mode: bool = False, registry_addr: None | tuple[str, int] = None, -) -> Optional[tractor._portal.Portal]: +) -> None: ''' - Start a root piker daemon who's lifetime extends indefinitely - until cancelled. + Start a root piker daemon who's lifetime extends indefinitely until + cancelled. A root actor nursery is created which can be used to create and keep alive underling services (see below). ''' - global _services global _registry_addr if ( @@ -186,17 +184,11 @@ async def open_pikerd( ): async with trio.open_nursery() as service_nursery: - # # setup service mngr singleton instance - # async with AsyncExitStack() as stack: - # assign globally for future daemon/task creation - _services = Services( - actor_n=actor_nursery, - service_n=service_nursery, - debug_mode=debug_mode, - ) - - yield _services + Services.actor_n = actor_nursery + Services.service_n = service_nursery + Services.debug_mode = debug_mode + yield @acm @@ -217,7 +209,6 @@ async def open_piker_runtime( existing piker actors on the local link based on configuration. ''' - global _services global _registry_addr if ( @@ -276,11 +267,12 @@ async def maybe_open_pikerd( **kwargs, ) -> Union[tractor._portal.Portal, Services]: - """If no ``pikerd`` daemon-root-actor can be found start it and + ''' + If no ``pikerd`` daemon-root-actor can be found start it and yield up (we should probably figure out returning a portal to self though). - """ + ''' if loglevel: get_console_log(loglevel) @@ -316,7 +308,9 @@ async def maybe_open_pikerd( yield None -# brokerd enabled modules +# `brokerd` enabled modules +# NOTE: keeping this list as small as possible is part of our caps-sec +# model and should be treated with utmost care! _data_mods = [ 'piker.brokers.core', 'piker.brokers.data', @@ -326,10 +320,6 @@ _data_mods = [ ] -class Brokerd: - locks = defaultdict(trio.Lock) - - @acm async def find_service( service_name: str, @@ -366,6 +356,8 @@ async def maybe_spawn_daemon( service_task_target: Callable, spawn_args: dict[str, Any], loglevel: Optional[str] = None, + + singleton: bool = False, **kwargs, ) -> tractor.Portal: @@ -386,7 +378,7 @@ async def maybe_spawn_daemon( # serialize access to this section to avoid # 2 or more tasks racing to create a daemon - lock = Brokerd.locks[service_name] + lock = Services.locks[service_name] await lock.acquire() async with find_service(service_name) as portal: @@ -397,6 +389,9 @@ async def maybe_spawn_daemon( log.warning(f"Couldn't find any existing {service_name}") + # TODO: really shouldn't the actor spawning be part of the service + # starting method `Services.start_service()` ? + # ask root ``pikerd`` daemon to spawn the daemon we need if # pikerd is not live we now become the root of the # process tree @@ -407,15 +402,16 @@ async def maybe_spawn_daemon( ) as pikerd_portal: + # we are the root and thus are `pikerd` + # so spawn the target service directly by calling + # the provided target routine. + # XXX: this assumes that the target is well formed and will + # do the right things to setup both a sub-actor **and** call + # the ``_Services`` api from above to start the top level + # service task for that actor. + started: bool if pikerd_portal is None: - # we are the root and thus are `pikerd` - # so spawn the target service directly by calling - # the provided target routine. - # XXX: this assumes that the target is well formed and will - # do the right things to setup both a sub-actor **and** call - # the ``_Services`` api from above to start the top level - # service task for that actor. - await service_task_target(**spawn_args) + started = await service_task_target(**spawn_args) else: # tell the remote `pikerd` to start the target, @@ -424,11 +420,14 @@ async def maybe_spawn_daemon( # non-blocking and the target task will persist running # on `pikerd` after the client requesting it's start # disconnects. - await pikerd_portal.run( + started = await pikerd_portal.run( service_task_target, **spawn_args, ) + if started: + log.info(f'Service {service_name} started!') + async with tractor.wait_for_actor(service_name) as portal: lock.release() yield portal @@ -451,9 +450,6 @@ async def spawn_brokerd( extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) tractor_kwargs.update(extra_tractor_kwargs) - global _services - assert _services - # ask `pikerd` to spawn a new sub-actor and manage it under its # actor nursery modpath = brokermod.__name__ @@ -466,18 +462,18 @@ async def spawn_brokerd( subpath = f'{modpath}.{submodname}' broker_enable.append(subpath) - portal = await _services.actor_n.start_actor( + portal = await Services.actor_n.start_actor( dname, enable_modules=_data_mods + broker_enable, loglevel=loglevel, - debug_mode=_services.debug_mode, + debug_mode=Services.debug_mode, **tractor_kwargs ) # non-blocking setup of brokerd service nursery from .data import _setup_persistent_brokerd - await _services.start_service_task( + await Services.start_service_task( dname, portal, _setup_persistent_brokerd, @@ -523,24 +519,21 @@ async def spawn_emsd( """ log.info('Spawning emsd') - global _services - assert _services - - portal = await _services.actor_n.start_actor( + portal = await Services.actor_n.start_actor( 'emsd', enable_modules=[ 'piker.clearing._ems', 'piker.clearing._client', ], loglevel=loglevel, - debug_mode=_services.debug_mode, # set by pikerd flag + debug_mode=Services.debug_mode, # set by pikerd flag **extra_tractor_kwargs ) # non-blocking setup of clearing service from .clearing._ems import _setup_persistent_emsd - await _services.start_service_task( + await Services.start_service_task( 'emsd', portal, _setup_persistent_emsd,