Add piker root daemon spawning machinery

Refactor maybe_spawn_brokerd to adapt to new process tree structure
and add a ``maybe_open_pikerd``.
supervise
Guillermo Rodriguez 2021-01-31 18:11:50 -03:00 committed by Tyler Goodlet
parent 4a590edcc3
commit 189c56c012
2 changed files with 104 additions and 32 deletions

85
piker/_daemon.py 100644
View File

@ -0,0 +1,85 @@
"""
pikerd daemon lifecylcle & rpc
"""
from typing import Optional
from contextlib import asynccontextmanager
import tractor
from .log import get_logger, get_console_log
from .brokers import get_brokermod
log = get_logger(__name__)
_root_nursery: Optional[tractor._trionics.ActorNursery] = None
root_dname = 'pikerd'
root_modules = [
__name__,
'piker._ems'
]
@asynccontextmanager
async def maybe_open_pikerd(
loglevel: Optional[str] = None
) -> Optional[tractor._portal.Portal]:
"""If no ``pikerd`` daemon-root-actor can be found,
assume that role and return a portal to myself
"""
global _root_nursery
if loglevel:
get_console_log(loglevel)
async with tractor.find_actor(root_dname) as portal:
if portal is not None: # pikerd exists
yield portal
else: # assume role
async with tractor.open_root_actor(
name=root_dname,
loglevel=loglevel,
enable_modules=root_modules
):
# init root nursery
try:
async with tractor.open_nursery() as nursery:
_root_nursery = nursery
yield None
finally:
# client code may block indefinitely so cancel when
# teardown is invoked
await nursery.cancel()
# brokerd enable modules
_data_mods = [
'piker.brokers.core',
'piker.brokers.data',
'piker.data',
'piker.data._buffer'
]
async def spawn_brokerd(
brokername,
loglevel: Optional[str] = None,
**tractor_kwargs
):
brokermod = get_brokermod(brokername)
dname = f'brokerd.{brokername}'
log.info(f'Spawning {brokername} broker daemon')
tractor_kwargs = getattr(brokermod, '_spawnkwargs', {})
# TODO: raise exception when _root_nursery == None?
global _root_nursery
await _root_nursery.start_actor(
dname,
enable_modules=_data_mods + [brokermod.__name__],
loglevel=loglevel,
**tractor_kwargs
)

View File

@ -29,11 +29,11 @@ from typing import (
Dict, Any, Sequence, AsyncIterator, Optional
)
import trio
import tractor
from ..brokers import get_brokermod
from ..log import get_logger, get_console_log
from .._daemon import spawn_brokerd, maybe_open_pikerd
from ._normalize import iterticks
from ._sharedmem import (
maybe_open_shm_array,
@ -48,7 +48,6 @@ from ._buffer import (
subscribe_ohlc_for_increment
)
__all__ = [
'iterticks',
'maybe_open_shm_array',
@ -75,15 +74,6 @@ def get_ingestormod(name: str) -> ModuleType:
return module
# capable rpc modules
_data_mods = [
'piker.brokers.core',
'piker.brokers.data',
'piker.data',
'piker.data._buffer',
]
@asynccontextmanager
async def maybe_spawn_brokerd(
brokername: str,
@ -95,11 +85,11 @@ async def maybe_spawn_brokerd(
) -> tractor._portal.Portal:
"""If no ``brokerd.{brokername}`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it.
"""
if loglevel:
get_console_log(loglevel)
brokermod = get_brokermod(brokername)
dname = f'brokerd.{brokername}'
async with tractor.find_actor(dname) as portal:
@ -107,33 +97,30 @@ async def maybe_spawn_brokerd(
if portal is not None:
yield portal
else: # no daemon has been spawned yet
else:
# ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the
# process tree
async with maybe_open_pikerd(
loglevel=loglevel
) as pikerd_portal:
log.info(f"Spawning {brokername} broker daemon")
if pikerd_portal is None:
# we are root so spawn brokerd directly in our tree
# the root nursery is accessed through process global state
await spawn_brokerd(brokername, loglevel=loglevel)
# retrieve any special config from the broker mod
tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {})
async with tractor.open_nursery(
#debug_mode=debug_mode,
) as nursery:
try:
# spawn new daemon
portal = await nursery.start_actor(
dname,
enable_modules=_data_mods + [brokermod.__name__],
else:
await pikerd_portal.run(
spawn_brokerd,
brokername=brokername,
loglevel=loglevel,
debug_mode=debug_mode,
**tractor_kwargs
)
async with tractor.wait_for_actor(dname) as portal:
yield portal
finally:
# client code may block indefinitely so cancel when
# teardown is invoked
await nursery.cancel()
@dataclass
class Feed: