diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index 39a5b46a..66b41f38 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -18,6 +18,7 @@ Supervisor for docker with included specific-image service helpers. ''' +from collections import ChainMap import os import time from typing import ( @@ -124,10 +125,19 @@ class Container: async def process_logs_until( self, + log_msg_key: str, + # this is a predicate func for matching log msgs emitted by the # underlying containerized app patt_matcher: Callable[[str], bool], - bp_on_msg: bool = False, + + # XXX WARNING XXX: do not touch this sleep value unless + # you know what you are doing! the value is critical to + # making sure the caller code inside the startup context + # does not timeout BEFORE we receive a match on the + # ``patt_matcher()`` predicate above. + checkpoint_period: float = 0.001, + ) -> bool: ''' Attempt to capture container log messages and relay through our @@ -137,12 +147,14 @@ class Container: seen_so_far = self.seen_so_far while True: + logs = self.cntr.logs() try: logs = self.cntr.logs() except ( docker.errors.NotFound, docker.errors.APIError ): + log.exception('Failed to parse logs?') return False entries = logs.decode().split('\n') @@ -155,25 +167,23 @@ class Container: entry = entry.strip() try: record = json.loads(entry) - - if 'msg' in record: - msg = record['msg'] - elif 'message' in record: - msg = record['message'] - else: - raise KeyError(f'Unexpected log format\n{record}') - + msg = record[log_msg_key] level = record['level'] except json.JSONDecodeError: msg = entry level = 'error' - if msg and entry not in seen_so_far: - seen_so_far.add(entry) - if bp_on_msg: - await tractor.breakpoint() + # TODO: do we need a more general mechanism + # for these kinda of "log record entries"? + # if 'Error' in entry: + # raise RuntimeError(entry) + if ( + msg + and entry not in seen_so_far + ): + seen_so_far.add(entry) getattr(log, level.lower(), log.error)(f'{msg}') if level == 'fatal': @@ -183,7 +193,7 @@ class Container: return True # do a checkpoint so we don't block if cancelled B) - await trio.sleep(0.1) + await trio.sleep(checkpoint_period) return False @@ -301,7 +311,6 @@ class Container: async def open_ahabd( ctx: tractor.Context, endpoint: str, # ns-pointer str-msg-type - start_timeout: float = 1.0, **kwargs, @@ -322,16 +331,39 @@ async def open_ahabd( ) = ep_func(client) cntr = Container(dcntr) - with trio.move_on_after(start_timeout): - found = await cntr.process_logs_until(start_lambda) + conf: ChainMap[str, Any] = ChainMap( - if not found and dcntr not in client.containers.list(): - for entry in cntr.seen_so_far: - log.info(entry) + # container specific + cntr_config, - raise RuntimeError( - f'Failed to start {dcntr.id} check logs deats' - ) + # defaults + { + 'startup_timeout': 1.0, + 'startup_query_period': 0.001, + 'log_msg_key': 'msg', + }, + ) + + found = False + with trio.move_on_after(conf['startup_timeout']): + found = await cntr.process_logs_until( + conf['log_msg_key'], + start_lambda, + checkpoint_period=conf['startup_query_period'], + ) + + # XXX: if we timeout on finding the "startup msg" we expect then + # we want to FOR SURE raise an error upwards! + if ( + not found + and dcntr not in client.containers.list() + ): + for entry in cntr.seen_so_far: + log.info(entry) + + raise RuntimeError( + f'Failed to start {dcntr.id} check logs deats' + ) await ctx.started(( cntr.cntr.id, @@ -346,13 +378,17 @@ async def open_ahabd( await trio.sleep_forever() finally: + # TODO: ensure loglevel can be set and teardown logs are + # reported if possible on error or cancel.. + # with trio.CancelScope(shield=True): await cntr.cancel(stop_lambda) async def start_ahab( service_name: str, endpoint: Callable[docker.DockerClient, DockerContainer], - start_timeout: float = 1.0, + loglevel: str | None = None, + task_status: TaskStatus[ tuple[ trio.Event, @@ -400,7 +436,6 @@ async def start_ahab( async with portal.open_context( open_ahabd, endpoint=str(NamespacePath.from_ref(endpoint)), - start_timeout=start_timeout ) as (ctx, first): cid, pid, cntr_config = first