From b8b76a32a69a3dc2b2e0fc9a06b77eba885dbf06 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 23 Jun 2022 10:23:14 -0400 Subject: [PATCH] Harden container cancel-and-wait supervisor loop This should hopefully make teardown more reliable and includes better logic to fail over to a hard kill path after a 3 second timeout waiting for the instance to complete using the `docker-py` wait API. Also generalize the supervisor teardown loop by allowing the container config endpoint to return 2 msgs to expect: - a startup message that can be read from the container's internal process logging that indicates it is fully up and ready. - a teardown msg that can be polled for that indicates the container has gracefully terminated after a cancellation request which is passed to our container wrappers `.cancel()` method. Make the marketstore config endpoint return the 2 messages we previously had hard coded and use this new api. --- piker/data/_ahab.py | 85 +++++++++++++++++++++++++-------------- piker/data/marketstore.py | 9 ++++- 2 files changed, 62 insertions(+), 32 deletions(-) diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index fea19a4d..7323a08d 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -19,6 +19,7 @@ Supervisor for docker with included specific-image service helpers. ''' import os +import time from typing import ( Optional, Callable, @@ -186,45 +187,62 @@ class Container: async def cancel( self, + stop_msg: str, ) -> None: cid = self.cntr.id + # first try a graceful cancel + log.cancel( + f'SIGINT cancelling container: {cid}\n' + f'waiting on stop msg: "{stop_msg}"' + ) self.try_signal('SIGINT') - with trio.move_on_after(0.5) as cs: - cs.shield = True - await self.process_logs_until('initiating graceful shutdown') - await self.process_logs_until('exiting...',) + start = time.time() + for _ in range(30): - for _ in range(10): with trio.move_on_after(0.5) as cs: cs.shield = True - await self.process_logs_until('exiting...',) + await self.process_logs_until(stop_msg) + + # if we aren't cancelled on above checkpoint then we + # assume we read the expected stop msg and terminated. break - if cs.cancelled_caught: - # get out the big guns, bc apparently marketstore - # doesn't actually know how to terminate gracefully - # :eyeroll:... - self.try_signal('SIGKILL') + try: + log.info(f'Polling for container shutdown:\n{cid}') + self.cntr.wait( + timeout=0.1, + condition='not-running', + ) + break - try: - log.info('Waiting on container shutdown: {cid}') - self.cntr.wait( - timeout=0.1, - condition='not-running', - ) - break - - except ( - ReadTimeout, - ConnectionError, - ): - log.error(f'failed to wait on container {cid}') - raise + except ( + ReadTimeout, + ): + log.info(f'Still waiting on container:\n{cid}') + continue + except ( + docker.errors.APIError, + ConnectionError, + ): + log.exception(f'Docker connection failure') + break else: - raise RuntimeError('Failed to cancel container {cid}') + delay = time.time() - start + log.error( + f'Failed to kill container {cid} after {delay}s\n' + 'sending SIGKILL..' + ) + # get out the big guns, bc apparently marketstore + # doesn't actually know how to terminate gracefully + # :eyeroll:... + self.try_signal('SIGKILL') + self.cntr.wait( + timeout=3, + condition='not-running', + ) log.cancel(f'Container stopped: {cid}') @@ -245,13 +263,18 @@ async def open_ahabd( # params, etc. passing to ``Containter.run()``? # call into endpoint for container config/init ep_func = NamespacePath(endpoint).load_ref() - dcntr, cntr_config = ep_func(client) + ( + dcntr, + cntr_config, + start_msg, + stop_msg, + ) = ep_func(client) cntr = Container(dcntr) with trio.move_on_after(1): - found = await cntr.process_logs_until( - "launching tcp listener for all services...", - ) + found = await cntr.process_logs_until(start_msg) + # "launching tcp listener for all services...", + # ) if not found and cntr not in client.containers.list(): raise RuntimeError( @@ -278,7 +301,7 @@ async def open_ahabd( ): with trio.CancelScope(shield=True): - await cntr.cancel() + await cntr.cancel(stop_msg) raise diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 19d426af..804a79f0 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -185,7 +185,14 @@ def start_marketstore( init=True, # remove=True, ) - return dcntr, _config + return ( + dcntr, + _config, + + # expected startup and stop msgs + "launching tcp listener for all services...", + "exiting...", + ) _tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK')