Background docker-container log processing

Previously we would make the `ahabd` supervisor-actor sync to docker
container startup using pseudo-blocking log message processing.

This had issues:
- we were forced to do a hacky "yield back to `trio`" dance in order to
  be "fake async" while reading the log stream, and further,
- blocking on a message is fragile and often slow.

Instead, run the log processor in a background task and, in the parent
task, poll for the container to appear in the client's container list
using a similar pseudo-async poll pattern. This allows the supervisor to
`Context.started()` sooner (i.e. as soon as the container is actually
registered as "up") and thus unblock its (remote) caller faster, whilst
still doing full log msg proxying!
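
Roughly, the new startup flow looks like the following sketch (lifted
from the diff below and wrapped in a hypothetical helper; `cntr`,
`dcntr`, `client`, `conf` and `start_lambda` are the supervisor's
existing objects/config, the `sync_to_startup()` name is illustrative
only):

    from functools import partial
    import trio

    async def sync_to_startup(cntr, dcntr, client, conf, start_lambda) -> None:
        # bound the whole startup sync by the configured timeout
        with trio.move_on_after(conf['startup_timeout']) as cs:
            async with trio.open_nursery() as tn:
                # bg task: full log-msg proxying + startup-pattern matching
                tn.start_soon(
                    partial(
                        cntr.process_logs_until,
                        log_msg_key=conf['log_msg_key'],
                        patt_matcher=start_lambda,
                        checkpoint_period=conf['startup_query_period'],
                    )
                )
                # parent task: pseudo-async poll until the container shows
                # up in the client's list (or the timeout cancels us)
                while not cs.cancel_called:
                    if dcntr in client.containers.list():
                        break
                    await trio.sleep(conf['startup_query_period'])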

Deatz:
- add `Container.cuid: str`, a unique container id for logging.
- correctly proxy through the `loglevel: str` from the `pikerd` caller task.
- shield around `Container.cancel()` in the teardown block (see the
  sketch after this list) and use cancel-level logging in that method.
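
The shielded teardown mentioned above amounts to the following sketch
(the `supervise()` wrapper and its `sleep_forever()` body are
placeholders; the `finally:` block mirrors the diff below):

    import trio

    async def supervise(cntr, conf, stop_lambda) -> None:
        try:
            await trio.sleep_forever()  # stand-in for the supervision body
        finally:
            # shield so an in-flight cancellation can't interrupt the
            # graceful SIGINT + hard-kill-fallback container cancel
            with trio.CancelScope(shield=True):
                await cntr.cancel(
                    log_msg_key=conf['log_msg_key'],
                    stop_predicate=stop_lambda,
                )
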
service_subpkg
Tyler Goodlet 2023-03-08 14:28:48 -05:00
parent b078a06621
commit 7694419e71
1 changed file with 86 additions and 37 deletions

@@ -19,6 +19,7 @@ Supervisor for docker with included specific-image service helpers.
 '''
 from collections import ChainMap
+from functools import partial
 import os
 import time
 from typing import (
@@ -46,7 +47,10 @@ from requests.exceptions import (
     ReadTimeout,
 )
 
-from ..log import get_logger, get_console_log
+from ..log import (
+    get_logger,
+    get_console_log,
+)
 from .. import config
 
 log = get_logger(__name__)
@@ -197,6 +201,11 @@ class Container:
 
         return False
 
+    @property
+    def cuid(self) -> str:
+        fqcn: str = self.cntr.attrs['Config']['Image']
+        return f'{fqcn}[{self.cntr.short_id}]'
+
     def try_signal(
         self,
         signal: str = 'SIGINT',
@@ -232,17 +241,23 @@ class Container:
 
     async def cancel(
         self,
-        stop_msg: str,
+        log_msg_key: str,
+        stop_predicate: Callable[[str], bool],
+
         hard_kill: bool = False,
 
     ) -> None:
+        '''
+        Attempt to cancel this container gracefully, fail over to
+        a hard kill on timeout.
+
+        '''
         cid = self.cntr.id
 
         # first try a graceful cancel
         log.cancel(
-            f'SIGINT cancelling container: {cid}\n'
-            f'waiting on stop msg: "{stop_msg}"'
+            f'SIGINT cancelling container: {self.cuid}\n'
+            'waiting on stop predicate...'
         )
         self.try_signal('SIGINT')
@@ -253,7 +268,10 @@ class Container:
             log.cancel('polling for CNTR logs...')
 
             try:
-                await self.process_logs_until(stop_msg)
+                await self.process_logs_until(
+                    log_msg_key,
+                    stop_predicate,
+                )
             except ApplicationLogError:
                 hard_kill = True
             else:
@@ -311,11 +329,16 @@ class Container:
 async def open_ahabd(
     ctx: tractor.Context,
     endpoint: str,  # ns-pointer str-msg-type
+    loglevel: str | None = 'cancel',
 
     **kwargs,
 
 ) -> None:
-    get_console_log('info', name=__name__)
+
+    log = get_console_log(
+        loglevel,
+        name=__name__,
+    )
 
     async with open_docker() as client:
@@ -338,33 +361,43 @@ async def open_ahabd(
 
             # defaults
             {
+                # startup time limit which is the max the supervisor
+                # will wait for the container to be registered in
+                # ``client.containers.list()``
                 'startup_timeout': 1.0,
+
+                # how fast to poll for the starup predicate by sleeping
+                # this amount incrementally thus yielding to the
+                # ``trio`` scheduler on during sync polling execution.
                 'startup_query_period': 0.001,
+
+                # str-key value expected to contain log message body-contents
+                # when read using:
+                # ``json.loads(entry for entry in DockerContainer.logs())``
                 'log_msg_key': 'msg',
             },
         )
 
-        found = False
-        with trio.move_on_after(conf['startup_timeout']):
-            found = await cntr.process_logs_until(
-                conf['log_msg_key'],
-                start_lambda,
-                checkpoint_period=conf['startup_query_period'],
-            )
-
-            # XXX: if we timeout on finding the "startup msg" we expect then
-            # we want to FOR SURE raise an error upwards!
-            if (
-                not found
-                and dcntr not in client.containers.list()
-            ):
-                for entry in cntr.seen_so_far:
-                    log.info(entry)
-
-                raise RuntimeError(
-                    f'Failed to start {dcntr.id} check logs deats'
-                )
+        with trio.move_on_after(conf['startup_timeout']) as cs:
+            async with trio.open_nursery() as tn:
+                tn.start_soon(
+                    partial(
+                        cntr.process_logs_until,
+                        log_msg_key=conf['log_msg_key'],
+                        patt_matcher=start_lambda,
+                        checkpoint_period=conf['startup_query_period'],
+                    )
+                )
+
+                # poll for container startup or timeout
+                while not cs.cancel_called:
+                    if dcntr in client.containers.list():
+                        break
+
+                    await trio.sleep(conf['startup_query_period'])
 
+        # sync with remote caller actor-task but allow log
+        # processing to continue running in bg.
         await ctx.started((
             cntr.cntr.id,
             os.getpid(),
@@ -372,6 +405,19 @@ async def open_ahabd(
         ))
 
         try:
+            # XXX: if we timeout on finding the "startup msg" we expect then
+            # we want to FOR SURE raise an error upwards!
+            if cs.cancelled_caught:
+                # if dcntr not in client.containers.list():
+                for entry in cntr.seen_so_far:
+                    log.info(entry)
+
+                raise DockerNotStarted(
+                    f'Failed to start container: {dcntr.cuid}\n'
+                    f'due to startup_timeout={conf["startup_timeout"]}s\n\n'
+                    "prolly you should check your container's logs for deats.."
+                )
+
             # TODO: we might eventually want a proxy-style msg-prot here
             # to allow remote control of containers without needing
             # callers to have root perms?
@@ -380,14 +426,17 @@ async def open_ahabd(
         finally:
             # TODO: ensure loglevel can be set and teardown logs are
             # reported if possible on error or cancel..
-            # with trio.CancelScope(shield=True):
-            await cntr.cancel(stop_lambda)
+            with trio.CancelScope(shield=True):
+                await cntr.cancel(
+                    log_msg_key=conf['log_msg_key'],
+                    stop_predicate=stop_lambda,
+                )
 
 
 async def start_ahab(
     service_name: str,
     endpoint: Callable[docker.DockerClient, DockerContainer],
-    loglevel: str | None = None,
+    loglevel: str | None = 'cancel',
 
     task_status: TaskStatus[
         tuple[
@@ -409,13 +458,12 @@ async def start_ahab(
     '''
     cn_ready = trio.Event()
    try:
-        async with tractor.open_nursery(
-            loglevel='runtime',
-        ) as tn:
-
-            portal = await tn.start_actor(
+        async with tractor.open_nursery() as an:
+            portal = await an.start_actor(
                 service_name,
-                enable_modules=[__name__]
+                enable_modules=[__name__],
+                loglevel=loglevel,
             )
 
             # TODO: we have issues with this on teardown
@@ -436,6 +484,7 @@ async def start_ahab(
             async with portal.open_context(
                 open_ahabd,
                 endpoint=str(NamespacePath.from_ref(endpoint)),
+                loglevel=loglevel,
             ) as (ctx, first):
 
                 cid, pid, cntr_config = first