Reimplement marketstore and elasticsearch daemons

Using the new `._ahab.start_ahab_service()` mngr of course, and now
support user config overrides (such that our defaults can be modified by
a keen user, say using a config file, or for testing). This is where the
functionality moved out of the `pikerd` init has been moved - instead of
being triggered by bool flag inputs to that factory.

For marketstore actually support overriding the entire yaml config via
runtime `_yaml_config_str: str` formatting with any passed user dict,
primarily focussing on supporting override of the sockaddrs for testing.
master
Tyler Goodlet 2023-05-23 11:39:18 -04:00
parent bd919f9d66
commit e7a172b656
2 changed files with 119 additions and 13 deletions

View File

@ -15,6 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations from __future__ import annotations
from contextlib import asynccontextmanager as acm
from typing import ( from typing import (
Any, Any,
TYPE_CHECKING, TYPE_CHECKING,
@ -122,3 +123,47 @@ def start_elasticsearch(
health_query, health_query,
chk_for_closed_msg, chk_for_closed_msg,
) )
@acm
async def start_ahab_daemon(
service_mngr: Services,
user_config: dict | None = None,
loglevel: str | None = None,
) -> tuple[str, dict]:
'''
Task entrypoint to start the estasticsearch docker container using
the service manager.
'''
from ._ahab import start_ahab_service
# dict-merge any user settings
conf: dict = _config.copy()
if user_config:
conf = conf | user_config
dname: str = 'esd'
log.info(f'Spawning `{dname}` supervisor')
async with start_ahab_service(
service_mngr,
dname,
# NOTE: docker-py client is passed at runtime
start_elasticsearch,
ep_kwargs={'user_config': conf},
loglevel=loglevel,
) as (
ctn_ready,
config,
(cid, pid),
):
log.info(
f'`{dname}` up!\n'
f'pid: {pid}\n'
f'container id: {cid[:12]}\n'
f'config: {pformat(config)}'
)
yield dname, conf

View File

@ -26,6 +26,8 @@
from __future__ import annotations from __future__ import annotations
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from datetime import datetime from datetime import datetime
from functools import partial
from pprint import pformat
from typing import ( from typing import (
Any, Any,
Union, Union,
@ -70,7 +72,7 @@ _config = {
'startup_timeout': 2, 'startup_timeout': 2,
} }
_yaml_config = ''' _yaml_config_str: str = '''
# piker's ``marketstore`` config. # piker's ``marketstore`` config.
# mount this config using: # mount this config using:
@ -112,18 +114,18 @@ triggers:
# config: # config:
# filter: "nasdaq" # filter: "nasdaq"
'''.format(**_config) '''
def start_marketstore( def start_marketstore(
client: docker.DockerClient, client: docker.DockerClient,
user_config: dict,
**kwargs, **kwargs,
) -> tuple[DockerContainer, dict[str, Any]]: ) -> tuple[DockerContainer, dict[str, Any]]:
''' '''
Start and supervise a marketstore instance with its config bind-mounted Start and supervise a marketstore instance with its config
in from the piker config directory on the system. bind-mounted in from the piker config directory on the system.
The equivalent cli cmd to this code is: The equivalent cli cmd to this code is:
@ -147,14 +149,16 @@ def start_marketstore(
os.mkdir(mktsdir) os.mkdir(mktsdir)
yml_file = os.path.join(mktsdir, 'mkts.yml') yml_file = os.path.join(mktsdir, 'mkts.yml')
yaml_config = _yaml_config_str.format(**user_config)
if not os.path.isfile(yml_file): if not os.path.isfile(yml_file):
log.warning( log.warning(
f'No `marketstore` config exists?: {yml_file}\n' f'No `marketstore` config exists?: {yml_file}\n'
'Generating new file from template:\n' 'Generating new file from template:\n'
f'{_yaml_config}\n' f'{yaml_config}\n'
) )
with open(yml_file, 'w') as yf: with open(yml_file, 'w') as yf:
yf.write(_yaml_config) yf.write(yaml_config)
# create a mount from user's local piker config dir into container # create a mount from user's local piker config dir into container
config_dir_mnt = docker.types.Mount( config_dir_mnt = docker.types.Mount(
@ -177,6 +181,9 @@ def start_marketstore(
type='bind', type='bind',
) )
grpc_listen_port = int(user_config['grpc_listen_port'])
ws_listen_port = int(user_config['ws_listen_port'])
dcntr: DockerContainer = client.containers.run( dcntr: DockerContainer = client.containers.run(
'alpacamarkets/marketstore:latest', 'alpacamarkets/marketstore:latest',
# do we need this for cmds? # do we need this for cmds?
@ -184,8 +191,8 @@ def start_marketstore(
# '-p 5993:5993', # '-p 5993:5993',
ports={ ports={
'5993/tcp': 5993, # jsonrpc / ws? f'{ws_listen_port}/tcp': ws_listen_port,
'5995/tcp': 5995, # grpc f'{grpc_listen_port}/tcp': grpc_listen_port,
}, },
mounts=[ mounts=[
config_dir_mnt, config_dir_mnt,
@ -205,7 +212,13 @@ def start_marketstore(
return "launching tcp listener for all services..." in msg return "launching tcp listener for all services..." in msg
async def stop_matcher(msg: str): async def stop_matcher(msg: str):
return "exiting..." in msg return (
# not sure when this happens, some kinda stop condition
"exiting..." in msg
# after we send SIGINT..
or "initiating graceful shutdown due to 'interrupt' request" in msg
)
return ( return (
dcntr, dcntr,
@ -217,6 +230,49 @@ def start_marketstore(
) )
@acm
async def start_ahab_daemon(
service_mngr: Services,
user_config: dict | None = None,
loglevel: str | None = None,
) -> tuple[str, dict]:
'''
Task entrypoint to start the marketstore docker container using the
service manager.
'''
from ._ahab import start_ahab_service
# dict-merge any user settings
conf: dict = _config.copy()
if user_config:
conf: dict = conf | user_config
dname: str = 'marketstored'
log.info(f'Spawning `{dname}` supervisor')
async with start_ahab_service(
service_mngr,
dname,
# NOTE: docker-py client is passed at runtime
start_marketstore,
ep_kwargs={'user_config': conf},
loglevel=loglevel,
) as (
ctn_ready,
config,
(cid, pid),
):
log.info(
f'`{dname}` up!\n'
f'pid: {pid}\n'
f'container id: {cid[:12]}\n'
f'config: {pformat(config)}'
)
yield dname, conf
_tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK') _tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK')
_tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids) _tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids)
@ -681,9 +737,12 @@ async def open_tsdb_client(
delayed=False, delayed=False,
) )
# load any user service settings for connecting to tsdb # load any user service settings for connecting to
conf, path = config.load('conf') rootconf, path = config.load(
tsdbconf = conf['network'].get('tsdb') 'conf',
touch_if_dne=True,
)
tsdbconf = rootconf['network'].get('tsdb')
backend = tsdbconf.pop('backend') backend = tsdbconf.pop('backend')
async with ( async with (
open_storage_client( open_storage_client(
@ -903,3 +962,5 @@ async def stream_quotes(
if quotes: if quotes:
yield quotes yield quotes