Generalize ahab to support elasticsearch logs and init procedure

explicit_write_pps_on_exit
Guillermo Rodriguez 2023-02-15 21:22:01 -03:00
parent 17a4fe4b2f
commit bf9ca4a4a8
No known key found for this signature in database
GPG Key ID: EC3AB66D5D83B392
6 changed files with 45 additions and 28 deletions

View File

@ -8,4 +8,4 @@ COPY elasticsearch.yml /usr/share/elasticsearch/config/
RUN printf "password" | ./bin/elasticsearch-keystore add -f -x "bootstrap.password" RUN printf "password" | ./bin/elasticsearch-keystore add -f -x "bootstrap.password"
EXPOSE 9200 EXPOSE 19200

View File

@ -1,5 +1,5 @@
network.host: 0.0.0.0 network.host: 0.0.0.0
http.port: 9200 http.port: 19200
discovery.type: single-node discovery.type: single-node

View File

@ -20,6 +20,7 @@ CLI commons.
''' '''
import os import os
from pprint import pformat from pprint import pformat
from functools import partial
import click import click
import trio import trio
@ -120,9 +121,12 @@ def pikerd(
log.info('Spawning `elasticsearch` supervisor') log.info('Spawning `elasticsearch` supervisor')
ctn_ready, config, (cid, pid) = await n.start( ctn_ready, config, (cid, pid) = await n.start(
partial(
start_ahab, start_ahab,
'elasticsearch', 'elasticsearch',
start_elasticsearch, start_elasticsearch,
start_timeout=30.0
)
) )
log.info( log.info(

View File

@ -124,7 +124,7 @@ class Container:
async def process_logs_until( async def process_logs_until(
self, self,
patt: str, patt_matcher: Callable[[str], bool],
bp_on_msg: bool = False, bp_on_msg: bool = False,
) -> bool: ) -> bool:
''' '''
@ -143,27 +143,37 @@ class Container:
if not entry: if not entry:
continue continue
entry = entry.strip()
try: try:
record = json.loads(entry.strip()) record = json.loads(entry)
except json.JSONDecodeError:
if 'Error' in entry:
raise RuntimeError(entry)
raise
if 'msg' in record:
msg = record['msg'] msg = record['msg']
elif 'message' in record:
msg = record['message']
else:
raise KeyError('Unexpected log format')
level = record['level'] level = record['level']
except json.JSONDecodeError:
# if 'Error' in entry:
# raise RuntimeError(entry)
# raise
msg = entry
level = 'error'
if msg and entry not in seen_so_far: if msg and entry not in seen_so_far:
seen_so_far.add(entry) seen_so_far.add(entry)
if bp_on_msg: if bp_on_msg:
await tractor.breakpoint() await tractor.breakpoint()
getattr(log, level, log.error)(f'{msg}') getattr(log, level.lower(), log.error)(f'{msg}')
# print(f'level: {level}') if level == 'fatal':
if level in ('error', 'fatal'):
raise ApplicationLogError(msg) raise ApplicationLogError(msg)
if patt in msg: if patt_matcher(msg):
return True return True
# do a checkpoint so we don't block if cancelled B) # do a checkpoint so we don't block if cancelled B)
@ -285,6 +295,7 @@ class Container:
async def open_ahabd( async def open_ahabd(
ctx: tractor.Context, ctx: tractor.Context,
endpoint: str, # ns-pointer str-msg-type endpoint: str, # ns-pointer str-msg-type
start_timeout: float = 1.0,
**kwargs, **kwargs,
@ -300,13 +311,13 @@ async def open_ahabd(
( (
dcntr, dcntr,
cntr_config, cntr_config,
start_msg, start_lambda,
stop_msg, stop_lambda,
) = ep_func(client) ) = ep_func(client)
cntr = Container(dcntr) cntr = Container(dcntr)
with trio.move_on_after(1): with trio.move_on_after(start_timeout):
found = await cntr.process_logs_until(start_msg) found = await cntr.process_logs_until(start_lambda)
if not found and cntr not in client.containers.list(): if not found and cntr not in client.containers.list():
raise RuntimeError( raise RuntimeError(
@ -326,12 +337,13 @@ async def open_ahabd(
await trio.sleep_forever() await trio.sleep_forever()
finally: finally:
await cntr.cancel(stop_msg) await cntr.cancel(stop_lambda)
async def start_ahab( async def start_ahab(
service_name: str, service_name: str,
endpoint: Callable[docker.DockerClient, DockerContainer], endpoint: Callable[docker.DockerClient, DockerContainer],
start_timeout: float = 1.0,
task_status: TaskStatus[ task_status: TaskStatus[
tuple[ tuple[
trio.Event, trio.Event,
@ -379,6 +391,7 @@ async def start_ahab(
async with portal.open_context( async with portal.open_context(
open_ahabd, open_ahabd,
endpoint=str(NamespacePath.from_ref(endpoint)), endpoint=str(NamespacePath.from_ref(endpoint)),
start_timeout=start_timeout
) as (ctx, first): ) as (ctx, first):
cid, pid, cntr_config = first cid, pid, cntr_config = first

View File

@ -93,7 +93,7 @@ def start_elasticsearch(
get_console_log('info', name=__name__) get_console_log('info', name=__name__)
dcntr: DockerContainer = client.containers.run( dcntr: DockerContainer = client.containers.run(
'elastic', 'piker:elastic',
network='host', network='host',
detach=True, detach=True,
remove=True, remove=True,
@ -103,6 +103,6 @@ def start_elasticsearch(
dcntr, dcntr,
{}, {},
# expected startup and stop msgs # expected startup and stop msgs
"launching listener for all services...", lambda msg: msg == "started",
"exiting...", lambda msg: msg == "closed",
) )

View File

@ -194,8 +194,8 @@ def start_marketstore(
_config, _config,
# expected startup and stop msgs # expected startup and stop msgs
"launching tcp listener for all services...", lambda msg: "launching tcp listener for all services..." in msg,
"exiting...", lambda msg: "exiting..." in msg,
) )