From e7a172b6565c9f65b42d545869aa7a087f6cf90f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 23 May 2023 11:39:18 -0400 Subject: [PATCH] Reimplement marketstore and elasticsearch daemons Using the new `._ahab.start_ahab_service()` mngr of course, and now support user config overrides (such that our defaults can be modified by a keen user, say using a config file, or for testing). This is where the functionality moved out of the `pikerd` init has been moved - instead of being triggered by bool flag inputs to that factory. For marketstore actually support overriding the entire yaml config via runtime `_yaml_config_str: str` formatting with any passed user dict, primarily focussing on supporting override of the sockaddrs for testing. --- piker/service/elastic.py | 45 +++++++++++++++++++ piker/service/marketstore.py | 87 ++++++++++++++++++++++++++++++------ 2 files changed, 119 insertions(+), 13 deletions(-) diff --git a/piker/service/elastic.py b/piker/service/elastic.py index 6714a9ec..902f4fde 100644 --- a/piker/service/elastic.py +++ b/piker/service/elastic.py @@ -15,6 +15,7 @@ # along with this program. If not, see . from __future__ import annotations +from contextlib import asynccontextmanager as acm from typing import ( Any, TYPE_CHECKING, @@ -122,3 +123,47 @@ def start_elasticsearch( health_query, chk_for_closed_msg, ) + + +@acm +async def start_ahab_daemon( + service_mngr: Services, + user_config: dict | None = None, + loglevel: str | None = None, + +) -> tuple[str, dict]: + ''' + Task entrypoint to start the estasticsearch docker container using + the service manager. + + ''' + from ._ahab import start_ahab_service + + # dict-merge any user settings + conf: dict = _config.copy() + if user_config: + conf = conf | user_config + + dname: str = 'esd' + log.info(f'Spawning `{dname}` supervisor') + async with start_ahab_service( + service_mngr, + dname, + + # NOTE: docker-py client is passed at runtime + start_elasticsearch, + ep_kwargs={'user_config': conf}, + loglevel=loglevel, + + ) as ( + ctn_ready, + config, + (cid, pid), + ): + log.info( + f'`{dname}` up!\n' + f'pid: {pid}\n' + f'container id: {cid[:12]}\n' + f'config: {pformat(config)}' + ) + yield dname, conf diff --git a/piker/service/marketstore.py b/piker/service/marketstore.py index 4ca496b5..930c44da 100644 --- a/piker/service/marketstore.py +++ b/piker/service/marketstore.py @@ -26,6 +26,8 @@ from __future__ import annotations from contextlib import asynccontextmanager as acm from datetime import datetime +from functools import partial +from pprint import pformat from typing import ( Any, Union, @@ -70,7 +72,7 @@ _config = { 'startup_timeout': 2, } -_yaml_config = ''' +_yaml_config_str: str = ''' # piker's ``marketstore`` config. # mount this config using: @@ -112,18 +114,18 @@ triggers: # config: # filter: "nasdaq" -'''.format(**_config) +''' def start_marketstore( client: docker.DockerClient, - + user_config: dict, **kwargs, ) -> tuple[DockerContainer, dict[str, Any]]: ''' - Start and supervise a marketstore instance with its config bind-mounted - in from the piker config directory on the system. + Start and supervise a marketstore instance with its config + bind-mounted in from the piker config directory on the system. The equivalent cli cmd to this code is: @@ -147,14 +149,16 @@ def start_marketstore( os.mkdir(mktsdir) yml_file = os.path.join(mktsdir, 'mkts.yml') + yaml_config = _yaml_config_str.format(**user_config) + if not os.path.isfile(yml_file): log.warning( f'No `marketstore` config exists?: {yml_file}\n' 'Generating new file from template:\n' - f'{_yaml_config}\n' + f'{yaml_config}\n' ) with open(yml_file, 'w') as yf: - yf.write(_yaml_config) + yf.write(yaml_config) # create a mount from user's local piker config dir into container config_dir_mnt = docker.types.Mount( @@ -177,6 +181,9 @@ def start_marketstore( type='bind', ) + grpc_listen_port = int(user_config['grpc_listen_port']) + ws_listen_port = int(user_config['ws_listen_port']) + dcntr: DockerContainer = client.containers.run( 'alpacamarkets/marketstore:latest', # do we need this for cmds? @@ -184,8 +191,8 @@ def start_marketstore( # '-p 5993:5993', ports={ - '5993/tcp': 5993, # jsonrpc / ws? - '5995/tcp': 5995, # grpc + f'{ws_listen_port}/tcp': ws_listen_port, + f'{grpc_listen_port}/tcp': grpc_listen_port, }, mounts=[ config_dir_mnt, @@ -205,7 +212,13 @@ def start_marketstore( return "launching tcp listener for all services..." in msg async def stop_matcher(msg: str): - return "exiting..." in msg + return ( + # not sure when this happens, some kinda stop condition + "exiting..." in msg + + # after we send SIGINT.. + or "initiating graceful shutdown due to 'interrupt' request" in msg + ) return ( dcntr, @@ -217,6 +230,49 @@ def start_marketstore( ) +@acm +async def start_ahab_daemon( + service_mngr: Services, + user_config: dict | None = None, + loglevel: str | None = None, + +) -> tuple[str, dict]: + ''' + Task entrypoint to start the marketstore docker container using the + service manager. + + ''' + from ._ahab import start_ahab_service + + # dict-merge any user settings + conf: dict = _config.copy() + if user_config: + conf: dict = conf | user_config + + dname: str = 'marketstored' + log.info(f'Spawning `{dname}` supervisor') + async with start_ahab_service( + service_mngr, + dname, + + # NOTE: docker-py client is passed at runtime + start_marketstore, + ep_kwargs={'user_config': conf}, + loglevel=loglevel, + ) as ( + ctn_ready, + config, + (cid, pid), + ): + log.info( + f'`{dname}` up!\n' + f'pid: {pid}\n' + f'container id: {cid[:12]}\n' + f'config: {pformat(config)}' + ) + yield dname, conf + + _tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK') _tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids) @@ -681,9 +737,12 @@ async def open_tsdb_client( delayed=False, ) - # load any user service settings for connecting to tsdb - conf, path = config.load('conf') - tsdbconf = conf['network'].get('tsdb') + # load any user service settings for connecting to + rootconf, path = config.load( + 'conf', + touch_if_dne=True, + ) + tsdbconf = rootconf['network'].get('tsdb') backend = tsdbconf.pop('backend') async with ( open_storage_client( @@ -903,3 +962,5 @@ async def stream_quotes( if quotes: yield quotes + +