Support startup-config overrides to `ahabd` super

With the addition of new `elasticsearch` docker support in
https://github.com/pikers/piker/pull/464, adjustments were made
to the container startup sync logic (particularly the `trio` checkpoint
sleep period, which is itself a hack around a sync client api). This
caused a regression in upstream startup logic wherein container error
logs were no longer bubbled up correctly, resulting in a silent failure
mode:

- the `marketstore` container started with a corrupt input config,
- the `ahabd` super code timed out during the startup phase due to the
  larger log polling period, skipped processing the container's startup
  logs, and continued on as though the container had started,
- the history client then failed on its grpc connection with no clear
  error as to why the connection failed.

Here we revert to the old poll period (1ms) to avoid any more silent
failures and further extend supervisor control through a configuration
override mechanism. To address the underlying design issue, this patch
adds support for container-endpoint-callbacks to override supervisor
startup configuration parameters via the 2nd value in their returned
tuple: the configuration `dict` value already delivered to the
supervisor.

The current exposed values include:
    {
        'startup_timeout': 1.0,
        'startup_query_period': 0.001,
        'log_msg_key': 'msg',
    }

This allows for container-specific control over the startup-sync query
period (the hack mentioned above) as well as the expected log msg key
and, of course, the startup timeout.
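
As a sketch only (not the actual endpoint code), a container endpoint
callback can now override any of these defaults by returning them as
the 2nd value of its tuple; the function name, image tag, predicate
lambdas and override values below are all illustrative assumptions:

    from docker import DockerClient

    def start_elasticsearch(client: DockerClient):
        # run detached so the supervisor can poll the log stream
        dcntr = client.containers.run(
            'elasticsearch:7.17.4',  # hypothetical image tag
            detach=True,
        )
        return (
            dcntr,

            # 2nd tuple value: the config `dict` already delivered to
            # the supervisor; any keys here shadow the `ahabd`
            # defaults via `ChainMap` lookup order.
            {
                'startup_timeout': 240.0,     # slow boot is fine
                'startup_query_period': 0.1,  # poll logs less often
                'log_msg_key': 'message',     # non-default json key
            },

            # startup/teardown predicates matched against log msgs
            lambda msg: 'started' in msg,
            lambda msg: 'stopped' in msg,
        )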
Tyler Goodlet 2023-03-08 12:56:56 -05:00
parent fe0695fb7b
commit 7b196b1b97
1 changed file with 60 additions and 25 deletions


@@ -18,6 +18,7 @@
 Supervisor for docker with included specific-image service helpers.

 '''
+from collections import ChainMap
 import os
 import time
 from typing import (
@@ -124,10 +125,19 @@ class Container:
     async def process_logs_until(
         self,
+        log_msg_key: str,
+
         # this is a predicate func for matching log msgs emitted by the
         # underlying containerized app
         patt_matcher: Callable[[str], bool],
         bp_on_msg: bool = False,

+        # XXX WARNING XXX: do not touch this sleep value unless
+        # you know what you are doing! the value is critical to
+        # making sure the caller code inside the startup context
+        # does not timeout BEFORE we receive a match on the
+        # ``patt_matcher()`` predicate above.
+        checkpoint_period: float = 0.001,
+
     ) -> bool:
         '''
         Attempt to capture container log messages and relay through our
@@ -137,12 +147,14 @@ class Container:
         seen_so_far = self.seen_so_far

         while True:
-            logs = self.cntr.logs()
+            try:
+                logs = self.cntr.logs()
+            except (
+                docker.errors.NotFound,
+                docker.errors.APIError
+            ):
+                log.exception('Failed to parse logs?')
+                return False
+
             entries = logs.decode().split('\n')
@@ -155,25 +167,23 @@ class Container:
                 entry = entry.strip()
                 try:
                     record = json.loads(entry)
-
-                    if 'msg' in record:
-                        msg = record['msg']
-                    elif 'message' in record:
-                        msg = record['message']
-                    else:
-                        raise KeyError(f'Unexpected log format\n{record}')
-
+                    msg = record[log_msg_key]
                     level = record['level']

                 except json.JSONDecodeError:
                     msg = entry
                     level = 'error'

-                if msg and entry not in seen_so_far:
-                    seen_so_far.add(entry)
-                    if bp_on_msg:
-                        await tractor.breakpoint()
+                # TODO: do we need a more general mechanism
+                # for these kinda of "log record entries"?
+                # if 'Error' in entry:
+                #     raise RuntimeError(entry)
+                if (
+                    msg
+                    and entry not in seen_so_far
+                ):
+                    seen_so_far.add(entry)

                     getattr(log, level.lower(), log.error)(f'{msg}')

                     if level == 'fatal':
@@ -183,7 +193,7 @@ class Container:
                         return True

             # do a checkpoint so we don't block if cancelled B)
-            await trio.sleep(0.1)
+            await trio.sleep(checkpoint_period)

         return False

@@ -301,7 +311,6 @@

 async def open_ahabd(
     ctx: tractor.Context,
     endpoint: str,  # ns-pointer str-msg-type
-    start_timeout: float = 1.0,

     **kwargs,
@@ -322,16 +331,39 @@ async def open_ahabd(
     ) = ep_func(client)
     cntr = Container(dcntr)

-    with trio.move_on_after(start_timeout):
-        found = await cntr.process_logs_until(start_lambda)
-
-        if not found and dcntr not in client.containers.list():
-            for entry in cntr.seen_so_far:
-                log.info(entry)
-
-            raise RuntimeError(
-                f'Failed to start {dcntr.id} check logs deats'
-            )
+    conf: ChainMap[str, Any] = ChainMap(
+
+        # container specific
+        cntr_config,
+
+        # defaults
+        {
+            'startup_timeout': 1.0,
+            'startup_query_period': 0.001,
+            'log_msg_key': 'msg',
+        },
+    )
+
+    found = False
+    with trio.move_on_after(conf['startup_timeout']):
+        found = await cntr.process_logs_until(
+            conf['log_msg_key'],
+            start_lambda,
+            checkpoint_period=conf['startup_query_period'],
+        )
+
+    # XXX: if we timeout on finding the "startup msg" we expect then
+    # we want to FOR SURE raise an error upwards!
+    if (
+        not found
+        and dcntr not in client.containers.list()
+    ):
+        for entry in cntr.seen_so_far:
+            log.info(entry)
+
+        raise RuntimeError(
+            f'Failed to start {dcntr.id} check logs deats'
+        )

     await ctx.started((
         cntr.cntr.id,
@@ -346,13 +378,17 @@ async def open_ahabd(
         await trio.sleep_forever()

     finally:
+        # TODO: ensure loglevel can be set and teardown logs are
+        # reported if possible on error or cancel..
+        # with trio.CancelScope(shield=True):
         await cntr.cancel(stop_lambda)


 async def start_ahab(
     service_name: str,
     endpoint: Callable[docker.DockerClient, DockerContainer],
-    start_timeout: float = 1.0,
     loglevel: str | None = None,
+
     task_status: TaskStatus[
         tuple[
             trio.Event,
@@ -400,7 +436,6 @@ async def start_ahab(
             async with portal.open_context(
                 open_ahabd,
                 endpoint=str(NamespacePath.from_ref(endpoint)),
-                start_timeout=start_timeout
             ) as (ctx, first):

                 cid, pid, cntr_config = first
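
A quick aside on the `ChainMap` choice above: lookups scan the maps
left to right, so container-supplied keys shadow the defaults without
mutating either `dict`. A minimal self-contained demo of the exact
precedence the supervisor relies on:

    from collections import ChainMap

    defaults = {
        'startup_timeout': 1.0,
        'startup_query_period': 0.001,
        'log_msg_key': 'msg',
    }
    # endpoint-supplied config, as per the 2nd tuple value
    cntr_config = {'startup_timeout': 240.0}

    conf = ChainMap(cntr_config, defaults)
    assert conf['startup_timeout'] == 240.0  # override wins
    assert conf['log_msg_key'] == 'msg'      # default preserved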