From 1693cc42c1245ee1fa479e68ab23f9713f2792ce Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 15 Feb 2022 22:07:50 -0500 Subject: [PATCH] Add a super simple `marketstore` container supervisor --- piker/data/_ahab.py | 190 ++++++++++++++++++++++++++++++-------------- 1 file changed, 131 insertions(+), 59 deletions(-) diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index 591c19a9..08c0bcd1 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -18,25 +18,33 @@ Supervisor for docker with included specific-image service helpers. ''' -from typing import Optional -from contextlib import contextmanager as cm +from typing import ( + Optional, + # Any, +) +from contextlib import asynccontextmanager as acm # import time import trio import tractor import docker import json -# from docker.containers import Container -from requests import ConnectionError +from docker.models.containers import Container -from ..log import get_logger, get_console_log +from ..log import get_logger # , get_console_log +from ..config import _config_dir log = get_logger(__name__) _config = ''' +# piker's ``marketstore`` config. + # mount this config using: -# sudo docker run --mount type=bind,source="$HOME/.config/piker/",target="/etc" -i -p 5993:5993 alpacamarkets/marketstore:latest +# sudo docker run --mount \ +# type=bind,source="$HOME/.config/piker/",target="/etc" -i -p \ +# 5993:5993 alpacamarkets/marketstore:latest + root_directory: data listen_port: 5993 grpc_listen_port: 5995 @@ -68,18 +76,58 @@ triggers: ''' -@cm -def open_docker( +@acm +async def open_docker( url: Optional[str] = None, **kwargs, ) -> docker.DockerClient: - # yield docker.Client( - # base_url=url, - # **kwargs - # ) if url else - yield docker.from_env(**kwargs) + client = docker.DockerClient( + base_url=url, + **kwargs + ) if url else docker.from_env(**kwargs) + + try: + yield client + finally: + # for c in client.containers.list(): + # c.kill() + client.close() + # client.api._custom_adapter.close() + + +# async def waitfor( +# cntr: Container, +# attr_path: tuple[str], +# expect=None, +# timeout: float = 0.5, + +# ) -> Any: +# ''' +# Wait for a container's attr value to be set. If ``expect`` is +# provided wait for the value to be set to that value. + +# This is an async version of the helper from our ``pytest-dockerctl`` +# plugin. + +# ''' +# def get(val, path): +# for key in path: +# val = val[key] +# return val + +# start = time.time() +# while time.time() - start < timeout: +# cntr.reload() +# val = get(cntr.attrs, attr_path) +# if expect is None and val: +# return val +# elif val == expect: +# return val +# else: +# raise TimeoutError("{} failed to be {}, value: \"{}\"".format( +# attr_path, expect if expect else 'not None', val)) @tractor.context @@ -88,64 +136,88 @@ async def open_marketstore_container( **kwargs, ) -> None: - log = get_console_log('info', name=__name__) - # this cli should "just work" - # sudo docker run --mount - # type=bind,source="$HOME/.config/piker/",target="/etc" -i -p - # 5993:5993 alpacamarkets/marketstore:latest - client = docker.from_env(**kwargs) + ''' + Start and supervise a marketstore instance with its config bind-mounted + in from the piker config directory on the system. - # with open_docker() as client: - ctnr = client.containers.run( - 'alpacamarkets/marketstore:latest', - [ - '--mount', - 'type=bind,source="$HOME/.config/piker/",target="/etc"', - '-i', - '-p 5993:5993', - ], - detach=True, - ) - started: bool = False - logs = ctnr.logs(stream=True) + The equivalent cli cmd to this code is: - with trio.move_on_after(0.5): - for entry in logs: - entry = entry.decode() - try: - record = json.loads(entry.strip()) - except json.JSONDecodeError: - if 'Error' in entry: - raise RuntimeError(entry) - # await tractor.breakpoint() - msg = record['msg'] + sudo docker run --mount \ + type=bind,source="$HOME/.config/piker/",target="/etc" -i -p \ + 5993:5993 alpacamarkets/marketstore:latest - if "launching tcp listener for all services..." in msg: - started = True - break + ''' + # log = get_console_log('info', name=__name__) - await trio.sleep(0) - - if not started and ctnr not in client.containers.list(): - raise RuntimeError( - 'Failed to start `marketstore` check logs output for deats' + # client = docker.from_env(**kwargs) + async with open_docker() as client: + # create a mount from user's local piker config dir into container + config_dir_mnt = docker.types.Mount( + target='/etc', + source=_config_dir, + type='bind', ) - await ctx.started() - await tractor.breakpoint() + cntr: Container = client.containers.run( + 'alpacamarkets/marketstore:latest', + # do we need this for cmds? + # '-i', + + # '-p 5993:5993', + ports={'5993/tcp': 5993}, + mounts=[config_dir_mnt], + detach=True, + # stop_signal='SIGINT', + # init=True, + # remove=True, + ) + try: + started: bool = False + logs = cntr.logs(stream=True) + + with trio.move_on_after(0.5): + for entry in logs: + entry = entry.decode() + try: + record = json.loads(entry.strip()) + except json.JSONDecodeError: + if 'Error' in entry: + raise RuntimeError(entry) + msg = record['msg'] + + if "launching tcp listener for all services..." in msg: + started = True + break + + await trio.sleep(0) + + if not started and cntr not in client.containers.list(): + raise RuntimeError( + 'Failed to start `marketstore` check logs output for deats' + ) + + await ctx.started() + await trio.sleep_forever() + + finally: + cntr.stop() async def main(): + async with tractor.open_nursery( - loglevel='info', + loglevel='runtime', ) as tn: - portal = await tn.start_actor('ahab', enable_modules=[__name__]) - - async with portal.open_context( - open_marketstore_container - - ) as (first, ctx): + async with ( + ( + await tn.start_actor('ahab', enable_modules=[__name__]) + ).open_context( + open_marketstore_container + ) as (ctx, first), + ): + assert not first await trio.sleep_forever() + if __name__ == '__main__': trio.run(main)