From e196e9d1a0894e4308acc23f393139ec0ad616cf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 10 May 2022 14:38:38 -0400 Subject: [PATCH] Factor `marketstore` container specifics into `piker.data.marketstore` --- piker/cli/__init__.py | 7 ++- piker/data/_ahab.py | 117 +++++++------------------------------ piker/data/marketstore.py | 119 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 143 insertions(+), 100 deletions(-) diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 7d58ff4d..516a1b96 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -72,14 +72,15 @@ def pikerd(loglevel, host, tl, pdb, tsdb): trio.open_nursery() as n, ): if tsdb: - # TODO: - # async with maybe_open_marketstored(): - from piker.data._ahab import start_ahab + from piker.data.marketstore import start_marketstore + log.info('Spawning `marketstore` supervisor') ctn_ready, config, (cid, pid) = await n.start( start_ahab, 'marketstored', + start_marketstore, + ) log.info( f'`marketstored` up pid:{pid}\n' diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index 87f7ef59..e0f79be4 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -21,6 +21,7 @@ Supervisor for docker with included specific-image service helpers. import os from typing import ( Optional, + Callable, Any, ) from contextlib import asynccontextmanager as acm @@ -28,6 +29,7 @@ from contextlib import asynccontextmanager as acm import trio from trio_typing import TaskStatus import tractor +from tractor.msg import NamespacePath import docker import json from docker.models.containers import Container as DockerContainer @@ -39,50 +41,6 @@ from .. import config log = get_logger(__name__) -_config = { - 'grpc_listen_port': 5995, - 'ws_listen_port': 5993, - 'log_level': 'debug', -} - -_yaml_config = ''' -# piker's ``marketstore`` config. - -# mount this config using: -# sudo docker run --mount \ -# type=bind,source="$HOME/.config/piker/",target="/etc" -i -p \ -# 5993:5993 alpacamarkets/marketstore:latest - -root_directory: data -listen_port: {ws_listen_port} -grpc_listen_port: {grpc_listen_port} -log_level: {log_level} -queryable: true -stop_grace_period: 0 -wal_rotate_interval: 5 -stale_threshold: 5 -enable_add: true -enable_remove: false - -triggers: - - module: ondiskagg.so - on: "*/1Sec/OHLCV" - config: - # filter: "nasdaq" - destinations: - - 1Min - - 5Min - - 15Min - - 1H - - 1D - - - module: stream.so - on: '*/*/*' - # config: - # filter: "nasdaq" - -'''.format(**_config) - class DockerNotStarted(Exception): 'Prolly you dint start da daemon bruh' @@ -263,63 +221,22 @@ class Container: @tractor.context -async def open_marketstored( +async def open_ahabd( ctx: tractor.Context, + endpoint: str, # ns-pointer str-msg-type + **kwargs, ) -> None: - ''' - Start and supervise a marketstore instance with its config bind-mounted - in from the piker config directory on the system. - - The equivalent cli cmd to this code is: - - sudo docker run --mount \ - type=bind,source="$HOME/.config/piker/",target="/etc" -i -p \ - 5993:5993 alpacamarkets/marketstore:latest - - ''' get_console_log('info', name=__name__) async with open_docker() as client: - # create a mount from user's local piker config dir into container - config_dir_mnt = docker.types.Mount( - target='/etc', - source=config._config_dir, - type='bind', - ) - - # create a user config subdir where the marketstore - # backing filesystem database can be persisted. - persistent_data_dir = os.path.join( - config._config_dir, 'data', - ) - if not os.path.isdir(persistent_data_dir): - os.mkdir(persistent_data_dir) - - data_dir_mnt = docker.types.Mount( - target='/data', - source=persistent_data_dir, - type='bind', - ) - - dcntr: DockerContainer = client.containers.run( - 'alpacamarkets/marketstore:latest', - # do we need this for cmds? - # '-i', - - # '-p 5993:5993', - ports={ - '5993/tcp': 5993, # jsonrpc - '5995/tcp': 5995, # grpc - }, - mounts=[config_dir_mnt, data_dir_mnt], - detach=True, - # stop_signal='SIGINT', - init=True, - # remove=True, - ) + # TODO: eventually offer a config-oriented API to do the mounts, + # params, etc. passing to ``Containter.run()``? + # call into endpoint for container config/init + ep_func = NamespacePath(endpoint).load_ref() + dcntr, cntr_config = ep_func(client) cntr = Container(dcntr) with trio.move_on_after(1): @@ -332,7 +249,11 @@ async def open_marketstored( 'Failed to start `marketstore` check logs deats' ) - await ctx.started((cntr.cntr.id, os.getpid())) + await ctx.started(( + cntr.cntr.id, + os.getpid(), + cntr_config, + )) try: @@ -355,6 +276,7 @@ async def open_marketstored( async def start_ahab( service_name: str, + endpoint: Callable[docker.DockerClient, DockerContainer], task_status: TaskStatus[ tuple[ trio.Event, @@ -400,14 +322,15 @@ async def start_ahab( ) async with portal.open_context( - open_marketstored, + open_ahabd, + endpoint=str(NamespacePath.from_ref(endpoint)), ) as (ctx, first): - cid, pid = first + cid, pid, cntr_config = first task_status.started(( cn_ready, - _config, + cntr_config, (cid, pid), )) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 92d860da..fd0e9318 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -23,6 +23,7 @@ - todo: tick sequence stream-cloning for testing ''' +from __future__ import annotations from contextlib import asynccontextmanager as acm from datetime import datetime from pprint import pformat @@ -30,6 +31,7 @@ from typing import ( Any, Optional, Union, + TYPE_CHECKING, ) import time from math import isnan @@ -49,12 +51,129 @@ from anyio_marketstore import ( import pendulum import purerpc +if TYPE_CHECKING: + import docker + from ._ahab import DockerContainer + from .feed import maybe_open_feed from ..log import get_logger, get_console_log log = get_logger(__name__) + +# container level config +_config = { + 'grpc_listen_port': 5995, + 'ws_listen_port': 5993, + 'log_level': 'debug', +} + +_yaml_config = ''' +# piker's ``marketstore`` config. + +# mount this config using: +# sudo docker run --mount \ +# type=bind,source="$HOME/.config/piker/",target="/etc" -i -p \ +# 5993:5993 alpacamarkets/marketstore:latest + +root_directory: data +listen_port: {ws_listen_port} +grpc_listen_port: {grpc_listen_port} +log_level: {log_level} +queryable: true +stop_grace_period: 0 +wal_rotate_interval: 5 +stale_threshold: 5 +enable_add: true +enable_remove: false + +triggers: + - module: ondiskagg.so + on: "*/1Sec/OHLCV" + config: + # filter: "nasdaq" + destinations: + - 1Min + - 5Min + - 15Min + - 1H + - 1D + + - module: stream.so + on: '*/*/*' + # config: + # filter: "nasdaq" + +'''.format(**_config) + + +def start_marketstore( + client: docker.DockerClient, + + **kwargs, + +) -> tuple[DockerContainer, dict[str, Any]]: + ''' + Start and supervise a marketstore instance with its config bind-mounted + in from the piker config directory on the system. + + The equivalent cli cmd to this code is: + + sudo docker run --mount \ + type=bind,source="$HOME/.config/piker/",target="/etc" -i -p \ + 5993:5993 alpacamarkets/marketstore:latest + + ''' + import os + import docker + from .. import config + + get_console_log('info', name=__name__) + + # create a mount from user's local piker config dir into container + config_dir_mnt = docker.types.Mount( + target='/etc', + source=config._config_dir, + type='bind', + ) + + # create a user config subdir where the marketstore + # backing filesystem database can be persisted. + persistent_data_dir = os.path.join( + config._config_dir, 'data', + ) + if not os.path.isdir(persistent_data_dir): + os.mkdir(persistent_data_dir) + + data_dir_mnt = docker.types.Mount( + target='/data', + source=persistent_data_dir, + type='bind', + ) + + dcntr: DockerContainer = client.containers.run( + 'alpacamarkets/marketstore:latest', + # do we need this for cmds? + # '-i', + + # '-p 5993:5993', + ports={ + '5993/tcp': 5993, # jsonrpc / ws? + '5995/tcp': 5995, # grpc + }, + mounts=[ + config_dir_mnt, + data_dir_mnt, + ], + detach=True, + # stop_signal='SIGINT', + init=True, + # remove=True, + ) + return dcntr, _config + + _tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK') _tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids)