From 0dc24bd475eafdd4cfe130db0d82902d5c71acf1 Mon Sep 17 00:00:00 2001 From: Esmeralda Gallardo Date: Wed, 15 Feb 2023 14:08:23 -0300 Subject: [PATCH 1/9] Added dockerfile, yaml file and script to statrt an elasticsearch's docker instance. --- dockering/elastic/Dockerfile | 11 ++ dockering/elastic/elasticsearch.yaml | 5 + piker/cli/__init__.py | 27 ++- piker/data/elasticsearch.py | 277 +++++++++++++++++++++++++++ 4 files changed, 319 insertions(+), 1 deletion(-) create mode 100644 dockering/elastic/Dockerfile create mode 100644 dockering/elastic/elasticsearch.yaml create mode 100644 piker/data/elasticsearch.py diff --git a/dockering/elastic/Dockerfile b/dockering/elastic/Dockerfile new file mode 100644 index 00000000..091b8d08 --- /dev/null +++ b/dockering/elastic/Dockerfile @@ -0,0 +1,11 @@ +FROM elasticsearch:7.17.4 + +ENV ES_JAVA_OPTS "-Xms2g -Xmx2g" +ENV ELASTIC_USERNAME "elastic" +ENV ELASTIC_PASSWORD "password" + +COPY elasticsearch.yml /usr/share/elasticsearch/config/ + +RUN printf "password" | ./bin/elasticsearch-keystore add -f -x "bootstrap.password" + +EXPOSE 9200 \ No newline at end of file diff --git a/dockering/elastic/elasticsearch.yaml b/dockering/elastic/elasticsearch.yaml new file mode 100644 index 00000000..25eff005 --- /dev/null +++ b/dockering/elastic/elasticsearch.yaml @@ -0,0 +1,5 @@ +network.host: 0.0.0.0 + +http.port: 9200 + +discovery.type: single-node \ No newline at end of file diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 07484634..d2a07b75 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -48,6 +48,11 @@ log = get_logger('cli') is_flag=True, help='Enable local ``marketstore`` instance' ) +@click.option( + '--es', + is_flag=True, + help='Enable local ``elasticsearch`` instance' +) def pikerd( loglevel: str, host: str, @@ -55,11 +60,13 @@ def pikerd( tl: bool, pdb: bool, tsdb: bool, + es: bool, ): ''' Spawn the piker broker-daemon. ''' + from .._daemon import open_pikerd log = get_console_log(loglevel) @@ -80,7 +87,6 @@ def pikerd( ) async def main(): - async with ( open_pikerd( loglevel=loglevel, @@ -108,6 +114,24 @@ def pikerd( f'config: {pformat(config)}' ) + if es: + from piker.data._ahab import start_ahab + from piker.data.elasticsearch import start_elasticsearch + + log.info('Spawning `elasticsearch` supervisor') + ctn_ready, config, (cid, pid) = await n.start( + start_ahab, + 'elasticsearch', + start_elasticsearch, + ) + + log.info( + f'`elasticsearch` up!\n' + f'pid: {pid}\n' + f'container id: {cid[:12]}\n' + f'config: {pformat(config)}' + ) + await trio.sleep_forever() trio.run(main) @@ -213,6 +237,7 @@ def services(config, tl, ports): def _load_clis() -> None: from ..data import marketstore # noqa + from ..data import elasticsearch from ..data import cli # noqa from ..brokers import cli # noqa from ..ui import cli # noqa diff --git a/piker/data/elasticsearch.py b/piker/data/elasticsearch.py new file mode 100644 index 00000000..6fa9c1d6 --- /dev/null +++ b/piker/data/elasticsearch.py @@ -0,0 +1,277 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
+ +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +``elasticsearch`` integration. + +- client management routines +- ticK data ingest routines +- websocket client for subscribing to write triggers +- todo: tick sequence stream-cloning for testing + +''' +from __future__ import annotations +from contextlib import asynccontextmanager as acm +from datetime import datetime +from pprint import pformat +from typing import ( + Any, + Optional, + Union, + TYPE_CHECKING, +) +import time +from math import isnan + +from bidict import bidict +from msgspec.msgpack import encode, decode +import pyqtgraph as pg +import numpy as np +import tractor +from trio_websocket import open_websocket_url +import pendulum +import purerpc + +if TYPE_CHECKING: + import docker + from ._ahab import DockerContainer + +from .feed import maybe_open_feed +from ..log import get_logger, get_console_log +from .._profile import Profiler + +from elasticsearch import Elasticsearch +from docker.types import LogConfig + +log = get_logger(__name__) + + +# container level config +_config = { + 'port': 9200, + 'log_level': 'debug', +} + +_yaml_config = ''' +# piker's ``elasticsearch`` config. + +# mount this config using: +# sudo docker run \ + # -itd \ + # --rm \ + # --network=host \ + # --mount type=bind,source="$(pwd)"/elastic,target=/usr/share/elasticsearch/data \ + # --env "elastic_username=elastic" \ + # --env "elastic_password=password" \ + # --env "xpack.security.enabled=false" \ + # elastic + +root_directory: data +port: {port} +log_level: {log_level} +queryable: true +stop_grace_period: 0 +wal_rotate_interval: 5 +stale_threshold: 5 +enable_add: true +enable_remove: false + +triggers: + - module: ondiskagg.so + on: "*/1Sec/OHLCV" + config: + # filter: "nasdaq" + destinations: + - 1Min + - 5Min + - 15Min + - 1H + - 1D + + - module: stream.so + on: '*/*/*' + # config: + # filter: "nasdaq" + +'''.format(**_config) + + +def start_elasticsearch( + client: docker.DockerClient, + + **kwargs, + +) -> tuple[DockerContainer, dict[str, Any]]: + ''' + Start and supervise an elasticsearch instance with its config bind-mounted + in from the piker config directory on the system. + + The equivalent cli cmd to this code is: + + sudo docker run \ + -itd \ + --rm \ + --network=host \ + --mount type=bind,source="$(pwd)"/elastic,target=/usr/share/elasticsearch/data \ + --env "elastic_username=elastic" \ + --env "elastic_password=password" \ + --env "xpack.security.enabled=false" \ + elastic + + ''' + import os + import docker + from .. 
import config + get_console_log('info', name=__name__) + + esdir = os.path.join(config._config_dir, 'elasticsearch') + + # create dirs when dne + if not os.path.isdir(config._config_dir): + os.mkdir(config._config_dir) + + if not os.path.isdir(esdir): + os.mkdir(esdir) + + yml_file = os.path.join(esdir, 'es.yml') + if not os.path.isfile(yml_file): + log.warning( + f'No `elasticsearch` config exists?: {yml_file}\n' + 'Generating new file from template:\n' + f'{_yaml_config}\n' + ) + with open(yml_file, 'w') as yf: + yf.write(_yaml_config) + + # create a mount from user's local piker config dir into container + config_dir_mnt = docker.types.Mount( + target='/etc', + source=esdir, + type='bind', + ) + + # create a user config subdir where the elasticsearch + # backing filesystem database can be persisted. + persistent_data_dir = os.path.join( + esdir, 'data', + ) + if not os.path.isdir(persistent_data_dir): + os.mkdir(persistent_data_dir) + + data_dir_mnt = docker.types.Mount( + target='/data', + source=persistent_data_dir, + type='bind', + ) + + dcntr: DockerContainer = client.containers.run( + 'elastic', + ports={ + '9200':9200, + }, + mounts=[ + config_dir_mnt, + data_dir_mnt, + ], + # log_config=log_cf, + detach=True, + # stop_signal='SIGINT', + # init=True, + # remove=True, + ) + + return ( + dcntr, + _config, + + # expected startup and stop msgs + "launching listener for all services...", + "exiting...", + ) + + +_tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK') +_tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids) + +_tick_dt = [ + # these two are required for as a "primary key" + ('Epoch', 'i8'), + ('Nanoseconds', 'i4'), + ('IsTrade', 'i1'), + ('IsBid', 'i1'), + ('Price', 'f4'), + ('Size', 'f4') +] + +_quote_dt = [ + # these two are required for as a "primary key" + ('Epoch', 'i8'), + ('Nanoseconds', 'i4'), + + ('Tick', 'i4'), # (-1, 0, 1) = (on bid, same, on ask) + # ('fill_time', 'f4'), + ('Last', 'f4'), + ('Bid', 'f4'), + ('Bsize', 'i8'), + ('Asize', 'i8'), + ('Ask', 'f4'), + ('Size', 'i8'), + ('Volume', 'i8'), + # ('brokerd_ts', 'i64'), + # ('VWAP', 'f4') +] + +_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan) +_tick_map = { + 'Up': 1, + 'Equal': 0, + 'Down': -1, + None: np.nan, +} + +_ohlcv_dt = [ + # these two are required for as a "primary key" + ('Epoch', 'i8'), + # ('Nanoseconds', 'i4'), + + # ohlcv sampling + ('Open', 'f4'), + ('High', 'f4'), + ('Low', 'f4'), + ('Close', 'f4'), + ('Volume', 'f4'), +] + + +ohlc_key_map = bidict({ + 'Epoch': 'time', + 'Open': 'open', + 'High': 'high', + 'Low': 'low', + 'Close': 'close', + 'Volume': 'volume', +}) + +# # map of seconds ints to "time frame" accepted keys +tf_in_1s = bidict({ + 1: '1Sec', + 60: '1Min', + 60*5: '5Min', + 60*15: '15Min', + 60*30: '30Min', + 60*60: '1H', + 60*60*24: '1D', +}) From 17a4fe4b2fa0c82a114979242b760ae02b42fac7 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Wed, 15 Feb 2023 15:00:57 -0300 Subject: [PATCH 2/9] Trim unnecesary stuff left from marketstore copy, also fix elastic config name for docker build, add elasticsearch to dependencies --- dockering/elastic/Dockerfile | 2 +- .../{elasticsearch.yaml => elasticsearch.yml} | 0 piker/data/elasticsearch.py | 191 +----------------- setup.py | 5 +- 4 files changed, 16 insertions(+), 182 deletions(-) rename dockering/elastic/{elasticsearch.yaml => elasticsearch.yml} (100%) diff --git a/dockering/elastic/Dockerfile b/dockering/elastic/Dockerfile index 091b8d08..16f84cb2 100644 --- a/dockering/elastic/Dockerfile +++ b/dockering/elastic/Dockerfile @@ 
-8,4 +8,4 @@ COPY elasticsearch.yml /usr/share/elasticsearch/config/ RUN printf "password" | ./bin/elasticsearch-keystore add -f -x "bootstrap.password" -EXPOSE 9200 \ No newline at end of file +EXPOSE 9200 diff --git a/dockering/elastic/elasticsearch.yaml b/dockering/elastic/elasticsearch.yml similarity index 100% rename from dockering/elastic/elasticsearch.yaml rename to dockering/elastic/elasticsearch.yml diff --git a/piker/data/elasticsearch.py b/piker/data/elasticsearch.py index 6fa9c1d6..28028350 100644 --- a/piker/data/elasticsearch.py +++ b/piker/data/elasticsearch.py @@ -65,49 +65,6 @@ _config = { 'log_level': 'debug', } -_yaml_config = ''' -# piker's ``elasticsearch`` config. - -# mount this config using: -# sudo docker run \ - # -itd \ - # --rm \ - # --network=host \ - # --mount type=bind,source="$(pwd)"/elastic,target=/usr/share/elasticsearch/data \ - # --env "elastic_username=elastic" \ - # --env "elastic_password=password" \ - # --env "xpack.security.enabled=false" \ - # elastic - -root_directory: data -port: {port} -log_level: {log_level} -queryable: true -stop_grace_period: 0 -wal_rotate_interval: 5 -stale_threshold: 5 -enable_add: true -enable_remove: false - -triggers: - - module: ondiskagg.so - on: "*/1Sec/OHLCV" - config: - # filter: "nasdaq" - destinations: - - 1Min - - 5Min - - 15Min - - 1H - - 1D - - - module: stream.so - on: '*/*/*' - # config: - # filter: "nasdaq" - -'''.format(**_config) - def start_elasticsearch( client: docker.DockerClient, @@ -122,156 +79,30 @@ def start_elasticsearch( The equivalent cli cmd to this code is: sudo docker run \ - -itd \ - --rm \ - --network=host \ - --mount type=bind,source="$(pwd)"/elastic,target=/usr/share/elasticsearch/data \ - --env "elastic_username=elastic" \ - --env "elastic_password=password" \ - --env "xpack.security.enabled=false" \ - elastic + -itd \ + --rm \ + --network=host \ + --mount type=bind,source="$(pwd)"/elastic,target=/usr/share/elasticsearch/data \ + --env "elastic_username=elastic" \ + --env "elastic_password=password" \ + --env "xpack.security.enabled=false" \ + elastic ''' - import os import docker - from .. import config get_console_log('info', name=__name__) - esdir = os.path.join(config._config_dir, 'elasticsearch') - - # create dirs when dne - if not os.path.isdir(config._config_dir): - os.mkdir(config._config_dir) - - if not os.path.isdir(esdir): - os.mkdir(esdir) - - yml_file = os.path.join(esdir, 'es.yml') - if not os.path.isfile(yml_file): - log.warning( - f'No `elasticsearch` config exists?: {yml_file}\n' - 'Generating new file from template:\n' - f'{_yaml_config}\n' - ) - with open(yml_file, 'w') as yf: - yf.write(_yaml_config) - - # create a mount from user's local piker config dir into container - config_dir_mnt = docker.types.Mount( - target='/etc', - source=esdir, - type='bind', - ) - - # create a user config subdir where the elasticsearch - # backing filesystem database can be persisted. 
- persistent_data_dir = os.path.join( - esdir, 'data', - ) - if not os.path.isdir(persistent_data_dir): - os.mkdir(persistent_data_dir) - - data_dir_mnt = docker.types.Mount( - target='/data', - source=persistent_data_dir, - type='bind', - ) - dcntr: DockerContainer = client.containers.run( 'elastic', - ports={ - '9200':9200, - }, - mounts=[ - config_dir_mnt, - data_dir_mnt, - ], - # log_config=log_cf, + network='host', detach=True, - # stop_signal='SIGINT', - # init=True, - # remove=True, + remove=True, ) return ( dcntr, - _config, - + {}, # expected startup and stop msgs "launching listener for all services...", "exiting...", ) - - -_tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK') -_tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids) - -_tick_dt = [ - # these two are required for as a "primary key" - ('Epoch', 'i8'), - ('Nanoseconds', 'i4'), - ('IsTrade', 'i1'), - ('IsBid', 'i1'), - ('Price', 'f4'), - ('Size', 'f4') -] - -_quote_dt = [ - # these two are required for as a "primary key" - ('Epoch', 'i8'), - ('Nanoseconds', 'i4'), - - ('Tick', 'i4'), # (-1, 0, 1) = (on bid, same, on ask) - # ('fill_time', 'f4'), - ('Last', 'f4'), - ('Bid', 'f4'), - ('Bsize', 'i8'), - ('Asize', 'i8'), - ('Ask', 'f4'), - ('Size', 'i8'), - ('Volume', 'i8'), - # ('brokerd_ts', 'i64'), - # ('VWAP', 'f4') -] - -_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan) -_tick_map = { - 'Up': 1, - 'Equal': 0, - 'Down': -1, - None: np.nan, -} - -_ohlcv_dt = [ - # these two are required for as a "primary key" - ('Epoch', 'i8'), - # ('Nanoseconds', 'i4'), - - # ohlcv sampling - ('Open', 'f4'), - ('High', 'f4'), - ('Low', 'f4'), - ('Close', 'f4'), - ('Volume', 'f4'), -] - - -ohlc_key_map = bidict({ - 'Epoch': 'time', - 'Open': 'open', - 'High': 'high', - 'Low': 'low', - 'Close': 'close', - 'Volume': 'volume', -}) - -# # map of seconds ints to "time frame" accepted keys -tf_in_1s = bidict({ - 1: '1Sec', - 60: '1Min', - 60*5: '5Min', - 60*15: '15Min', - 60*30: '30Min', - 60*60: '1H', - 60*60*24: '1D', -}) diff --git a/setup.py b/setup.py index bd6363c5..2a686cc5 100755 --- a/setup.py +++ b/setup.py @@ -85,7 +85,10 @@ setup( 'tsdb': [ 'docker', ], - + 'es': [ + 'docker', + 'elasticsearch' + ] }, tests_require=['pytest'], python_requires=">=3.10", From bf9ca4a4a863ed7b76b85b46dac4acf50268f669 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Wed, 15 Feb 2023 21:22:01 -0300 Subject: [PATCH 3/9] Generalize ahab to support elasticsearch logs and init procedure --- dockering/elastic/Dockerfile | 2 +- dockering/elastic/elasticsearch.yml | 4 +-- piker/cli/__init__.py | 10 ++++-- piker/data/_ahab.py | 47 ++++++++++++++++++----------- piker/data/elasticsearch.py | 6 ++-- piker/data/marketstore.py | 4 +-- 6 files changed, 45 insertions(+), 28 deletions(-) diff --git a/dockering/elastic/Dockerfile b/dockering/elastic/Dockerfile index 16f84cb2..f497a7a3 100644 --- a/dockering/elastic/Dockerfile +++ b/dockering/elastic/Dockerfile @@ -8,4 +8,4 @@ COPY elasticsearch.yml /usr/share/elasticsearch/config/ RUN printf "password" | ./bin/elasticsearch-keystore add -f -x "bootstrap.password" -EXPOSE 9200 +EXPOSE 19200 diff --git a/dockering/elastic/elasticsearch.yml b/dockering/elastic/elasticsearch.yml index 25eff005..fdaa905f 100644 --- a/dockering/elastic/elasticsearch.yml +++ b/dockering/elastic/elasticsearch.yml @@ -1,5 +1,5 @@ network.host: 0.0.0.0 -http.port: 9200 +http.port: 19200 -discovery.type: single-node \ No newline at end of file +discovery.type: single-node diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 
d2a07b75..5bc6b2f4 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -20,6 +20,7 @@ CLI commons. ''' import os from pprint import pformat +from functools import partial import click import trio @@ -120,9 +121,12 @@ def pikerd( log.info('Spawning `elasticsearch` supervisor') ctn_ready, config, (cid, pid) = await n.start( - start_ahab, - 'elasticsearch', - start_elasticsearch, + partial( + start_ahab, + 'elasticsearch', + start_elasticsearch, + start_timeout=30.0 + ) ) log.info( diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index 218d46e0..d7113b61 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -124,7 +124,7 @@ class Container: async def process_logs_until( self, - patt: str, + patt_matcher: Callable[[str], bool], bp_on_msg: bool = False, ) -> bool: ''' @@ -143,27 +143,37 @@ class Container: if not entry: continue + entry = entry.strip() try: - record = json.loads(entry.strip()) - except json.JSONDecodeError: - if 'Error' in entry: - raise RuntimeError(entry) - raise + record = json.loads(entry) + + if 'msg' in record: + msg = record['msg'] + elif 'message' in record: + msg = record['message'] + else: + raise KeyError('Unexpected log format') + + level = record['level'] + + except json.JSONDecodeError: + # if 'Error' in entry: + # raise RuntimeError(entry) + # raise + msg = entry + level = 'error' - msg = record['msg'] - level = record['level'] if msg and entry not in seen_so_far: seen_so_far.add(entry) if bp_on_msg: await tractor.breakpoint() - getattr(log, level, log.error)(f'{msg}') + getattr(log, level.lower(), log.error)(f'{msg}') - # print(f'level: {level}') - if level in ('error', 'fatal'): + if level == 'fatal': raise ApplicationLogError(msg) - if patt in msg: + if patt_matcher(msg): return True # do a checkpoint so we don't block if cancelled B) @@ -285,6 +295,7 @@ class Container: async def open_ahabd( ctx: tractor.Context, endpoint: str, # ns-pointer str-msg-type + start_timeout: float = 1.0, **kwargs, @@ -300,13 +311,13 @@ async def open_ahabd( ( dcntr, cntr_config, - start_msg, - stop_msg, + start_lambda, + stop_lambda, ) = ep_func(client) cntr = Container(dcntr) - with trio.move_on_after(1): - found = await cntr.process_logs_until(start_msg) + with trio.move_on_after(start_timeout): + found = await cntr.process_logs_until(start_lambda) if not found and cntr not in client.containers.list(): raise RuntimeError( @@ -326,12 +337,13 @@ async def open_ahabd( await trio.sleep_forever() finally: - await cntr.cancel(stop_msg) + await cntr.cancel(stop_lambda) async def start_ahab( service_name: str, endpoint: Callable[docker.DockerClient, DockerContainer], + start_timeout: float = 1.0, task_status: TaskStatus[ tuple[ trio.Event, @@ -379,6 +391,7 @@ async def start_ahab( async with portal.open_context( open_ahabd, endpoint=str(NamespacePath.from_ref(endpoint)), + start_timeout=start_timeout ) as (ctx, first): cid, pid, cntr_config = first diff --git a/piker/data/elasticsearch.py b/piker/data/elasticsearch.py index 28028350..c3344aa3 100644 --- a/piker/data/elasticsearch.py +++ b/piker/data/elasticsearch.py @@ -93,7 +93,7 @@ def start_elasticsearch( get_console_log('info', name=__name__) dcntr: DockerContainer = client.containers.run( - 'elastic', + 'piker:elastic', network='host', detach=True, remove=True, @@ -103,6 +103,6 @@ def start_elasticsearch( dcntr, {}, # expected startup and stop msgs - "launching listener for all services...", - "exiting...", + lambda msg: msg == "started", + lambda msg: msg == "closed", ) diff --git 
a/piker/data/marketstore.py b/piker/data/marketstore.py index 88553af7..2595a2d6 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -194,8 +194,8 @@ def start_marketstore( _config, # expected startup and stop msgs - "launching tcp listener for all services...", - "exiting...", + lambda msg: "launching tcp listener for all services..." in msg, + lambda msg: "exiting..." in msg, ) From 3ce8bfa012041dbd65c4bbae3a4aea480a39d893 Mon Sep 17 00:00:00 2001 From: Esmeralda Gallardo Date: Tue, 21 Feb 2023 13:21:35 -0300 Subject: [PATCH 4/9] Moved database initialization code inside the open_pikerd context manager --- piker/_daemon.py | 52 +++++++++++++++++++++++++++++++++++++++++++ piker/cli/__init__.py | 42 +++------------------------------- 2 files changed, 55 insertions(+), 39 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index f4acf9f3..f1ced6e9 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -41,6 +41,9 @@ from .log import ( ) from .brokers import get_brokermod +from pprint import pformat +from functools import partial + log = get_logger(__name__) @@ -313,6 +316,9 @@ async def open_piker_runtime( @acm async def open_pikerd( + tsdb: bool, + es: bool, + loglevel: str | None = None, # XXX: you should pretty much never want debug mode @@ -349,12 +355,54 @@ async def open_pikerd( ): assert root_actor.accept_addr == reg_addr + if tsdb: + from piker.data._ahab import start_ahab + from piker.data.marketstore import start_marketstore + + log.info('Spawning `marketstore` supervisor') + ctn_ready, config, (cid, pid) = await service_nursery.start( + start_ahab, + 'marketstored', + start_marketstore, + + ) + log.info( + f'`marketstored` up!\n' + f'pid: {pid}\n' + f'container id: {cid[:12]}\n' + f'config: {pformat(config)}' + ) + + if es: + from piker.data._ahab import start_ahab + from piker.data.elastic import start_elasticsearch + + log.info('Spawning `elasticsearch` supervisor') + ctn_ready, config, (cid, pid) = await service_nursery.start( + partial( + start_ahab, + 'elasticsearch', + start_elasticsearch, + start_timeout=30.0 + ) + ) + + log.info( + f'`elasticsearch` up!\n' + f'pid: {pid}\n' + f'container id: {cid[:12]}\n' + f'config: {pformat(config)}' + ) + # assign globally for future daemon/task creation Services.actor_n = actor_nursery Services.service_n = service_nursery Services.debug_mode = debug_mode + + try: yield Services + finally: # TODO: is this more clever/efficient? 
# if 'samplerd' in Services.service_tasks: @@ -388,6 +436,8 @@ async def maybe_open_runtime( @acm async def maybe_open_pikerd( + tsdb: bool = False, + es: bool = False, loglevel: Optional[str] = None, registry_addr: None | tuple = None, @@ -436,6 +486,8 @@ async def maybe_open_pikerd( # presume pikerd role since no daemon could be found at # configured address async with open_pikerd( + tsdb=tsdb, + es=es, loglevel=loglevel, debug_mode=kwargs.get('debug_mode', False), registry_addr=registry_addr, diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 5bc6b2f4..9b6f225c 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -90,6 +90,8 @@ def pikerd( async def main(): async with ( open_pikerd( + tsdb=tsdb, + es=es, loglevel=loglevel, debug_mode=pdb, registry_addr=reg_addr, @@ -97,44 +99,6 @@ def pikerd( ), # normally delivers a ``Services`` handle trio.open_nursery() as n, ): - if tsdb: - from piker.data._ahab import start_ahab - from piker.data.marketstore import start_marketstore - - log.info('Spawning `marketstore` supervisor') - ctn_ready, config, (cid, pid) = await n.start( - start_ahab, - 'marketstored', - start_marketstore, - - ) - log.info( - f'`marketstored` up!\n' - f'pid: {pid}\n' - f'container id: {cid[:12]}\n' - f'config: {pformat(config)}' - ) - - if es: - from piker.data._ahab import start_ahab - from piker.data.elasticsearch import start_elasticsearch - - log.info('Spawning `elasticsearch` supervisor') - ctn_ready, config, (cid, pid) = await n.start( - partial( - start_ahab, - 'elasticsearch', - start_elasticsearch, - start_timeout=30.0 - ) - ) - - log.info( - f'`elasticsearch` up!\n' - f'pid: {pid}\n' - f'container id: {cid[:12]}\n' - f'config: {pformat(config)}' - ) await trio.sleep_forever() @@ -241,7 +205,7 @@ def services(config, tl, ports): def _load_clis() -> None: from ..data import marketstore # noqa - from ..data import elasticsearch + from ..data import elastic from ..data import cli # noqa from ..brokers import cli # noqa from ..ui import cli # noqa From b5cdf140367608aa90fc52af7e65c1add51cc084 Mon Sep 17 00:00:00 2001 From: Esmeralda Gallardo Date: Tue, 21 Feb 2023 13:25:53 -0300 Subject: [PATCH 5/9] Modified elasticsearch file name to 'elastic' to avoid name errors. Applied changes suggested in the pr. 
--- piker/data/_ahab.py | 4 ++- piker/data/{elasticsearch.py => elastic.py} | 28 ++++++++------------- piker/data/marketstore.py | 4 +-- tests/conftest.py | 4 +++ 4 files changed, 20 insertions(+), 20 deletions(-) rename piker/data/{elasticsearch.py => elastic.py} (83%) diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index d7113b61..1964eb1b 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -124,6 +124,8 @@ class Container: async def process_logs_until( self, + # this is a predicate func for matching log msgs emitted by the + # underlying containerized app patt_matcher: Callable[[str], bool], bp_on_msg: bool = False, ) -> bool: @@ -152,7 +154,7 @@ class Container: elif 'message' in record: msg = record['message'] else: - raise KeyError('Unexpected log format') + raise KeyError(f'Unexpected log format\n{record}') level = record['level'] diff --git a/piker/data/elasticsearch.py b/piker/data/elastic.py similarity index 83% rename from piker/data/elasticsearch.py rename to piker/data/elastic.py index c3344aa3..f9ed7b16 100644 --- a/piker/data/elasticsearch.py +++ b/piker/data/elastic.py @@ -25,43 +25,37 @@ ''' from __future__ import annotations from contextlib import asynccontextmanager as acm -from datetime import datetime from pprint import pformat from typing import ( Any, - Optional, - Union, + # Optional, + #Union, TYPE_CHECKING, ) -import time -from math import isnan -from bidict import bidict -from msgspec.msgpack import encode, decode import pyqtgraph as pg import numpy as np import tractor -from trio_websocket import open_websocket_url -import pendulum -import purerpc + if TYPE_CHECKING: import docker from ._ahab import DockerContainer -from .feed import maybe_open_feed -from ..log import get_logger, get_console_log -from .._profile import Profiler +from piker.log import ( + get_logger, + get_console_log +) from elasticsearch import Elasticsearch -from docker.types import LogConfig + log = get_logger(__name__) # container level config _config = { - 'port': 9200, + 'port': 19200, 'log_level': 'debug', } @@ -103,6 +97,6 @@ def start_elasticsearch( dcntr, {}, # expected startup and stop msgs - lambda msg: msg == "started", - lambda msg: msg == "closed", + lambda start_msg: start_msg == "started", + lambda stop_msg: stop_msg == "closed", ) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 2595a2d6..4a6131b2 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -194,8 +194,8 @@ def start_marketstore( _config, # expected startup and stop msgs - lambda msg: "launching tcp listener for all services..." in msg, - lambda msg: "exiting..." in msg, + lambda start_msg: "launching tcp listener for all services..." in start_msg, + lambda stop_msg: "exiting..." 
in stop_msg, ) diff --git a/tests/conftest.py b/tests/conftest.py index 2cfaad7a..9ccc11ab 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -119,6 +119,8 @@ def cse_symbols(): @acm async def _open_test_pikerd( + tsdb: bool = False, + es: bool = False, reg_addr: tuple[str, int] | None = None, **kwargs, @@ -143,6 +145,8 @@ async def _open_test_pikerd( # try: async with ( maybe_open_pikerd( + tsdb=tsdb, + es=es, registry_addr=reg_addr, **kwargs, ) as service_manager, From 4122c482ba35443a859259df8175674ae3a89b2f Mon Sep 17 00:00:00 2001 From: Esmeralda Gallardo Date: Tue, 21 Feb 2023 13:29:35 -0300 Subject: [PATCH 6/9] Added new tests for elasticsearch's and marketstore's initialization and stop --- tests/test_databases.py | 45 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 tests/test_databases.py diff --git a/tests/test_databases.py b/tests/test_databases.py new file mode 100644 index 00000000..32e9bbc1 --- /dev/null +++ b/tests/test_databases.py @@ -0,0 +1,45 @@ +import pytest +import trio + +from typing import AsyncContextManager + +from piker._daemon import Services +from piker.log import get_logger + + +# def test_marketstore( open_test_pikerd: AsyncContextManager): + +''' +Verify marketstore starts and closes correctly + +''' + + +def test_elasticsearch( + open_test_pikerd: AsyncContextManager, +): + ''' + Verify elasticsearch starts and closes correctly + + ''' + + # log = get_logger(__name__) + + # log.info('#################### Starting test ####################') + + async def main(): + port = 19200 + daemon_addr = ('127.0.0.1', port) + + async with ( + open_test_pikerd( + tsdb=False, + es=True, + reg_addr=daemon_addr, + ) as (s, i, pikerd_portal, services), + # pikerd(), + ): + assert pikerd_portal.channel.raddr == daemon_addr + + + trio.run(main) \ No newline at end of file From acc6249d8887e5b71f73eb2fd2449db09eb1a431 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Tue, 21 Feb 2023 13:58:04 -0300 Subject: [PATCH 7/9] Remove unnesesary arguments to some pikerd functions, fix container init error by switching from log reading to quering es health endpoint, fix install on ci and add more logging. --- .github/workflows/ci.yml | 5 ++++- piker/_daemon.py | 16 +++++++++------- piker/data/_ahab.py | 26 ++++++++++++++++---------- piker/data/elastic.py | 26 ++++++++++++++++++++++---- piker/data/marketstore.py | 11 +++++++++-- tests/conftest.py | 4 ---- tests/test_databases.py | 35 +++++++++++++++++------------------ 7 files changed, 77 insertions(+), 46 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ca18f2c4..f799bc22 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -42,13 +42,16 @@ jobs: - name: Checkout uses: actions/checkout@v3 + - name: Build DB container + run: docker build -t piker:elastic dockering/elastic + - name: Setup python uses: actions/setup-python@v3 with: python-version: '3.10' - name: Install dependencies - run: pip install -U . 
-r requirements-test.txt -r requirements.txt --upgrade-strategy eager + run: pip install -U .[es] -r requirements-test.txt -r requirements.txt --upgrade-strategy eager - name: Test suite run: pytest tests -rs diff --git a/piker/_daemon.py b/piker/_daemon.py index f1ced6e9..8983eccc 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -316,8 +316,6 @@ async def open_piker_runtime( @acm async def open_pikerd( - tsdb: bool, - es: bool, loglevel: str | None = None, @@ -326,6 +324,10 @@ async def open_pikerd( debug_mode: bool = False, registry_addr: None | tuple[str, int] = None, + # db init flags + tsdb: bool = False, + es: bool = False, + ) -> Services: ''' Start a root piker daemon who's lifetime extends indefinitely until @@ -383,7 +385,7 @@ async def open_pikerd( start_ahab, 'elasticsearch', start_elasticsearch, - start_timeout=30.0 + start_timeout=240.0 # high cause ci ) ) @@ -436,10 +438,10 @@ async def maybe_open_runtime( @acm async def maybe_open_pikerd( - tsdb: bool = False, - es: bool = False, loglevel: Optional[str] = None, registry_addr: None | tuple = None, + tsdb: bool = False, + es: bool = False, **kwargs, @@ -486,11 +488,11 @@ async def maybe_open_pikerd( # presume pikerd role since no daemon could be found at # configured address async with open_pikerd( - tsdb=tsdb, - es=es, loglevel=loglevel, debug_mode=kwargs.get('debug_mode', False), registry_addr=registry_addr, + tsdb=tsdb, + es=es, ) as service_manager: # in the case where we're starting up the diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index 1964eb1b..2ab8680e 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -137,7 +137,13 @@ class Container: seen_so_far = self.seen_so_far while True: - logs = self.cntr.logs() + try: + logs = self.cntr.logs() + except docker.errors.NotFound: + return False + except docker.errors.APIError: + return False + entries = logs.decode().split('\n') for entry in entries: @@ -159,9 +165,6 @@ class Container: level = record['level'] except json.JSONDecodeError: - # if 'Error' in entry: - # raise RuntimeError(entry) - # raise msg = entry level = 'error' @@ -175,11 +178,11 @@ class Container: if level == 'fatal': raise ApplicationLogError(msg) - if patt_matcher(msg): + if await patt_matcher(msg): return True # do a checkpoint so we don't block if cancelled B) - await trio.sleep(0.01) + await trio.sleep(0.1) return False @@ -321,10 +324,13 @@ async def open_ahabd( with trio.move_on_after(start_timeout): found = await cntr.process_logs_until(start_lambda) - if not found and cntr not in client.containers.list(): - raise RuntimeError( - 'Failed to start `marketstore` check logs deats' - ) + if not found and dcntr not in client.containers.list(): + for entry in cntr.seen_so_far: + log.info(entry) + + raise RuntimeError( + f'Failed to start {dcntr.id} check logs deats' + ) await ctx.started(( cntr.cntr.id, diff --git a/piker/data/elastic.py b/piker/data/elastic.py index f9ed7b16..a501de4f 100644 --- a/piker/data/elastic.py +++ b/piker/data/elastic.py @@ -47,7 +47,7 @@ from piker.log import ( get_console_log ) -from elasticsearch import Elasticsearch +import asks log = get_logger(__name__) @@ -88,15 +88,33 @@ def start_elasticsearch( dcntr: DockerContainer = client.containers.run( 'piker:elastic', + name='piker-elastic', network='host', detach=True, - remove=True, + remove=True ) + async def start_matcher(msg: str): + try: + health = (await asks.get( + f'http://localhost:19200/_cat/health', + params={'format': 'json'} + )).json() + + except OSError: + log.error('couldnt reach 
elastic container') + return False + + log.info(health) + return health[0]['status'] == 'green' + + async def stop_matcher(msg: str): + return msg == 'closed' + return ( dcntr, {}, # expected startup and stop msgs - lambda start_msg: start_msg == "started", - lambda stop_msg: stop_msg == "closed", + start_matcher, + stop_matcher, ) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 4a6131b2..236bcfaf 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -189,13 +189,20 @@ def start_marketstore( init=True, # remove=True, ) + + async def start_matcher(msg: str): + return "launching tcp listener for all services..." in msg + + async def stop_matcher(msg: str): + return "exiting..." in msg + return ( dcntr, _config, # expected startup and stop msgs - lambda start_msg: "launching tcp listener for all services..." in start_msg, - lambda stop_msg: "exiting..." in stop_msg, + start_matcher, + stop_matcher, ) diff --git a/tests/conftest.py b/tests/conftest.py index 9ccc11ab..2cfaad7a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -119,8 +119,6 @@ def cse_symbols(): @acm async def _open_test_pikerd( - tsdb: bool = False, - es: bool = False, reg_addr: tuple[str, int] | None = None, **kwargs, @@ -145,8 +143,6 @@ async def _open_test_pikerd( # try: async with ( maybe_open_pikerd( - tsdb=tsdb, - es=es, registry_addr=reg_addr, **kwargs, ) as service_manager, diff --git a/tests/test_databases.py b/tests/test_databases.py index 32e9bbc1..907b716a 100644 --- a/tests/test_databases.py +++ b/tests/test_databases.py @@ -6,6 +6,8 @@ from typing import AsyncContextManager from piker._daemon import Services from piker.log import get_logger +from elasticsearch import Elasticsearch + # def test_marketstore( open_test_pikerd: AsyncContextManager): @@ -16,30 +18,27 @@ Verify marketstore starts and closes correctly def test_elasticsearch( - open_test_pikerd: AsyncContextManager, + open_test_pikerd: AsyncContextManager, ): - ''' + ''' Verify elasticsearch starts and closes correctly - ''' + ''' - # log = get_logger(__name__) + log = get_logger(__name__) - # log.info('#################### Starting test ####################') + # log.info('#################### Starting test ####################') - async def main(): - port = 19200 - daemon_addr = ('127.0.0.1', port) + async def main(): + port = 19200 - async with ( - open_test_pikerd( - tsdb=False, - es=True, - reg_addr=daemon_addr, - ) as (s, i, pikerd_portal, services), - # pikerd(), - ): - assert pikerd_portal.channel.raddr == daemon_addr + async with open_test_pikerd( + loglevel='info', + es=True + ) as (s, i, pikerd_portal, services): + + es = Elasticsearch(hosts=[f'http://localhost:{port}']) + assert es.info()['version']['number'] == '7.17.4' - trio.run(main) \ No newline at end of file + trio.run(main) From f96d6a04b618312d10b7ef3d7654114283bf785a Mon Sep 17 00:00:00 2001 From: Esmeralda Gallardo Date: Wed, 22 Feb 2023 13:28:07 -0300 Subject: [PATCH 8/9] Fixed UnboundLocalError on _ahab. 
Added test for marketstore's initialization --- piker/data/_ahab.py | 12 ++++++------ tests/test_databases.py | 38 +++++++++++++++++++++++++++++--------- 2 files changed, 35 insertions(+), 15 deletions(-) diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index 2ab8680e..bb1d451d 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -324,13 +324,13 @@ async def open_ahabd( with trio.move_on_after(start_timeout): found = await cntr.process_logs_until(start_lambda) - if not found and dcntr not in client.containers.list(): - for entry in cntr.seen_so_far: - log.info(entry) + if not found and dcntr not in client.containers.list(): + for entry in cntr.seen_so_far: + log.info(entry) - raise RuntimeError( - f'Failed to start {dcntr.id} check logs deats' - ) + raise RuntimeError( + f'Failed to start {dcntr.id} check logs deats' + ) await ctx.started(( cntr.cntr.id, diff --git a/tests/test_databases.py b/tests/test_databases.py index 907b716a..dac64b35 100644 --- a/tests/test_databases.py +++ b/tests/test_databases.py @@ -7,28 +7,48 @@ from piker._daemon import Services from piker.log import get_logger from elasticsearch import Elasticsearch +from piker.data import marketstore + +def test_marketstore( + open_test_pikerd: AsyncContextManager, +): + + ''' + Verify marketstore starts correctly + + ''' + log = get_logger(__name__) + + async def main(): + # port = 5995 + + async with ( + open_test_pikerd( + loglevel='info', + tsdb=True + ) as (s, i, pikerd_portal, services), + marketstore.get_client() as client + ): + + assert ( + len(await client.server_version()) == + len('3862e9973da36cfc6004b88172c08f09269aaf01') + ) -# def test_marketstore( open_test_pikerd: AsyncContextManager): - -''' -Verify marketstore starts and closes correctly - -''' + trio.run(main) def test_elasticsearch( open_test_pikerd: AsyncContextManager, ): ''' - Verify elasticsearch starts and closes correctly + Verify elasticsearch starts correctly ''' log = get_logger(__name__) - # log.info('#################### Starting test ####################') - async def main(): port = 19200 From b96e2c314a1e4d80338f49549f5423e155b8e34a Mon Sep 17 00:00:00 2001 From: Esmeralda Gallardo Date: Fri, 24 Feb 2023 15:11:15 -0300 Subject: [PATCH 9/9] Minor style changes and removed unnecesary comments --- piker/data/_ahab.py | 7 ++++--- piker/data/elastic.py | 11 ----------- tests/test_databases.py | 10 ++++++---- 3 files changed, 10 insertions(+), 18 deletions(-) diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index bb1d451d..39a5b46a 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -139,9 +139,10 @@ class Container: while True: try: logs = self.cntr.logs() - except docker.errors.NotFound: - return False - except docker.errors.APIError: + except ( + docker.errors.NotFound, + docker.errors.APIError + ): return False entries = logs.decode().split('\n') diff --git a/piker/data/elastic.py b/piker/data/elastic.py index a501de4f..43c6afd0 100644 --- a/piker/data/elastic.py +++ b/piker/data/elastic.py @@ -14,22 +14,11 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -''' -``elasticsearch`` integration. 
- -- client management routines -- ticK data ingest routines -- websocket client for subscribing to write triggers -- todo: tick sequence stream-cloning for testing - -''' from __future__ import annotations from contextlib import asynccontextmanager as acm from pprint import pformat from typing import ( Any, - # Optional, - #Union, TYPE_CHECKING, ) diff --git a/tests/test_databases.py b/tests/test_databases.py index dac64b35..4eb444f3 100644 --- a/tests/test_databases.py +++ b/tests/test_databases.py @@ -9,8 +9,9 @@ from piker.log import get_logger from elasticsearch import Elasticsearch from piker.data import marketstore -def test_marketstore( +def test_marketstore_startup_and_version( open_test_pikerd: AsyncContextManager, + loglevel, ): ''' @@ -24,7 +25,7 @@ def test_marketstore( async with ( open_test_pikerd( - loglevel='info', + loglevel=loglevel, tsdb=True ) as (s, i, pikerd_portal, services), marketstore.get_client() as client @@ -39,8 +40,9 @@ def test_marketstore( trio.run(main) -def test_elasticsearch( +def test_elasticsearch_startup_and_version( open_test_pikerd: AsyncContextManager, + loglevel, ): ''' Verify elasticsearch starts correctly @@ -53,7 +55,7 @@ def test_elasticsearch( port = 19200 async with open_test_pikerd( - loglevel='info', + loglevel=loglevel, es=True ) as (s, i, pikerd_portal, services):
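For manually confirming that the supervised container in this series actually comes up, the sketch below polls the same `_cat/health` endpoint that the `start_matcher()` predicate in `piker/data/elastic.py` uses, on the 19200 HTTP port configured in `dockering/elastic/elasticsearch.yml`. It assumes the `piker:elastic` image has been built and started (for example via the new `pikerd --es` flag); the `wait_for_es` helper name and the green/yellow status tolerance are illustrative choices, not part of the series.

# Sketch only: poll the dockerized elasticsearch the way `start_matcher()`
# does, against the 19200 port set in dockering/elastic/elasticsearch.yml.
# Assumes the `piker:elastic` image is built and running (e.g. via `pikerd --es`).
import asks
import trio


async def wait_for_es(port: int = 19200, timeout: float = 240.0) -> dict:
    # keep retrying until the cluster reports a usable status or we time out
    with trio.move_on_after(timeout):
        while True:
            try:
                resp = await asks.get(
                    f'http://localhost:{port}/_cat/health',
                    params={'format': 'json'},
                )
                health = resp.json()
                if health and health[0]['status'] in ('green', 'yellow'):
                    return health[0]
            except OSError:
                # container not accepting connections yet
                pass
            await trio.sleep(1)
    raise RuntimeError(f'no healthy elasticsearch on port {port}')


if __name__ == '__main__':
    print(trio.run(wait_for_es))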