From bf9ca4a4a863ed7b76b85b46dac4acf50268f669 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Wed, 15 Feb 2023 21:22:01 -0300 Subject: [PATCH] Generalize ahab to support elasticsearch logs and init procedure --- dockering/elastic/Dockerfile | 2 +- dockering/elastic/elasticsearch.yml | 4 +-- piker/cli/__init__.py | 10 ++++-- piker/data/_ahab.py | 47 ++++++++++++++++++----------- piker/data/elasticsearch.py | 6 ++-- piker/data/marketstore.py | 4 +-- 6 files changed, 45 insertions(+), 28 deletions(-) diff --git a/dockering/elastic/Dockerfile b/dockering/elastic/Dockerfile index 16f84cb2..f497a7a3 100644 --- a/dockering/elastic/Dockerfile +++ b/dockering/elastic/Dockerfile @@ -8,4 +8,4 @@ COPY elasticsearch.yml /usr/share/elasticsearch/config/ RUN printf "password" | ./bin/elasticsearch-keystore add -f -x "bootstrap.password" -EXPOSE 9200 +EXPOSE 19200 diff --git a/dockering/elastic/elasticsearch.yml b/dockering/elastic/elasticsearch.yml index 25eff005..fdaa905f 100644 --- a/dockering/elastic/elasticsearch.yml +++ b/dockering/elastic/elasticsearch.yml @@ -1,5 +1,5 @@ network.host: 0.0.0.0 -http.port: 9200 +http.port: 19200 -discovery.type: single-node \ No newline at end of file +discovery.type: single-node diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index d2a07b75..5bc6b2f4 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -20,6 +20,7 @@ CLI commons. ''' import os from pprint import pformat +from functools import partial import click import trio @@ -120,9 +121,12 @@ def pikerd( log.info('Spawning `elasticsearch` supervisor') ctn_ready, config, (cid, pid) = await n.start( - start_ahab, - 'elasticsearch', - start_elasticsearch, + partial( + start_ahab, + 'elasticsearch', + start_elasticsearch, + start_timeout=30.0 + ) ) log.info( diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index 218d46e0..d7113b61 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -124,7 +124,7 @@ class Container: async def process_logs_until( self, - patt: str, + patt_matcher: Callable[[str], bool], bp_on_msg: bool = False, ) -> bool: ''' @@ -143,27 +143,37 @@ class Container: if not entry: continue + entry = entry.strip() try: - record = json.loads(entry.strip()) - except json.JSONDecodeError: - if 'Error' in entry: - raise RuntimeError(entry) - raise + record = json.loads(entry) + + if 'msg' in record: + msg = record['msg'] + elif 'message' in record: + msg = record['message'] + else: + raise KeyError('Unexpected log format') + + level = record['level'] + + except json.JSONDecodeError: + # if 'Error' in entry: + # raise RuntimeError(entry) + # raise + msg = entry + level = 'error' - msg = record['msg'] - level = record['level'] if msg and entry not in seen_so_far: seen_so_far.add(entry) if bp_on_msg: await tractor.breakpoint() - getattr(log, level, log.error)(f'{msg}') + getattr(log, level.lower(), log.error)(f'{msg}') - # print(f'level: {level}') - if level in ('error', 'fatal'): + if level == 'fatal': raise ApplicationLogError(msg) - if patt in msg: + if patt_matcher(msg): return True # do a checkpoint so we don't block if cancelled B) @@ -285,6 +295,7 @@ class Container: async def open_ahabd( ctx: tractor.Context, endpoint: str, # ns-pointer str-msg-type + start_timeout: float = 1.0, **kwargs, @@ -300,13 +311,13 @@ async def open_ahabd( ( dcntr, cntr_config, - start_msg, - stop_msg, + start_lambda, + stop_lambda, ) = ep_func(client) cntr = Container(dcntr) - with trio.move_on_after(1): - found = await cntr.process_logs_until(start_msg) + with trio.move_on_after(start_timeout): + found = await cntr.process_logs_until(start_lambda) if not found and cntr not in client.containers.list(): raise RuntimeError( @@ -326,12 +337,13 @@ async def open_ahabd( await trio.sleep_forever() finally: - await cntr.cancel(stop_msg) + await cntr.cancel(stop_lambda) async def start_ahab( service_name: str, endpoint: Callable[docker.DockerClient, DockerContainer], + start_timeout: float = 1.0, task_status: TaskStatus[ tuple[ trio.Event, @@ -379,6 +391,7 @@ async def start_ahab( async with portal.open_context( open_ahabd, endpoint=str(NamespacePath.from_ref(endpoint)), + start_timeout=start_timeout ) as (ctx, first): cid, pid, cntr_config = first diff --git a/piker/data/elasticsearch.py b/piker/data/elasticsearch.py index 28028350..c3344aa3 100644 --- a/piker/data/elasticsearch.py +++ b/piker/data/elasticsearch.py @@ -93,7 +93,7 @@ def start_elasticsearch( get_console_log('info', name=__name__) dcntr: DockerContainer = client.containers.run( - 'elastic', + 'piker:elastic', network='host', detach=True, remove=True, @@ -103,6 +103,6 @@ def start_elasticsearch( dcntr, {}, # expected startup and stop msgs - "launching listener for all services...", - "exiting...", + lambda msg: msg == "started", + lambda msg: msg == "closed", ) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 88553af7..2595a2d6 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -194,8 +194,8 @@ def start_marketstore( _config, # expected startup and stop msgs - "launching tcp listener for all services...", - "exiting...", + lambda msg: "launching tcp listener for all services..." in msg, + lambda msg: "exiting..." in msg, )