Support startup-config overrides to `ahabd` super

With the addition of new `elasticsearch` docker support in
https://github.com/pikers/piker/pull/464, adjustments were made to the
container startup sync logic (particularly the `trio` checkpoint sleep
period - which itself is a hack around a sync client api) which caused
a regression in upstream startup logic wherein container error logs
were no longer bubbled up correctly, resulting in a silent failure mode:

- the `marketstore` container was started with a corrupt input config
- the `ahabd` supervisor code timed out during the startup phase due to
  the larger log polling period, skipped processing the container's
  startup logs, and continued on as though the container had started
- the history client then failed on its grpc connection with no clear
  error indicating why the connection failed.

Here we revert to the old poll period (1ms) to avoid any more silent
failures and further extend supervisor control through a configuration
override mechanism. To address the underlying design issue, this patch
adds support for container-endpoint-callbacks to override supervisor
startup configuration parameters via the 2nd value in their returned
tuple: the already delivered configuration `dict` value.

The currently exposed values and their defaults are:

    {
        'startup_timeout': 1.0,
        'startup_query_period': 0.001,
        'log_msg_key': 'msg',
    }

This allows for container-specific control over the startup-sync query
period (the hack mentioned above) as well as the expected log msg key
and, of course, the startup timeout.
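
For illustration, a container endpoint callback opts in to these
overrides simply by including the relevant keys in the config `dict`
it already returns as the 2nd value of its tuple. The sketch below is
hypothetical - the function name, image tag, port and override values
are invented for the example; only the tuple shape and the override
keys come from this patch:

    import docker

    def start_elasticsearch(client: docker.DockerClient):
        '''
        Hypothetical endpoint callback; only the return-tuple shape and
        the supervisor override keys mirror this patch.

        '''
        dcntr = client.containers.run(
            'elasticsearch:7.17.4',  # example image tag
            detach=True,
        )

        config: dict = {
            # normal container-specific settings delivered to the caller
            'port': 19200,

            # NEW: supervisor startup overrides merged by ``open_ahabd()``
            'startup_timeout': 240.0,     # slow-booting container
            'startup_query_period': 0.1,  # relax the log poll period
            'log_msg_key': 'message',     # logs use a 'message' field
        }

        def start_matcher(msg: str) -> bool:
            return 'started' in msg

        def stop_matcher(msg: str) -> bool:
            return 'closed' in msg

        # 2nd value is the config ``dict`` which the supervisor now
        # ``ChainMap``s over its defaults
        return (
            dcntr,
            config,
            start_matcher,
            stop_matcher,
        )
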
Tyler Goodlet 2023-03-08 12:56:56 -05:00
parent 58f39d1829
commit d3272ede7a
1 changed file with 60 additions and 25 deletions

@@ -18,6 +18,7 @@
 Supervisor for docker with included specific-image service helpers.

 '''
+from collections import ChainMap
 import os
 import time
 from typing import (
@@ -124,10 +125,19 @@ class Container:
     async def process_logs_until(
         self,
+        log_msg_key: str,
+
         # this is a predicate func for matching log msgs emitted by the
         # underlying containerized app
         patt_matcher: Callable[[str], bool],
-        bp_on_msg: bool = False,
+
+        # XXX WARNING XXX: do not touch this sleep value unless
+        # you know what you are doing! the value is critical to
+        # making sure the caller code inside the startup context
+        # does not timeout BEFORE we receive a match on the
+        # ``patt_matcher()`` predicate above.
+        checkpoint_period: float = 0.001,

     ) -> bool:
         '''
         Attempt to capture container log messages and relay through our
@@ -137,12 +147,14 @@ class Container:
         seen_so_far = self.seen_so_far

         while True:
+            logs = self.cntr.logs()
             try:
                 logs = self.cntr.logs()
             except (
                 docker.errors.NotFound,
                 docker.errors.APIError
             ):
+                log.exception('Failed to parse logs?')
                 return False

             entries = logs.decode().split('\n')
@@ -155,25 +167,23 @@
                 entry = entry.strip()
                 try:
                     record = json.loads(entry)
-
-                    if 'msg' in record:
-                        msg = record['msg']
-                    elif 'message' in record:
-                        msg = record['message']
-                    else:
-                        raise KeyError(f'Unexpected log format\n{record}')
-
+                    msg = record[log_msg_key]
                     level = record['level']

                 except json.JSONDecodeError:
                     msg = entry
                     level = 'error'

-                if msg and entry not in seen_so_far:
-                    seen_so_far.add(entry)
-                    if bp_on_msg:
-                        await tractor.breakpoint()
+                # TODO: do we need a more general mechanism
+                # for these kinda of "log record entries"?
+                # if 'Error' in entry:
+                #     raise RuntimeError(entry)
+                if (
+                    msg
+                    and entry not in seen_so_far
+                ):
+                    seen_so_far.add(entry)

                     getattr(log, level.lower(), log.error)(f'{msg}')

                     if level == 'fatal':
@@ -183,7 +193,7 @@
                     return True

             # do a checkpoint so we don't block if cancelled B)
-            await trio.sleep(0.1)
+            await trio.sleep(checkpoint_period)

         return False
@@ -301,7 +311,6 @@
 async def open_ahabd(
     ctx: tractor.Context,
     endpoint: str,  # ns-pointer str-msg-type
-    start_timeout: float = 1.0,

     **kwargs,
@@ -322,16 +331,39 @@ async def open_ahabd(
         ) = ep_func(client)
         cntr = Container(dcntr)

-        with trio.move_on_after(start_timeout):
-            found = await cntr.process_logs_until(start_lambda)
-
-            if not found and dcntr not in client.containers.list():
-                for entry in cntr.seen_so_far:
-                    log.info(entry)
-
-                raise RuntimeError(
-                    f'Failed to start {dcntr.id} check logs deats'
-                )
+        conf: ChainMap[str, Any] = ChainMap(
+
+            # container specific
+            cntr_config,
+
+            # defaults
+            {
+                'startup_timeout': 1.0,
+                'startup_query_period': 0.001,
+                'log_msg_key': 'msg',
+            },
+        )
+
+        found = False
+        with trio.move_on_after(conf['startup_timeout']):
+            found = await cntr.process_logs_until(
+                conf['log_msg_key'],
+                start_lambda,
+                checkpoint_period=conf['startup_query_period'],
+            )
+
+        # XXX: if we timeout on finding the "startup msg" we expect then
+        # we want to FOR SURE raise an error upwards!
+        if (
+            not found
+            and dcntr not in client.containers.list()
+        ):
+            for entry in cntr.seen_so_far:
+                log.info(entry)
+
+            raise RuntimeError(
+                f'Failed to start {dcntr.id} check logs deats'
+            )

         await ctx.started((
             cntr.cntr.id,
@@ -346,13 +378,17 @@ async def open_ahabd(
             await trio.sleep_forever()

         finally:
+            # TODO: ensure loglevel can be set and teardown logs are
+            # reported if possible on error or cancel..
+            # with trio.CancelScope(shield=True):
             await cntr.cancel(stop_lambda)


 async def start_ahab(
     service_name: str,
     endpoint: Callable[docker.DockerClient, DockerContainer],
-    start_timeout: float = 1.0,
+    loglevel: str | None = None,

     task_status: TaskStatus[
         tuple[
             trio.Event,
@@ -400,7 +436,6 @@ async def start_ahab(
         async with portal.open_context(
             open_ahabd,
             endpoint=str(NamespacePath.from_ref(endpoint)),
-            start_timeout=start_timeout
         ) as (ctx, first):

             cid, pid, cntr_config = first
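
A note on the `ChainMap` used above: lookups consult the
container-supplied mapping first and only fall back to the defaults,
so an endpoint which sets none of the new keys keeps exactly the prior
supervisor behaviour. A quick stdlib-only illustration of that
precedence (standalone, not taken from the patched module):

    from collections import ChainMap

    defaults = {
        'startup_timeout': 1.0,
        'startup_query_period': 0.001,
        'log_msg_key': 'msg',
    }

    # an endpoint that only cares about bumping the timeout
    cntr_config = {'startup_timeout': 240.0}

    conf = ChainMap(cntr_config, defaults)

    assert conf['startup_timeout'] == 240.0  # container-specific wins
    assert conf['log_msg_key'] == 'msg'      # everything else falls back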