From dcee0ddd55e0521dd6125e16fa0f5c4a41c76c2c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 20 Jun 2022 13:07:46 -0400 Subject: [PATCH 1/4] Move/expect all marketstore configs under a `/piker/marketstore` subdir --- piker/data/marketstore.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 4d1c91ad..19d426af 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -127,10 +127,15 @@ def start_marketstore( import os import docker from .. import config - get_console_log('info', name=__name__) - yml_file = os.path.join(config._config_dir, 'mkts.yml') + mktsdir = os.path.join(config._config_dir, 'marketstore') + + # create when dne + if not os.path.isdir(mktsdir): + os.mkdir(mktsdir) + + yml_file = os.path.join(mktsdir, 'mkts.yml') if not os.path.isfile(yml_file): log.warning( f'No `marketstore` config exists?: {yml_file}\n' @@ -143,14 +148,14 @@ def start_marketstore( # create a mount from user's local piker config dir into container config_dir_mnt = docker.types.Mount( target='/etc', - source=config._config_dir, + source=mktsdir, type='bind', ) # create a user config subdir where the marketstore # backing filesystem database can be persisted. persistent_data_dir = os.path.join( - config._config_dir, 'data', + mktsdir, 'data', ) if not os.path.isdir(persistent_data_dir): os.mkdir(persistent_data_dir) From b8b76a32a69a3dc2b2e0fc9a06b77eba885dbf06 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 23 Jun 2022 10:23:14 -0400 Subject: [PATCH 2/4] Harden container cancel-and-wait supervisor loop This should hopefully make teardown more reliable and includes better logic to fail over to a hard kill path after a 3 second timeout waiting for the instance to complete using the `docker-py` wait API. 
Also generalize the supervisor teardown loop by allowing the container config endpoint to return 2 msgs to expect: - a startup message that can be read from the container's internal process logging that indicates it is fully up and ready. - a teardown msg that can be polled for that indicates the container has gracefully terminated after a cancellation request which is passed to our container wrapper's `.cancel()` method. Make the marketstore config endpoint return the 2 messages we previously had hard-coded and use this new API. --- piker/data/_ahab.py | 85 +++++++++++++++++++++++++-------------- piker/data/marketstore.py | 9 ++++- 2 files changed, 62 insertions(+), 32 deletions(-) diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index fea19a4d..7323a08d 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -19,6 +19,7 @@ Supervisor for docker with included specific-image service helpers. ''' import os +import time from typing import ( Optional, Callable, @@ -186,45 +187,62 @@ class Container: async def cancel( self, + stop_msg: str, ) -> None: cid = self.cntr.id + # first try a graceful cancel + log.cancel( + f'SIGINT cancelling container: {cid}\n' + f'waiting on stop msg: "{stop_msg}"' + ) self.try_signal('SIGINT') - with trio.move_on_after(0.5) as cs: - cs.shield = True - await self.process_logs_until('initiating graceful shutdown') - await self.process_logs_until('exiting...',) + start = time.time() + for _ in range(30): - for _ in range(10): with trio.move_on_after(0.5) as cs: cs.shield = True - await self.process_logs_until('exiting...',) + await self.process_logs_until(stop_msg) + + # if we aren't cancelled on above checkpoint then we + # assume we read the expected stop msg and terminated. break - if cs.cancelled_caught: - # get out the big guns, bc apparently marketstore - # doesn't actually know how to terminate gracefully - # :eyeroll:...
- self.try_signal('SIGKILL') + try: + log.info(f'Polling for container shutdown:\n{cid}') + self.cntr.wait( + timeout=0.1, + condition='not-running', + ) + break - try: - log.info('Waiting on container shutdown: {cid}') - self.cntr.wait( - timeout=0.1, - condition='not-running', - ) - break - - except ( - ReadTimeout, - ConnectionError, - ): - log.error(f'failed to wait on container {cid}') - raise + except ( + ReadTimeout, + ): + log.info(f'Still waiting on container:\n{cid}') + continue + except ( + docker.errors.APIError, + ConnectionError, + ): + log.exception(f'Docker connection failure') + break else: - raise RuntimeError('Failed to cancel container {cid}') + delay = time.time() - start + log.error( + f'Failed to kill container {cid} after {delay}s\n' + 'sending SIGKILL..' + ) + # get out the big guns, bc apparently marketstore + # doesn't actually know how to terminate gracefully + # :eyeroll:... + self.try_signal('SIGKILL') + self.cntr.wait( + timeout=3, + condition='not-running', + ) log.cancel(f'Container stopped: {cid}') @@ -245,13 +263,18 @@ async def open_ahabd( # params, etc. passing to ``Containter.run()``? 
# call into endpoint for container config/init ep_func = NamespacePath(endpoint).load_ref() - dcntr, cntr_config = ep_func(client) + ( + dcntr, + cntr_config, + start_msg, + stop_msg, + ) = ep_func(client) cntr = Container(dcntr) with trio.move_on_after(1): - found = await cntr.process_logs_until( - "launching tcp listener for all services...", - ) + found = await cntr.process_logs_until(start_msg) + # "launching tcp listener for all services...", + # ) if not found and cntr not in client.containers.list(): raise RuntimeError( @@ -278,7 +301,7 @@ async def open_ahabd( ): with trio.CancelScope(shield=True): - await cntr.cancel() + await cntr.cancel(stop_msg) raise diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 19d426af..804a79f0 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -185,7 +185,14 @@ def start_marketstore( init=True, # remove=True, ) - return dcntr, _config + return ( + dcntr, + _config, + + # expected startup and stop msgs + "launching tcp listener for all services...", + "exiting...", + ) _tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK') From 27c523ca7443ecd685c9b2f379a5f423ca77dc73 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 23 Jun 2022 15:09:56 -0400 Subject: [PATCH 3/4] Speedup: only load a "views worth" of datums on first query --- piker/data/marketstore.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 804a79f0..ee76ba1e 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -395,7 +395,12 @@ class Storage: ]: first_tsdb_dt, last_tsdb_dt = None, None - tsdb_arrays = await self.read_ohlcv(fqsn) + tsdb_arrays = await self.read_ohlcv( + fqsn, + # on first load we don't need to pull the max + # history per request size worth. 
+ limit=3000, + ) log.info(f'Loaded tsdb history {tsdb_arrays}') if tsdb_arrays: @@ -413,6 +418,7 @@ class Storage: fqsn: str, timeframe: Optional[Union[int, str]] = None, end: Optional[int] = None, + limit: int = int(800e3), ) -> tuple[ MarketstoreClient, @@ -435,7 +441,7 @@ class Storage: # TODO: figure the max limit here given the # ``purepc`` msg size limit of purerpc: 33554432 - limit=int(800e3), + limit=limit, ) if timeframe is None: From e45cb9d08aac7cd1a597355e026553baf45abc9e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 26 Jun 2022 13:07:25 -0400 Subject: [PATCH 4/4] Always cancel container on teardown --- piker/data/_ahab.py | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index 7323a08d..98b3b41f 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -211,10 +211,13 @@ class Container: try: log.info(f'Polling for container shutdown:\n{cid}') - self.cntr.wait( - timeout=0.1, - condition='not-running', - ) + + if self.cntr.status not in {'exited', 'not-running'}: + self.cntr.wait( + timeout=0.1, + condition='not-running', + ) + break except ( @@ -227,7 +230,7 @@ class Container: docker.errors.APIError, ConnectionError, ): - log.exception(f'Docker connection failure') + log.exception('Docker connection failure') break else: delay = time.time() - start @@ -273,8 +276,6 @@ async def open_ahabd( with trio.move_on_after(1): found = await cntr.process_logs_until(start_msg) - # "launching tcp listener for all services...", - # ) if not found and cntr not in client.containers.list(): raise RuntimeError( @@ -294,17 +295,10 @@ async def open_ahabd( # callers to have root perms? await trio.sleep_forever() - except ( - BaseException, - # trio.Cancelled, - # KeyboardInterrupt, - ): - + finally: with trio.CancelScope(shield=True): await cntr.cancel(stop_msg) - raise - async def start_ahab( service_name: str,