From d9773217e9e17fed06cebb27249ebe84e5e339bc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 19 Feb 2022 16:31:31 -0500 Subject: [PATCH] Map the grpc port and add graceful container teardown Not sure how I missed mapping the 5995 grpc port :facepalm:; done now. Also adds graceful teardown using SIGINT with included container logging relayed to the piker console B). --- piker/data/_ahab.py | 89 +++++++++++++++++++++++++++++++++------------ 1 file changed, 66 insertions(+), 23 deletions(-) diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index ed81fed0..9ec805a8 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -33,9 +33,9 @@ import docker import json from docker.models.containers import Container from docker.errors import DockerException, APIError -from requests.exceptions import ConnectionError +from requests.exceptions import ConnectionError, ReadTimeout -from ..log import get_logger # , get_console_log +from ..log import get_logger, get_console_log from .. import config log = get_logger(__name__) @@ -52,7 +52,7 @@ _config = ''' root_directory: data listen_port: 5993 grpc_listen_port: 5995 -log_level: info +log_level: debug queryable: true stop_grace_period: 0 wal_rotate_interval: 5 @@ -98,9 +98,6 @@ async def open_docker( **kwargs ) if url else docker.from_env(**kwargs) yield client - except ConnectionError: - # prolly no daemon started - raise DockerNotStarted('!?!?') except ( DockerException, @@ -111,15 +108,20 @@ async def open_docker( args = getattr(err, 'args', None) if args: return args + else: + return str(err) # could be more specific so let's check if it's just perms. if err.args: errs = err.args for err in errs: msg = unpack_msg(err) - if msg and 'PermissionError' in msg: + if 'PermissionError' in msg: raise DockerException('You dint run as root yo!') + elif 'FileNotFoundError' in msg: + raise DockerNotStarted('Did you start da service sister?') + # not perms? raise @@ -181,7 +183,7 @@ async def open_marketstore( 5993:5993 alpacamarkets/marketstore:latest ''' - # log = get_console_log('info', name=__name__) + log = get_console_log('info', name=__name__) async with open_docker() as client: # create a mount from user's local piker config dir into container @@ -197,46 +199,87 @@ async def open_marketstore( # '-i', # '-p 5993:5993', - ports={'5993/tcp': 5993}, + ports={ + '5993/tcp': 5993, # jsonrpc + '5995/tcp': 5995, # grpc + }, mounts=[config_dir_mnt], detach=True, - # stop_signal='SIGINT', - # init=True, + stop_signal='SIGINT', + init=True, # remove=True, ) try: - started: bool = False - logs = cntr.logs(stream=True) + seen_so_far = set() - with trio.move_on_after(0.5): + async def process_logs_until( + match: str, + bp_on_msg: bool = False, + ): + logs = cntr.logs(stream=True) for entry in logs: entry = entry.decode() + try: record = json.loads(entry.strip()) except json.JSONDecodeError: if 'Error' in entry: raise RuntimeError(entry) + msg = record['msg'] + if msg and entry not in seen_so_far: + seen_so_far.add(entry) + if bp_on_msg: + await tractor.breakpoint() + log.info(f'{msg}') - if "launching tcp listener for all services..." in msg: - started = True - break + # if "launching tcp listener for all services..." in msg: + if match in msg: + return True + # do a checkpoint so we don't block if cancelled B) await trio.sleep(0) - if not started and cntr not in client.containers.list(): - raise RuntimeError( - 'Failed to start `marketstore` check logs output for deats' + return False + + with trio.move_on_after(0.5): + found = await process_logs_until( + "launching tcp listener for all services...", ) + if not found and cntr not in client.containers.list(): + raise RuntimeError( + 'Failed to start `marketstore` check logs deats' + ) + await ctx.started(cntr.id) - await trio.sleep_forever() + await process_logs_until('exiting...',) + + except ( + trio.Cancelled, + KeyboardInterrupt, + ): + cntr.kill('SIGINT') + with trio.move_on_after(0.5) as cs: + cs.shield = True + await process_logs_until('exiting...',) + raise finally: - cntr.stop() + try: + cntr.wait( + timeout=0.5, + condition='not-running', + ) + except ( + ReadTimeout, + ConnectionError, + ): + cntr.kill() async def start_ahab( + service_name: str, task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED, ) -> None: @@ -247,7 +290,7 @@ async def start_ahab( ) as tn: portal = await tn.start_actor( - 'marketstored', + service_name, enable_modules=[__name__] )