diff --git a/piker/_daemon.py b/piker/_daemon.py new file mode 100644 index 00000000..a7030693 --- /dev/null +++ b/piker/_daemon.py @@ -0,0 +1,85 @@ +""" +pikerd daemon lifecylcle & rpc +""" +from typing import Optional +from contextlib import asynccontextmanager + +import tractor + +from .log import get_logger, get_console_log +from .brokers import get_brokermod + + +log = get_logger(__name__) + +_root_nursery: Optional[tractor._trionics.ActorNursery] = None +root_dname = 'pikerd' +root_modules = [ + __name__, + 'piker._ems' +] + + +@asynccontextmanager +async def maybe_open_pikerd( + loglevel: Optional[str] = None +) -> Optional[tractor._portal.Portal]: + """If no ``pikerd`` daemon-root-actor can be found, + assume that role and return a portal to myself + + """ + global _root_nursery + + if loglevel: + get_console_log(loglevel) + + async with tractor.find_actor(root_dname) as portal: + + if portal is not None: # pikerd exists + yield portal + + else: # assume role + async with tractor.open_root_actor( + name=root_dname, + loglevel=loglevel, + enable_modules=root_modules + ): + # init root nursery + try: + async with tractor.open_nursery() as nursery: + _root_nursery = nursery + yield None + finally: + # client code may block indefinitely so cancel when + # teardown is invoked + await nursery.cancel() + + +# brokerd enable modules +_data_mods = [ + 'piker.brokers.core', + 'piker.brokers.data', + 'piker.data', + 'piker.data._buffer' +] + +async def spawn_brokerd( + brokername, + loglevel: Optional[str] = None, + **tractor_kwargs +): + + brokermod = get_brokermod(brokername) + dname = f'brokerd.{brokername}' + log.info(f'Spawning {brokername} broker daemon') + tractor_kwargs = getattr(brokermod, '_spawnkwargs', {}) + + # TODO: raise exception when _root_nursery == None? + global _root_nursery + await _root_nursery.start_actor( + dname, + enable_modules=_data_mods + [brokermod.__name__], + loglevel=loglevel, + **tractor_kwargs + ) + diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 49b0acb9..0559cbfb 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -29,11 +29,11 @@ from typing import ( Dict, Any, Sequence, AsyncIterator, Optional ) -import trio import tractor from ..brokers import get_brokermod from ..log import get_logger, get_console_log +from .._daemon import spawn_brokerd, maybe_open_pikerd from ._normalize import iterticks from ._sharedmem import ( maybe_open_shm_array, @@ -48,7 +48,6 @@ from ._buffer import ( subscribe_ohlc_for_increment ) - __all__ = [ 'iterticks', 'maybe_open_shm_array', @@ -75,15 +74,6 @@ def get_ingestormod(name: str) -> ModuleType: return module -# capable rpc modules -_data_mods = [ - 'piker.brokers.core', - 'piker.brokers.data', - 'piker.data', - 'piker.data._buffer', -] - - @asynccontextmanager async def maybe_spawn_brokerd( brokername: str, @@ -95,11 +85,11 @@ async def maybe_spawn_brokerd( ) -> tractor._portal.Portal: """If no ``brokerd.{brokername}`` daemon-actor can be found, spawn one in a local subactor and return a portal to it. + """ if loglevel: get_console_log(loglevel) - brokermod = get_brokermod(brokername) dname = f'brokerd.{brokername}' async with tractor.find_actor(dname) as portal: @@ -107,32 +97,29 @@ async def maybe_spawn_brokerd( if portal is not None: yield portal - else: # no daemon has been spawned yet + else: + # ask root ``pikerd`` daemon to spawn the daemon we need if + # pikerd is not live we now become the root of the + # process tree + async with maybe_open_pikerd( + loglevel=loglevel + ) as pikerd_portal: - log.info(f"Spawning {brokername} broker daemon") + if pikerd_portal is None: + # we are root so spawn brokerd directly in our tree + # the root nursery is accessed through process global state + await spawn_brokerd(brokername, loglevel=loglevel) - # retrieve any special config from the broker mod - tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) - - async with tractor.open_nursery( - #debug_mode=debug_mode, - ) as nursery: - try: - # spawn new daemon - portal = await nursery.start_actor( - dname, - enable_modules=_data_mods + [brokermod.__name__], + else: + await pikerd_portal.run( + spawn_brokerd, + brokername=brokername, loglevel=loglevel, debug_mode=debug_mode, - **tractor_kwargs ) - async with tractor.wait_for_actor(dname) as portal: - yield portal - finally: - # client code may block indefinitely so cancel when - # teardown is invoked - await nursery.cancel() + async with tractor.wait_for_actor(dname) as portal: + yield portal @dataclass