Merge pull request #464 from pikers/elasticsearch_integration

Elasticsearch integration
explicit_write_pps_on_exit
Guillermo Rodriguez 2023-02-24 16:38:37 -03:00 committed by GitHub
commit 47bf45f30e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 316 additions and 43 deletions

View File

@ -42,13 +42,16 @@ jobs:
- name: Checkout - name: Checkout
uses: actions/checkout@v3 uses: actions/checkout@v3
- name: Build DB container
run: docker build -t piker:elastic dockering/elastic
- name: Setup python - name: Setup python
uses: actions/setup-python@v3 uses: actions/setup-python@v3
with: with:
python-version: '3.10' python-version: '3.10'
- name: Install dependencies - name: Install dependencies
run: pip install -U . -r requirements-test.txt -r requirements.txt --upgrade-strategy eager run: pip install -U .[es] -r requirements-test.txt -r requirements.txt --upgrade-strategy eager
- name: Test suite - name: Test suite
run: pytest tests -rs run: pytest tests -rs

View File

@ -0,0 +1,11 @@
FROM elasticsearch:7.17.4
ENV ES_JAVA_OPTS "-Xms2g -Xmx2g"
ENV ELASTIC_USERNAME "elastic"
ENV ELASTIC_PASSWORD "password"
COPY elasticsearch.yml /usr/share/elasticsearch/config/
RUN printf "password" | ./bin/elasticsearch-keystore add -f -x "bootstrap.password"
EXPOSE 19200

View File

@ -0,0 +1,5 @@
network.host: 0.0.0.0
http.port: 19200
discovery.type: single-node

View File

@ -41,6 +41,9 @@ from .log import (
) )
from .brokers import get_brokermod from .brokers import get_brokermod
from pprint import pformat
from functools import partial
log = get_logger(__name__) log = get_logger(__name__)
@ -313,6 +316,7 @@ async def open_piker_runtime(
@acm @acm
async def open_pikerd( async def open_pikerd(
loglevel: str | None = None, loglevel: str | None = None,
# XXX: you should pretty much never want debug mode # XXX: you should pretty much never want debug mode
@ -320,6 +324,10 @@ async def open_pikerd(
debug_mode: bool = False, debug_mode: bool = False,
registry_addr: None | tuple[str, int] = None, registry_addr: None | tuple[str, int] = None,
# db init flags
tsdb: bool = False,
es: bool = False,
) -> Services: ) -> Services:
''' '''
Start a root piker daemon who's lifetime extends indefinitely until Start a root piker daemon who's lifetime extends indefinitely until
@ -349,12 +357,54 @@ async def open_pikerd(
): ):
assert root_actor.accept_addr == reg_addr assert root_actor.accept_addr == reg_addr
if tsdb:
from piker.data._ahab import start_ahab
from piker.data.marketstore import start_marketstore
log.info('Spawning `marketstore` supervisor')
ctn_ready, config, (cid, pid) = await service_nursery.start(
start_ahab,
'marketstored',
start_marketstore,
)
log.info(
f'`marketstored` up!\n'
f'pid: {pid}\n'
f'container id: {cid[:12]}\n'
f'config: {pformat(config)}'
)
if es:
from piker.data._ahab import start_ahab
from piker.data.elastic import start_elasticsearch
log.info('Spawning `elasticsearch` supervisor')
ctn_ready, config, (cid, pid) = await service_nursery.start(
partial(
start_ahab,
'elasticsearch',
start_elasticsearch,
start_timeout=240.0 # high cause ci
)
)
log.info(
f'`elasticsearch` up!\n'
f'pid: {pid}\n'
f'container id: {cid[:12]}\n'
f'config: {pformat(config)}'
)
# assign globally for future daemon/task creation # assign globally for future daemon/task creation
Services.actor_n = actor_nursery Services.actor_n = actor_nursery
Services.service_n = service_nursery Services.service_n = service_nursery
Services.debug_mode = debug_mode Services.debug_mode = debug_mode
try: try:
yield Services yield Services
finally: finally:
# TODO: is this more clever/efficient? # TODO: is this more clever/efficient?
# if 'samplerd' in Services.service_tasks: # if 'samplerd' in Services.service_tasks:
@ -390,6 +440,8 @@ async def maybe_open_runtime(
async def maybe_open_pikerd( async def maybe_open_pikerd(
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
registry_addr: None | tuple = None, registry_addr: None | tuple = None,
tsdb: bool = False,
es: bool = False,
**kwargs, **kwargs,
@ -439,6 +491,8 @@ async def maybe_open_pikerd(
loglevel=loglevel, loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False), debug_mode=kwargs.get('debug_mode', False),
registry_addr=registry_addr, registry_addr=registry_addr,
tsdb=tsdb,
es=es,
) as service_manager: ) as service_manager:
# in the case where we're starting up the # in the case where we're starting up the

View File

@ -20,6 +20,7 @@ CLI commons.
''' '''
import os import os
from pprint import pformat from pprint import pformat
from functools import partial
import click import click
import trio import trio
@ -48,6 +49,11 @@ log = get_logger('cli')
is_flag=True, is_flag=True,
help='Enable local ``marketstore`` instance' help='Enable local ``marketstore`` instance'
) )
@click.option(
'--es',
is_flag=True,
help='Enable local ``elasticsearch`` instance'
)
def pikerd( def pikerd(
loglevel: str, loglevel: str,
host: str, host: str,
@ -55,11 +61,13 @@ def pikerd(
tl: bool, tl: bool,
pdb: bool, pdb: bool,
tsdb: bool, tsdb: bool,
es: bool,
): ):
''' '''
Spawn the piker broker-daemon. Spawn the piker broker-daemon.
''' '''
from .._daemon import open_pikerd from .._daemon import open_pikerd
log = get_console_log(loglevel) log = get_console_log(loglevel)
@ -80,9 +88,10 @@ def pikerd(
) )
async def main(): async def main():
async with ( async with (
open_pikerd( open_pikerd(
tsdb=tsdb,
es=es,
loglevel=loglevel, loglevel=loglevel,
debug_mode=pdb, debug_mode=pdb,
registry_addr=reg_addr, registry_addr=reg_addr,
@ -90,23 +99,6 @@ def pikerd(
), # normally delivers a ``Services`` handle ), # normally delivers a ``Services`` handle
trio.open_nursery() as n, trio.open_nursery() as n,
): ):
if tsdb:
from piker.data._ahab import start_ahab
from piker.data.marketstore import start_marketstore
log.info('Spawning `marketstore` supervisor')
ctn_ready, config, (cid, pid) = await n.start(
start_ahab,
'marketstored',
start_marketstore,
)
log.info(
f'`marketstored` up!\n'
f'pid: {pid}\n'
f'container id: {cid[:12]}\n'
f'config: {pformat(config)}'
)
await trio.sleep_forever() await trio.sleep_forever()
@ -213,6 +205,7 @@ def services(config, tl, ports):
def _load_clis() -> None: def _load_clis() -> None:
from ..data import marketstore # noqa from ..data import marketstore # noqa
from ..data import elastic
from ..data import cli # noqa from ..data import cli # noqa
from ..brokers import cli # noqa from ..brokers import cli # noqa
from ..ui import cli # noqa from ..ui import cli # noqa

View File

@ -124,7 +124,9 @@ class Container:
async def process_logs_until( async def process_logs_until(
self, self,
patt: str, # this is a predicate func for matching log msgs emitted by the
# underlying containerized app
patt_matcher: Callable[[str], bool],
bp_on_msg: bool = False, bp_on_msg: bool = False,
) -> bool: ) -> bool:
''' '''
@ -135,7 +137,14 @@ class Container:
seen_so_far = self.seen_so_far seen_so_far = self.seen_so_far
while True: while True:
logs = self.cntr.logs() try:
logs = self.cntr.logs()
except (
docker.errors.NotFound,
docker.errors.APIError
):
return False
entries = logs.decode().split('\n') entries = logs.decode().split('\n')
for entry in entries: for entry in entries:
@ -143,31 +152,38 @@ class Container:
if not entry: if not entry:
continue continue
entry = entry.strip()
try: try:
record = json.loads(entry.strip()) record = json.loads(entry)
except json.JSONDecodeError:
if 'Error' in entry: if 'msg' in record:
raise RuntimeError(entry) msg = record['msg']
raise elif 'message' in record:
msg = record['message']
else:
raise KeyError(f'Unexpected log format\n{record}')
level = record['level']
except json.JSONDecodeError:
msg = entry
level = 'error'
msg = record['msg']
level = record['level']
if msg and entry not in seen_so_far: if msg and entry not in seen_so_far:
seen_so_far.add(entry) seen_so_far.add(entry)
if bp_on_msg: if bp_on_msg:
await tractor.breakpoint() await tractor.breakpoint()
getattr(log, level, log.error)(f'{msg}') getattr(log, level.lower(), log.error)(f'{msg}')
# print(f'level: {level}') if level == 'fatal':
if level in ('error', 'fatal'):
raise ApplicationLogError(msg) raise ApplicationLogError(msg)
if patt in msg: if await patt_matcher(msg):
return True return True
# do a checkpoint so we don't block if cancelled B) # do a checkpoint so we don't block if cancelled B)
await trio.sleep(0.01) await trio.sleep(0.1)
return False return False
@ -285,6 +301,7 @@ class Container:
async def open_ahabd( async def open_ahabd(
ctx: tractor.Context, ctx: tractor.Context,
endpoint: str, # ns-pointer str-msg-type endpoint: str, # ns-pointer str-msg-type
start_timeout: float = 1.0,
**kwargs, **kwargs,
@ -300,17 +317,20 @@ async def open_ahabd(
( (
dcntr, dcntr,
cntr_config, cntr_config,
start_msg, start_lambda,
stop_msg, stop_lambda,
) = ep_func(client) ) = ep_func(client)
cntr = Container(dcntr) cntr = Container(dcntr)
with trio.move_on_after(1): with trio.move_on_after(start_timeout):
found = await cntr.process_logs_until(start_msg) found = await cntr.process_logs_until(start_lambda)
if not found and dcntr not in client.containers.list():
for entry in cntr.seen_so_far:
log.info(entry)
if not found and cntr not in client.containers.list():
raise RuntimeError( raise RuntimeError(
'Failed to start `marketstore` check logs deats' f'Failed to start {dcntr.id} check logs deats'
) )
await ctx.started(( await ctx.started((
@ -326,12 +346,13 @@ async def open_ahabd(
await trio.sleep_forever() await trio.sleep_forever()
finally: finally:
await cntr.cancel(stop_msg) await cntr.cancel(stop_lambda)
async def start_ahab( async def start_ahab(
service_name: str, service_name: str,
endpoint: Callable[docker.DockerClient, DockerContainer], endpoint: Callable[docker.DockerClient, DockerContainer],
start_timeout: float = 1.0,
task_status: TaskStatus[ task_status: TaskStatus[
tuple[ tuple[
trio.Event, trio.Event,
@ -379,6 +400,7 @@ async def start_ahab(
async with portal.open_context( async with portal.open_context(
open_ahabd, open_ahabd,
endpoint=str(NamespacePath.from_ref(endpoint)), endpoint=str(NamespacePath.from_ref(endpoint)),
start_timeout=start_timeout
) as (ctx, first): ) as (ctx, first):
cid, pid, cntr_config = first cid, pid, cntr_config = first

View File

@ -0,0 +1,109 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from pprint import pformat
from typing import (
Any,
TYPE_CHECKING,
)
import pyqtgraph as pg
import numpy as np
import tractor
if TYPE_CHECKING:
import docker
from ._ahab import DockerContainer
from piker.log import (
get_logger,
get_console_log
)
import asks
log = get_logger(__name__)
# container level config
_config = {
'port': 19200,
'log_level': 'debug',
}
def start_elasticsearch(
client: docker.DockerClient,
**kwargs,
) -> tuple[DockerContainer, dict[str, Any]]:
'''
Start and supervise an elasticsearch instance with its config bind-mounted
in from the piker config directory on the system.
The equivalent cli cmd to this code is:
sudo docker run \
-itd \
--rm \
--network=host \
--mount type=bind,source="$(pwd)"/elastic,target=/usr/share/elasticsearch/data \
--env "elastic_username=elastic" \
--env "elastic_password=password" \
--env "xpack.security.enabled=false" \
elastic
'''
import docker
get_console_log('info', name=__name__)
dcntr: DockerContainer = client.containers.run(
'piker:elastic',
name='piker-elastic',
network='host',
detach=True,
remove=True
)
async def start_matcher(msg: str):
try:
health = (await asks.get(
f'http://localhost:19200/_cat/health',
params={'format': 'json'}
)).json()
except OSError:
log.error('couldnt reach elastic container')
return False
log.info(health)
return health[0]['status'] == 'green'
async def stop_matcher(msg: str):
return msg == 'closed'
return (
dcntr,
{},
# expected startup and stop msgs
start_matcher,
stop_matcher,
)

View File

@ -189,13 +189,20 @@ def start_marketstore(
init=True, init=True,
# remove=True, # remove=True,
) )
async def start_matcher(msg: str):
return "launching tcp listener for all services..." in msg
async def stop_matcher(msg: str):
return "exiting..." in msg
return ( return (
dcntr, dcntr,
_config, _config,
# expected startup and stop msgs # expected startup and stop msgs
"launching tcp listener for all services...", start_matcher,
"exiting...", stop_matcher,
) )

View File

@ -85,7 +85,10 @@ setup(
'tsdb': [ 'tsdb': [
'docker', 'docker',
], ],
'es': [
'docker',
'elasticsearch'
]
}, },
tests_require=['pytest'], tests_require=['pytest'],
python_requires=">=3.10", python_requires=">=3.10",

View File

@ -0,0 +1,66 @@
import pytest
import trio
from typing import AsyncContextManager
from piker._daemon import Services
from piker.log import get_logger
from elasticsearch import Elasticsearch
from piker.data import marketstore
def test_marketstore_startup_and_version(
open_test_pikerd: AsyncContextManager,
loglevel,
):
'''
Verify marketstore starts correctly
'''
log = get_logger(__name__)
async def main():
# port = 5995
async with (
open_test_pikerd(
loglevel=loglevel,
tsdb=True
) as (s, i, pikerd_portal, services),
marketstore.get_client() as client
):
assert (
len(await client.server_version()) ==
len('3862e9973da36cfc6004b88172c08f09269aaf01')
)
trio.run(main)
def test_elasticsearch_startup_and_version(
open_test_pikerd: AsyncContextManager,
loglevel,
):
'''
Verify elasticsearch starts correctly
'''
log = get_logger(__name__)
async def main():
port = 19200
async with open_test_pikerd(
loglevel=loglevel,
es=True
) as (s, i, pikerd_portal, services):
es = Elasticsearch(hosts=[f'http://localhost:{port}'])
assert es.info()['version']['number'] == '7.17.4'
trio.run(main)