Moved database initialization code inside the open_pikerd context manager

explicit_write_pps_on_exit
Esmeralda Gallardo 2023-02-21 13:21:35 -03:00 committed by Guillermo Rodriguez
parent bf9ca4a4a8
commit 3ce8bfa012
No known key found for this signature in database
GPG Key ID: EC3AB66D5D83B392
2 changed files with 55 additions and 39 deletions

View File

@ -41,6 +41,9 @@ from .log import (
) )
from .brokers import get_brokermod from .brokers import get_brokermod
from pprint import pformat
from functools import partial
log = get_logger(__name__) log = get_logger(__name__)
@ -313,6 +316,9 @@ async def open_piker_runtime(
@acm @acm
async def open_pikerd( async def open_pikerd(
tsdb: bool,
es: bool,
loglevel: str | None = None, loglevel: str | None = None,
# XXX: you should pretty much never want debug mode # XXX: you should pretty much never want debug mode
@ -349,12 +355,54 @@ async def open_pikerd(
): ):
assert root_actor.accept_addr == reg_addr assert root_actor.accept_addr == reg_addr
if tsdb:
from piker.data._ahab import start_ahab
from piker.data.marketstore import start_marketstore
log.info('Spawning `marketstore` supervisor')
ctn_ready, config, (cid, pid) = await service_nursery.start(
start_ahab,
'marketstored',
start_marketstore,
)
log.info(
f'`marketstored` up!\n'
f'pid: {pid}\n'
f'container id: {cid[:12]}\n'
f'config: {pformat(config)}'
)
if es:
from piker.data._ahab import start_ahab
from piker.data.elastic import start_elasticsearch
log.info('Spawning `elasticsearch` supervisor')
ctn_ready, config, (cid, pid) = await service_nursery.start(
partial(
start_ahab,
'elasticsearch',
start_elasticsearch,
start_timeout=30.0
)
)
log.info(
f'`elasticsearch` up!\n'
f'pid: {pid}\n'
f'container id: {cid[:12]}\n'
f'config: {pformat(config)}'
)
# assign globally for future daemon/task creation # assign globally for future daemon/task creation
Services.actor_n = actor_nursery Services.actor_n = actor_nursery
Services.service_n = service_nursery Services.service_n = service_nursery
Services.debug_mode = debug_mode Services.debug_mode = debug_mode
try: try:
yield Services yield Services
finally: finally:
# TODO: is this more clever/efficient? # TODO: is this more clever/efficient?
# if 'samplerd' in Services.service_tasks: # if 'samplerd' in Services.service_tasks:
@ -388,6 +436,8 @@ async def maybe_open_runtime(
@acm @acm
async def maybe_open_pikerd( async def maybe_open_pikerd(
tsdb: bool = False,
es: bool = False,
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
registry_addr: None | tuple = None, registry_addr: None | tuple = None,
@ -436,6 +486,8 @@ async def maybe_open_pikerd(
# presume pikerd role since no daemon could be found at # presume pikerd role since no daemon could be found at
# configured address # configured address
async with open_pikerd( async with open_pikerd(
tsdb=tsdb,
es=es,
loglevel=loglevel, loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False), debug_mode=kwargs.get('debug_mode', False),
registry_addr=registry_addr, registry_addr=registry_addr,

View File

@ -90,6 +90,8 @@ def pikerd(
async def main(): async def main():
async with ( async with (
open_pikerd( open_pikerd(
tsdb=tsdb,
es=es,
loglevel=loglevel, loglevel=loglevel,
debug_mode=pdb, debug_mode=pdb,
registry_addr=reg_addr, registry_addr=reg_addr,
@ -97,44 +99,6 @@ def pikerd(
), # normally delivers a ``Services`` handle ), # normally delivers a ``Services`` handle
trio.open_nursery() as n, trio.open_nursery() as n,
): ):
if tsdb:
from piker.data._ahab import start_ahab
from piker.data.marketstore import start_marketstore
log.info('Spawning `marketstore` supervisor')
ctn_ready, config, (cid, pid) = await n.start(
start_ahab,
'marketstored',
start_marketstore,
)
log.info(
f'`marketstored` up!\n'
f'pid: {pid}\n'
f'container id: {cid[:12]}\n'
f'config: {pformat(config)}'
)
if es:
from piker.data._ahab import start_ahab
from piker.data.elasticsearch import start_elasticsearch
log.info('Spawning `elasticsearch` supervisor')
ctn_ready, config, (cid, pid) = await n.start(
partial(
start_ahab,
'elasticsearch',
start_elasticsearch,
start_timeout=30.0
)
)
log.info(
f'`elasticsearch` up!\n'
f'pid: {pid}\n'
f'container id: {cid[:12]}\n'
f'config: {pformat(config)}'
)
await trio.sleep_forever() await trio.sleep_forever()
@ -241,7 +205,7 @@ def services(config, tl, ports):
def _load_clis() -> None: def _load_clis() -> None:
from ..data import marketstore # noqa from ..data import marketstore # noqa
from ..data import elasticsearch from ..data import elastic
from ..data import cli # noqa from ..data import cli # noqa
from ..brokers import cli # noqa from ..brokers import cli # noqa
from ..ui import cli # noqa from ..ui import cli # noqa