From d9e2666e8035a9cf03f04276baeca3a8e02b60fb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 5 May 2022 21:04:10 -0400 Subject: [PATCH] More reliable `marketstored` + container supervision It turns out (i guess not so shockingly?) that `marketstore` doesn't always teardown "gracefully" under SIGINT (seems to hang if there are open client connections which are also in the midst of teardown?) so this instead first tries the SIGINT and then fails over to a SIGKILL (destroy loop) which seems to be much more reliable to ensure shutdown without any downside - in terms of a "hard kill". Originally i was thinking the issue was root perms related (which get relegated solely to the `marketstored` daemon actor after spawn) but actually it was indeed the signalling / application layer causing the hold-up/latency on teardown. There's a bunch of lingering (now commented) code which tried to solve this non-problem as well as a bunch logging/prints to help decipher the root of the issue - this will all get cleaned out shortly. --- piker/data/_ahab.py | 245 +++++++++++++++++++++++++++++++++----------- 1 file changed, 183 insertions(+), 62 deletions(-) diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index 52088e91..14e278a7 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -30,7 +30,7 @@ from trio_typing import TaskStatus import tractor import docker import json -from docker.models.containers import Container +from docker.models.containers import Container as DockerContainer from docker.errors import DockerException, APIError from requests.exceptions import ConnectionError, ReadTimeout @@ -133,6 +133,136 @@ async def open_docker( c.kill() +class Container: + ''' + Wrapper around a ``docker.models.containers.Container`` to include + log capture and relay through our native logging system and helper + method(s) for cancellation/teardown. + + ''' + def __init__( + self, + cntr: DockerContainer, + ) -> None: + + self.cntr = cntr + # log msg de-duplication + self.seen_so_far = set() + + async def process_logs_until( + self, + patt: str, + bp_on_msg: bool = False, + ) -> bool: + ''' + Attempt to capture container log messages and relay through our + native logging system. + + ''' + seen_so_far = self.seen_so_far + + while True: + logs = self.cntr.logs() + entries = logs.decode().split('\n') + for entry in entries: + + # ignore null lines + if not entry: + continue + + try: + record = json.loads(entry.strip()) + except json.JSONDecodeError: + if 'Error' in entry: + raise RuntimeError(entry) + raise + + msg = record['msg'] + level = record['level'] + if msg and entry not in seen_so_far: + seen_so_far.add(entry) + if bp_on_msg: + await tractor.breakpoint() + + getattr(log, level, log.error)(f'{msg}') + + if patt in msg: + return True + + # do a checkpoint so we don't block if cancelled B) + await trio.sleep(0.01) + + return False + + def try_signal( + self, + signal: str = 'SIGINT', + + ) -> bool: + try: + # XXX: market store doesn't seem to shutdown nicely all the + # time with this (maybe because there are still open grpc + # connections?) noticably after client connections have been + # made or are in use/teardown. It works just fine if you + # just start and stop the container tho?.. + log.cancel(f'SENDING {signal} to {self.cntr.id}') + self.cntr.kill(signal) + return True + + except docker.errors.APIError as err: + # _err = err + if 'is not running' in err.explanation: + return False + + async def cancel( + self, + ) -> None: + + cid = self.cntr.id + self.try_signal('SIGINT') + + with trio.move_on_after(0.5) as cs: + cs.shield = True + # print('PROCESSINGN LOGS') + await self.process_logs_until('initiating graceful shutdown') + # print('SHUTDOWN REPORTED BY CONTAINER') + await self.process_logs_until('exiting...',) + + for _ in range(10): + with trio.move_on_after(0.5) as cs: + cs.shield = True + # print('waiting on EXITING') + await self.process_logs_until('exiting...',) + # print('got EXITING') + break + + if cs.cancelled_caught: + # get out the big guns, bc apparently marketstore + # doesn't actually know how to terminate gracefully + # :eyeroll:... + self.try_signal('SIGKILL') + + try: + log.info('Waiting on container shutdown: {cid}') + self.cntr.wait( + timeout=0.1, + condition='not-running', + ) + break + + except ( + ReadTimeout, + ConnectionError, + ): + log.error(f'failed to wait on container {cid}') + raise + + else: + raise RuntimeError('Failed to cancel container {cid}') + + log.cancel(f'Container stopped: {cid}') + + @tractor.context async def open_marketstored( ctx: tractor.Context, @@ -175,7 +305,7 @@ async def open_marketstored( type='bind', ) - cntr: Container = client.containers.run( + dcntr: DockerContainer = client.containers.run( 'alpacamarkets/marketstore:latest', # do we need this for cmds? # '-i', @@ -191,77 +321,59 @@ async def open_marketstored( init=True, # remove=True, ) - try: - seen_so_far = set() + cntr = Container(dcntr) - async def process_logs_until( - match: str, - bp_on_msg: bool = False, - ): - logs = cntr.logs(stream=True) - for entry in logs: - entry = entry.decode() + with trio.move_on_after(1): + found = await cntr.process_logs_until( + "launching tcp listener for all services...", + ) - try: - record = json.loads(entry.strip()) - except json.JSONDecodeError: - if 'Error' in entry: - raise RuntimeError(entry) - - msg = record['msg'] - level = record['level'] - if msg and entry not in seen_so_far: - seen_so_far.add(entry) - if bp_on_msg: - await tractor.breakpoint() - getattr(log, level, log.error)(f'{msg}') - - # if "launching tcp listener for all services..." in msg: - if match in msg: - return True - - # do a checkpoint so we don't block if cancelled B) - await trio.sleep(0) - - return False - - with trio.move_on_after(0.5): - found = await process_logs_until( - "launching tcp listener for all services...", + if not found and cntr not in client.containers.list(): + raise RuntimeError( + 'Failed to start `marketstore` check logs deats' ) - if not found and cntr not in client.containers.list(): - raise RuntimeError( - 'Failed to start `marketstore` check logs deats' - ) + await ctx.started((cntr.cntr.id, os.getpid())) - await ctx.started(cntr.id) + # async with ctx.open_stream() as stream: - # block for the expected "teardown log msg".. - await process_logs_until('exiting...',) + try: + + # TODO: we might eventually want a proxy-style msg-prot here + # to allow remote control of containers without needing + # callers to have root perms? + await trio.sleep_forever() + + # await cntr.cancel() + # with trio.CancelScope(shield=True): + # # block for the expected "teardown log msg".. + # # await cntr.process_logs_until('exiting...',) + + # # only msg should be to signal killing the + # # container and this super daemon. + # msg = await stream.receive() + # # print("GOT CANCEL MSG") + + # cid = msg['cancel'] + # log.cancel(f'Cancelling container {cid}') + + # # print("CANCELLING CONTAINER") + # await cntr.cancel() + + # # print("SENDING ACK") + # await stream.send('ack') except ( BaseException, # trio.Cancelled, # KeyboardInterrupt, ): - cntr.kill('SIGINT') - with trio.move_on_after(0.5) as cs: - cs.shield = True - await process_logs_until('exiting...',) - raise - finally: - try: - cntr.wait( - timeout=0.5, - condition='not-running', - ) - except ( - ReadTimeout, - ConnectionError, - ): - cntr.kill() + with trio.CancelScope(shield=True): + await cntr.cancel() + # await stream.send('ack') + + raise async def start_ahab( @@ -311,9 +423,18 @@ async def start_ahab( open_marketstored, ) as (ctx, first): - assert str(first) - # run till cancelled + cid, pid = first + await trio.sleep_forever() + # async with ctx.open_stream() as stream: + # try: + # # run till cancelled + # await trio.sleep_forever() + # finally: + # with trio.CancelScope(shield=True): + # # print('SENDING CANCEL TO MARKETSTORED') + # await stream.send({'cancel': (cid, pid)}) + # assert await stream.receive() == 'ack' # since we demoted root perms in this parent # we'll get a perms error on proc cleanup in