From 2a51582ec0f74f778a82ae71a87e391df838720e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 25 Mar 2021 10:19:35 -0400 Subject: [PATCH] Start forming an services api Add a `Services` nurseries container singleton for spawning sub-daemons inside the long running `pikerd` tree. Bring in `brokerd` spawning util funcs to start getting eyes on what can be factored into a service api. --- piker/_daemon.py | 137 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 114 insertions(+), 23 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index 25adb5ec..a844708a 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -1,9 +1,12 @@ """ pikerd daemon lifecylcle & rpc """ -from typing import Optional +from typing import Optional, Union from contextlib import asynccontextmanager +from functools import partial +from pydantic import BaseModel +import trio import tractor from .log import get_logger, get_console_log @@ -12,7 +15,6 @@ from .brokers import get_brokermod log = get_logger(__name__) -_root_nursery: Optional[tractor._trionics.ActorNursery] = None _root_dname = 'pikerd' _root_modules = [ __name__, @@ -21,17 +23,39 @@ _root_modules = [ ] +# @dataclass +class Services(BaseModel): + actor_n: tractor._trionics.ActorNursery + service_n: trio.Nursery + + class Config: + arbitrary_types_allowed = True + + +_services: Optional[Services] = None + + @asynccontextmanager async def open_pikerd( loglevel: Optional[str] = None, **kwargs, ) -> Optional[tractor._portal.Portal]: + """Start a root piker daemon who's lifetime extends indefinitely + until cancelled. - global _root_nursery + A root actor nursery is created which can be used to create and keep + alive underling services (see below). + + """ + global _services + assert _services is None # XXX: this may open a root actor as well async with tractor.open_nursery( + + # passed through to ``open_root_actor`` name=_root_dname, + loglevel=loglevel, # TODO: eventually we should be able to avoid # having the root have more then permissions to @@ -39,23 +63,28 @@ async def open_pikerd( # enable_modules=[__name__], enable_modules=_root_modules, - loglevel=loglevel, - ) as nursery: - _root_nursery = nursery - yield nursery + ) as actor_nursery: + async with trio.open_nursery() as service_nursery: + + # assign globally for future daemon/task creation + _services = Services( + actor_n=actor_nursery, + service_n=service_nursery + ) + + yield _services @asynccontextmanager async def maybe_open_pikerd( loglevel: Optional[str] = None, **kwargs, -) -> Optional[tractor._portal.Portal]: - """If no ``pikerd`` daemon-root-actor can be found, - assume that role and return a portal to myself +) -> Union[tractor._portal.Portal, Services]: + """If no ``pikerd`` daemon-root-actor can be found start it and + yield up (we should probably figure out returning a portal to self + though). """ - global _root_nursery - if loglevel: get_console_log(loglevel) @@ -72,13 +101,10 @@ async def maybe_open_pikerd( async with open_pikerd( loglevel, **kwargs, - ): - assert _root_nursery - + ) as _: # in the case where we're starting up the # tractor-piker runtime stack in **this** process - # we want to hand off a nursery for starting (as a sub) - # whatever actor is requesting pikerd. + # we return no portal to self. yield None @@ -97,6 +123,8 @@ async def spawn_brokerd( **tractor_kwargs ) -> tractor._portal.Portal: + from .data import _setup_persistent_feeds + log.info(f'Spawning {brokername} broker daemon') brokermod = get_brokermod(brokername) @@ -105,19 +133,82 @@ async def spawn_brokerd( extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) tractor_kwargs.update(extra_tractor_kwargs) - # TODO: raise exception when _root_nursery == None? - global _root_nursery + global _services + assert _services - await _root_nursery.start_actor( + portal = await _services.actor_n.start_actor( dname, enable_modules=_data_mods + [brokermod.__name__], loglevel=loglevel, **tractor_kwargs ) + # TODO: so i think this is the perfect use case for supporting + # a cross-actor async context manager api instead of this + # shoort-and-forget task spawned in the root nursery, we'd have an + # async exit stack that we'd register the `portal.open_context()` + # call with and then have the ability to unwind the call whenevs. + + # non-blocking setup of brokerd service nursery + _services.service_n.start_soon( + partial( + portal.run, + _setup_persistent_feeds, + brokername=brokername, + ) + ) + return dname +@asynccontextmanager +async def maybe_spawn_brokerd( + brokername: str, + loglevel: Optional[str] = None, + + # XXX: you should pretty much never want debug mode + # for data daemons when running in production. + debug_mode: bool = True, +) -> tractor._portal.Portal: + """If no ``brokerd.{brokername}`` daemon-actor can be found, + spawn one in a local subactor and return a portal to it. + + """ + if loglevel: + get_console_log(loglevel) + + dname = f'brokerd.{brokername}' + + # attach to existing brokerd if possible + async with tractor.find_actor(dname) as portal: + if portal is not None: + yield portal + return + + # ask root ``pikerd`` daemon to spawn the daemon we need if + # pikerd is not live we now become the root of the + # process tree + async with maybe_open_pikerd( + loglevel=loglevel + ) as pikerd_portal: + + if pikerd_portal is None: + # we are root so spawn brokerd directly in our tree + # the root nursery is accessed through process global state + await spawn_brokerd(brokername, loglevel=loglevel) + + else: + await pikerd_portal.run( + spawn_brokerd, + brokername=brokername, + loglevel=loglevel, + debug_mode=debug_mode, + ) + + async with tractor.wait_for_actor(dname) as portal: + yield portal + + async def spawn_emsd( brokername, loglevel: Optional[str] = None, @@ -126,10 +217,10 @@ async def spawn_emsd( log.info('Spawning emsd') - # TODO: raise exception when _root_nursery == None? - global _root_nursery + # TODO: raise exception when _services == None? + global _services - await _root_nursery.start_actor( + await _services.actor_n.start_actor( 'emsd', enable_modules=[ 'piker.clearing._ems',