diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index fea19a4d..98b3b41f 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -19,6 +19,7 @@ Supervisor for docker with included specific-image service helpers. ''' import os +import time from typing import ( Optional, Callable, @@ -186,45 +187,65 @@ class Container: async def cancel( self, + stop_msg: str, ) -> None: cid = self.cntr.id + # first try a graceful cancel + log.cancel( + f'SIGINT cancelling container: {cid}\n' + f'waiting on stop msg: "{stop_msg}"' + ) self.try_signal('SIGINT') - with trio.move_on_after(0.5) as cs: - cs.shield = True - await self.process_logs_until('initiating graceful shutdown') - await self.process_logs_until('exiting...',) + start = time.time() + for _ in range(30): - for _ in range(10): with trio.move_on_after(0.5) as cs: cs.shield = True - await self.process_logs_until('exiting...',) + await self.process_logs_until(stop_msg) + + # if we aren't cancelled on above checkpoint then we + # assume we read the expected stop msg and terminated. break - if cs.cancelled_caught: - # get out the big guns, bc apparently marketstore - # doesn't actually know how to terminate gracefully - # :eyeroll:... - self.try_signal('SIGKILL') + try: + log.info(f'Polling for container shutdown:\n{cid}') - try: - log.info('Waiting on container shutdown: {cid}') + if self.cntr.status not in {'exited', 'not-running'}: self.cntr.wait( timeout=0.1, condition='not-running', ) - break - except ( - ReadTimeout, - ConnectionError, - ): - log.error(f'failed to wait on container {cid}') - raise + break + except ( + ReadTimeout, + ): + log.info(f'Still waiting on container:\n{cid}') + continue + + except ( + docker.errors.APIError, + ConnectionError, + ): + log.exception('Docker connection failure') + break else: - raise RuntimeError('Failed to cancel container {cid}') + delay = time.time() - start + log.error( + f'Failed to kill container {cid} after {delay}s\n' + 'sending SIGKILL..' + ) + # get out the big guns, bc apparently marketstore + # doesn't actually know how to terminate gracefully + # :eyeroll:... + self.try_signal('SIGKILL') + self.cntr.wait( + timeout=3, + condition='not-running', + ) log.cancel(f'Container stopped: {cid}') @@ -245,13 +266,16 @@ async def open_ahabd( # params, etc. passing to ``Containter.run()``? # call into endpoint for container config/init ep_func = NamespacePath(endpoint).load_ref() - dcntr, cntr_config = ep_func(client) + ( + dcntr, + cntr_config, + start_msg, + stop_msg, + ) = ep_func(client) cntr = Container(dcntr) with trio.move_on_after(1): - found = await cntr.process_logs_until( - "launching tcp listener for all services...", - ) + found = await cntr.process_logs_until(start_msg) if not found and cntr not in client.containers.list(): raise RuntimeError( @@ -271,16 +295,9 @@ async def open_ahabd( # callers to have root perms? await trio.sleep_forever() - except ( - BaseException, - # trio.Cancelled, - # KeyboardInterrupt, - ): - + finally: with trio.CancelScope(shield=True): - await cntr.cancel() - - raise + await cntr.cancel(stop_msg) async def start_ahab( diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 4d1c91ad..ee76ba1e 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -127,10 +127,15 @@ def start_marketstore( import os import docker from .. import config - get_console_log('info', name=__name__) - yml_file = os.path.join(config._config_dir, 'mkts.yml') + mktsdir = os.path.join(config._config_dir, 'marketstore') + + # create when dne + if not os.path.isdir(mktsdir): + os.mkdir(mktsdir) + + yml_file = os.path.join(mktsdir, 'mkts.yml') if not os.path.isfile(yml_file): log.warning( f'No `marketstore` config exists?: {yml_file}\n' @@ -143,14 +148,14 @@ def start_marketstore( # create a mount from user's local piker config dir into container config_dir_mnt = docker.types.Mount( target='/etc', - source=config._config_dir, + source=mktsdir, type='bind', ) # create a user config subdir where the marketstore # backing filesystem database can be persisted. persistent_data_dir = os.path.join( - config._config_dir, 'data', + mktsdir, 'data', ) if not os.path.isdir(persistent_data_dir): os.mkdir(persistent_data_dir) @@ -180,7 +185,14 @@ def start_marketstore( init=True, # remove=True, ) - return dcntr, _config + return ( + dcntr, + _config, + + # expected startup and stop msgs + "launching tcp listener for all services...", + "exiting...", + ) _tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK') @@ -383,7 +395,12 @@ class Storage: ]: first_tsdb_dt, last_tsdb_dt = None, None - tsdb_arrays = await self.read_ohlcv(fqsn) + tsdb_arrays = await self.read_ohlcv( + fqsn, + # on first load we don't need to pull the max + # history per request size worth. + limit=3000, + ) log.info(f'Loaded tsdb history {tsdb_arrays}') if tsdb_arrays: @@ -401,6 +418,7 @@ class Storage: fqsn: str, timeframe: Optional[Union[int, str]] = None, end: Optional[int] = None, + limit: int = int(800e3), ) -> tuple[ MarketstoreClient, @@ -423,7 +441,7 @@ class Storage: # TODO: figure the max limit here given the # ``purepc`` msg size limit of purerpc: 33554432 - limit=int(800e3), + limit=limit, ) if timeframe is None: