From 7694419e712bc7c11f756a87f1aba1cce5653c80 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Wed, 8 Mar 2023 14:28:48 -0500
Subject: [PATCH] Background docker-container logs processing

Previously we would make the `ahabd` supervisor-actor sync to docker
container startup using pseudo-blocking log message processing.

This has issues:
- we're forced to do a hacky "yield back to `trio`" in order to be
  "fake async" when reading the log stream and further,
- blocking on a message is fragile and often slow.

Instead, run the log processor in a background task and in the parent
task poll for the container to be in the client list using a similar
pseudo-async poll pattern. This allows the super to `Context.started()`
sooner (when the container is actually registered as "up") and thus
unblock its (remote) caller faster whilst still doing full log msg
proxying!

Deatz:
- adds `Container.cuid: str`, a unique container id for logging.
- correctly proxy through the `loglevel: str` from the `pikerd` caller
  task.
- shield around `Container.cancel()` in the teardown block and use
  cancel-level logging in that method.
---
 piker/data/_ahab.py | 123 +++++++++++++++++++++++++++++++-------------
 1 file changed, 86 insertions(+), 37 deletions(-)

diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py
index 66b41f38..d2e042e3 100644
--- a/piker/data/_ahab.py
+++ b/piker/data/_ahab.py
@@ -19,6 +19,7 @@ Supervisor for docker with included specific-image service helpers.

 '''
 from collections import ChainMap
+from functools import partial
 import os
 import time
 from typing import (
@@ -46,7 +47,10 @@ from requests.exceptions import (
     ReadTimeout,
 )

-from ..log import get_logger, get_console_log
+from ..log import (
+    get_logger,
+    get_console_log,
+)
 from .. import config

 log = get_logger(__name__)
@@ -197,6 +201,11 @@ class Container:

         return False

+    @property
+    def cuid(self) -> str:
+        fqcn: str = self.cntr.attrs['Config']['Image']
+        return f'{fqcn}[{self.cntr.short_id}]'
+
     def try_signal(
         self,
         signal: str = 'SIGINT',
@@ -232,17 +241,23 @@ class Container:

     async def cancel(
         self,
-        stop_msg: str,
+        log_msg_key: str,
+        stop_predicate: Callable[[str], bool],
+
+        hard_kill: bool = False,

     ) -> None:
+        '''
+        Attempt to cancel this container gracefully, fail over to
+        a hard kill on timeout.
+        '''
         cid = self.cntr.id

         # first try a graceful cancel
         log.cancel(
-            f'SIGINT cancelling container: {cid}\n'
-            f'waiting on stop msg: "{stop_msg}"'
+            f'SIGINT cancelling container: {self.cuid}\n'
+            'waiting on stop predicate...'
         )
         self.try_signal('SIGINT')

@@ -253,7 +268,10 @@ class Container:
                 log.cancel('polling for CNTR logs...')

                 try:
-                    await self.process_logs_until(stop_msg)
+                    await self.process_logs_until(
+                        log_msg_key,
+                        stop_predicate,
+                    )
                 except ApplicationLogError:
                     hard_kill = True
                 else:
@@ -311,11 +329,16 @@ class Container:
 async def open_ahabd(
     ctx: tractor.Context,
     endpoint: str,  # ns-pointer str-msg-type
+    loglevel: str | None = 'cancel',

     **kwargs,

 ) -> None:
-    get_console_log('info', name=__name__)
+
+    log = get_console_log(
+        loglevel,
+        name=__name__,
+    )

     async with open_docker() as client:

@@ -338,40 +361,63 @@ async def open_ahabd(

         # defaults
         {
+            # startup time limit which is the max the supervisor
+            # will wait for the container to be registered in
+            # ``client.containers.list()``
             'startup_timeout': 1.0,
+
+            # how fast to poll for the startup predicate by sleeping
+            # this amount incrementally, thus yielding to the
+            # ``trio`` scheduler during sync polling execution.
+            'startup_query_period': 0.001,
+
+            # str-key value expected to contain log message body-contents
+            # when read using:
+            # ``(json.loads(entry) for entry in DockerContainer.logs())``
             'log_msg_key': 'msg',
         },
     )

-    found = False
-    with trio.move_on_after(conf['startup_timeout']):
-        found = await cntr.process_logs_until(
-            conf['log_msg_key'],
-            start_lambda,
-            checkpoint_period=conf['startup_query_period'],
-        )
+    with trio.move_on_after(conf['startup_timeout']) as cs:
+        async with trio.open_nursery() as tn:
+            tn.start_soon(
+                partial(
+                    cntr.process_logs_until,
+                    log_msg_key=conf['log_msg_key'],
+                    patt_matcher=start_lambda,
+                    checkpoint_period=conf['startup_query_period'],
+                )
+            )

-    # XXX: if we timeout on finding the "startup msg" we expect then
-    # we want to FOR SURE raise an error upwards!
-    if (
-        not found
-        and dcntr not in client.containers.list()
-    ):
-        for entry in cntr.seen_so_far:
-            log.info(entry)
+            # poll for container startup or timeout
+            while not cs.cancel_called:
+                if dcntr in client.containers.list():
+                    break

-        raise RuntimeError(
-            f'Failed to start {dcntr.id} check logs deats'
-        )
+                await trio.sleep(conf['startup_query_period'])

-    await ctx.started((
-        cntr.cntr.id,
-        os.getpid(),
-        cntr_config,
-    ))
+            # sync with remote caller actor-task but allow log
+            # processing to continue running in bg.
+            await ctx.started((
+                cntr.cntr.id,
+                os.getpid(),
+                cntr_config,
+            ))

     try:
+        # XXX: if we timeout on finding the "startup msg" we expect then
+        # we want to FOR SURE raise an error upwards!
+        if cs.cancelled_caught:
+            # if dcntr not in client.containers.list():
+            for entry in cntr.seen_so_far:
+                log.info(entry)
+
+            raise DockerNotStarted(
+                f'Failed to start container: {cntr.cuid}\n'
+                f'due to startup_timeout={conf["startup_timeout"]}s\n\n'
+                "prolly you should check your container's logs for deats.."
+            )
+
         # TODO: we might eventually want a proxy-style msg-prot here
         # to allow remote control of containers without needing
         # callers to have root perms?
         ...
@@ -380,14 +426,17 @@ async def open_ahabd(
     finally:
         # TODO: ensure loglevel can be set and teardown logs are
         # reported if possible on error or cancel..
-        # with trio.CancelScope(shield=True):
-        await cntr.cancel(stop_lambda)
+        with trio.CancelScope(shield=True):
+            await cntr.cancel(
+                log_msg_key=conf['log_msg_key'],
+                stop_predicate=stop_lambda,
+            )


 async def start_ahab(
     service_name: str,
     endpoint: Callable[docker.DockerClient, DockerContainer],
-    loglevel: str | None = None,
+    loglevel: str | None = 'cancel',

     task_status: TaskStatus[
         tuple[
@@ -409,13 +458,12 @@ async def start_ahab(

     '''
     cn_ready = trio.Event()
     try:
-        async with tractor.open_nursery(
-            loglevel='runtime',
-        ) as tn:
+        async with tractor.open_nursery() as an:

-            portal = await tn.start_actor(
+            portal = await an.start_actor(
                 service_name,
-                enable_modules=[__name__]
+                enable_modules=[__name__],
+                loglevel=loglevel,
             )

             # TODO: we have issues with this on teardown
@@ -436,6 +484,7 @@ async def start_ahab(
             async with portal.open_context(
                 open_ahabd,
                 endpoint=str(NamespacePath.from_ref(endpoint)),
+                loglevel=loglevel,
             ) as (ctx, first):

                 cid, pid, cntr_config = first
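
For reference, the pattern this patch lands in `open_ahabd()` (stream the
container's logs from a background task while the parent task
pseudo-async-polls for the container to show up as registered, then
`Context.started()` without waiting on the log stream) can be sketched with
plain `trio`. The snippet below is a minimal, self-contained illustration and
not piker code: `fake_log_stream()`, `process_logs()` and `container_listed()`
are made-up stand-ins for the docker-py calls (`DockerContainer.logs()`,
`client.containers.list()`) that the real supervisor uses.

# NOTE: minimal sketch only; every name below is a stand-in, not piker API.
import trio


async def fake_log_stream(send_chan: trio.MemorySendChannel) -> None:
    # stand-in for reading ``DockerContainer.logs()``: emit a few
    # entries with a small delay between them.
    async with send_chan:
        for msg in ('booting..', 'loading config..', 'datastore ready'):
            await trio.sleep(0.05)
            await send_chan.send(msg)


async def process_logs(recv_chan: trio.MemoryReceiveChannel) -> None:
    # background log proxy: forward every entry as it arrives.
    async with recv_chan:
        async for entry in recv_chan:
            print(f'CNTR LOG: {entry}')


async def supervise(
    startup_timeout: float = 1.0,
    startup_query_period: float = 0.001,
) -> None:
    # pretend the container shows up in the client listing after ~20ms
    registered_at = trio.current_time() + 0.02

    def container_listed() -> bool:
        # stand-in for ``dcntr in client.containers.list()``
        return trio.current_time() >= registered_at

    send_chan, recv_chan = trio.open_memory_channel(0)

    with trio.move_on_after(startup_timeout) as cs:
        async with trio.open_nursery() as tn:
            # log processing runs as a background task..
            tn.start_soon(fake_log_stream, send_chan)
            tn.start_soon(process_logs, recv_chan)

            # ..while the parent task pseudo-async-polls for the
            # container to be registered, yielding to the scheduler
            # between checks.
            while not cs.cancel_called:
                if container_listed():
                    break
                await trio.sleep(startup_query_period)

            # the real code calls ``await ctx.started(...)`` here to
            # unblock the (remote) caller as soon as the container is
            # "up", without waiting on the log stream to finish.
            print('container registered: signalling started')

    if cs.cancelled_caught:
        raise RuntimeError(
            f'container failed to start within {startup_timeout}s'
        )


if __name__ == '__main__':
    trio.run(supervise)

As in the patch, the `trio.move_on_after()` scope bounds only the startup
phase: a container that never shows up trips the timeout and raises, while a
healthy one unblocks the caller as soon as it is listed, with log proxying
continuing concurrently in the background.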