Add a super simple `marketstore` container supervisor
							parent
							
								
									d9ded54e10
								
							
						
					
					
						commit
						bb45100168
					
				| 
						 | 
				
			
			@ -18,25 +18,33 @@
 | 
			
		|||
Supervisor for docker with included specific-image service helpers.
 | 
			
		||||
 | 
			
		||||
'''
 | 
			
		||||
from typing import Optional
 | 
			
		||||
from contextlib import contextmanager as cm
 | 
			
		||||
from typing import (
 | 
			
		||||
    Optional,
 | 
			
		||||
    # Any,
 | 
			
		||||
)
 | 
			
		||||
from contextlib import asynccontextmanager as acm
 | 
			
		||||
# import time
 | 
			
		||||
 | 
			
		||||
import trio
 | 
			
		||||
import tractor
 | 
			
		||||
import docker
 | 
			
		||||
import json
 | 
			
		||||
# from docker.containers import Container
 | 
			
		||||
from requests import ConnectionError
 | 
			
		||||
from docker.models.containers import Container
 | 
			
		||||
 | 
			
		||||
from ..log import get_logger, get_console_log
 | 
			
		||||
from ..log import get_logger  # , get_console_log
 | 
			
		||||
from ..config import _config_dir
 | 
			
		||||
 | 
			
		||||
log = get_logger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
_config = '''
 | 
			
		||||
# piker's ``marketstore`` config.
 | 
			
		||||
 | 
			
		||||
# mount this config using:
 | 
			
		||||
# sudo docker run --mount type=bind,source="$HOME/.config/piker/",target="/etc" -i -p 5993:5993 alpacamarkets/marketstore:latest
 | 
			
		||||
# sudo docker run --mount \
 | 
			
		||||
# type=bind,source="$HOME/.config/piker/",target="/etc" -i -p \
 | 
			
		||||
# 5993:5993 alpacamarkets/marketstore:latest
 | 
			
		||||
 | 
			
		||||
root_directory: data
 | 
			
		||||
listen_port: 5993
 | 
			
		||||
grpc_listen_port: 5995
 | 
			
		||||
| 
						 | 
				
			
			@ -68,18 +76,58 @@ triggers:
 | 
			
		|||
'''
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@cm
 | 
			
		||||
def open_docker(
 | 
			
		||||
@acm
 | 
			
		||||
async def open_docker(
 | 
			
		||||
    url: Optional[str] = None,
 | 
			
		||||
    **kwargs,
 | 
			
		||||
 | 
			
		||||
) -> docker.DockerClient:
 | 
			
		||||
 | 
			
		||||
    # yield docker.Client(
 | 
			
		||||
    #     base_url=url,
 | 
			
		||||
    #     **kwargs
 | 
			
		||||
    # ) if url else
 | 
			
		||||
    yield docker.from_env(**kwargs)
 | 
			
		||||
    client = docker.DockerClient(
 | 
			
		||||
        base_url=url,
 | 
			
		||||
        **kwargs
 | 
			
		||||
    ) if url else docker.from_env(**kwargs)
 | 
			
		||||
 | 
			
		||||
    try:
 | 
			
		||||
        yield client
 | 
			
		||||
    finally:
 | 
			
		||||
        # for c in client.containers.list():
 | 
			
		||||
        #     c.kill()
 | 
			
		||||
        client.close()
 | 
			
		||||
        # client.api._custom_adapter.close()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# async def waitfor(
 | 
			
		||||
#     cntr: Container,
 | 
			
		||||
#     attr_path: tuple[str],
 | 
			
		||||
#     expect=None,
 | 
			
		||||
#     timeout: float = 0.5,
 | 
			
		||||
 | 
			
		||||
# ) -> Any:
 | 
			
		||||
#     '''
 | 
			
		||||
#     Wait for a container's attr value to be set. If ``expect`` is
 | 
			
		||||
#     provided wait for the value to be set to that value.
 | 
			
		||||
 | 
			
		||||
#     This is an async version of the helper from our ``pytest-dockerctl``
 | 
			
		||||
#     plugin.
 | 
			
		||||
 | 
			
		||||
#     '''
 | 
			
		||||
#     def get(val, path):
 | 
			
		||||
#         for key in path:
 | 
			
		||||
#             val = val[key]
 | 
			
		||||
#         return val
 | 
			
		||||
 | 
			
		||||
#     start = time.time()
 | 
			
		||||
#     while time.time() - start < timeout:
 | 
			
		||||
#         cntr.reload()
 | 
			
		||||
#         val = get(cntr.attrs, attr_path)
 | 
			
		||||
#         if expect is None and val:
 | 
			
		||||
#             return val
 | 
			
		||||
#         elif val == expect:
 | 
			
		||||
#             return val
 | 
			
		||||
#     else:
 | 
			
		||||
#         raise TimeoutError("{} failed to be {}, value: \"{}\"".format(
 | 
			
		||||
#             attr_path, expect if expect else 'not None', val))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.context
 | 
			
		||||
| 
						 | 
				
			
			@ -88,64 +136,88 @@ async def open_marketstore_container(
 | 
			
		|||
    **kwargs,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
    log = get_console_log('info', name=__name__)
 | 
			
		||||
    # this cli should "just work"
 | 
			
		||||
    # sudo docker run --mount
 | 
			
		||||
    # type=bind,source="$HOME/.config/piker/",target="/etc" -i -p
 | 
			
		||||
    # 5993:5993 alpacamarkets/marketstore:latest
 | 
			
		||||
    client = docker.from_env(**kwargs)
 | 
			
		||||
    '''
 | 
			
		||||
    Start and supervise a marketstore instance with its config bind-mounted
 | 
			
		||||
    in from the piker config directory on the system.
 | 
			
		||||
 | 
			
		||||
    # with open_docker() as client:
 | 
			
		||||
    ctnr = client.containers.run(
 | 
			
		||||
        'alpacamarkets/marketstore:latest',
 | 
			
		||||
        [
 | 
			
		||||
            '--mount',
 | 
			
		||||
            'type=bind,source="$HOME/.config/piker/",target="/etc"',
 | 
			
		||||
            '-i',
 | 
			
		||||
            '-p 5993:5993',
 | 
			
		||||
        ],
 | 
			
		||||
        detach=True,
 | 
			
		||||
    )
 | 
			
		||||
    started: bool = False
 | 
			
		||||
    logs = ctnr.logs(stream=True)
 | 
			
		||||
    The equivalent cli cmd to this code is:
 | 
			
		||||
 | 
			
		||||
    with trio.move_on_after(0.5):
 | 
			
		||||
        for entry in logs:
 | 
			
		||||
            entry = entry.decode()
 | 
			
		||||
            try:
 | 
			
		||||
                record = json.loads(entry.strip())
 | 
			
		||||
            except json.JSONDecodeError:
 | 
			
		||||
                if 'Error' in entry:
 | 
			
		||||
                    raise RuntimeError(entry)
 | 
			
		||||
                # await tractor.breakpoint()
 | 
			
		||||
            msg = record['msg']
 | 
			
		||||
        sudo docker run --mount \
 | 
			
		||||
        type=bind,source="$HOME/.config/piker/",target="/etc" -i -p \
 | 
			
		||||
        5993:5993 alpacamarkets/marketstore:latest
 | 
			
		||||
 | 
			
		||||
            if "launching tcp listener for all services..." in msg:
 | 
			
		||||
                started = True
 | 
			
		||||
                break
 | 
			
		||||
    '''
 | 
			
		||||
    # log = get_console_log('info', name=__name__)
 | 
			
		||||
 | 
			
		||||
            await trio.sleep(0)
 | 
			
		||||
 | 
			
		||||
    if not started and ctnr not in client.containers.list():
 | 
			
		||||
        raise RuntimeError(
 | 
			
		||||
            'Failed to start `marketstore` check logs output for deats'
 | 
			
		||||
    # client = docker.from_env(**kwargs)
 | 
			
		||||
    async with open_docker() as client:
 | 
			
		||||
        # create a mount from user's local piker config dir into container
 | 
			
		||||
        config_dir_mnt = docker.types.Mount(
 | 
			
		||||
            target='/etc',
 | 
			
		||||
            source=_config_dir,
 | 
			
		||||
            type='bind',
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    await ctx.started()
 | 
			
		||||
    await tractor.breakpoint()
 | 
			
		||||
        cntr: Container = client.containers.run(
 | 
			
		||||
            'alpacamarkets/marketstore:latest',
 | 
			
		||||
            # do we need this for cmds?
 | 
			
		||||
            # '-i',
 | 
			
		||||
 | 
			
		||||
            # '-p 5993:5993',
 | 
			
		||||
            ports={'5993/tcp': 5993},
 | 
			
		||||
            mounts=[config_dir_mnt],
 | 
			
		||||
            detach=True,
 | 
			
		||||
            # stop_signal='SIGINT',
 | 
			
		||||
            # init=True,
 | 
			
		||||
            # remove=True,
 | 
			
		||||
        )
 | 
			
		||||
        try:
 | 
			
		||||
            started: bool = False
 | 
			
		||||
            logs = cntr.logs(stream=True)
 | 
			
		||||
 | 
			
		||||
            with trio.move_on_after(0.5):
 | 
			
		||||
                for entry in logs:
 | 
			
		||||
                    entry = entry.decode()
 | 
			
		||||
                    try:
 | 
			
		||||
                        record = json.loads(entry.strip())
 | 
			
		||||
                    except json.JSONDecodeError:
 | 
			
		||||
                        if 'Error' in entry:
 | 
			
		||||
                            raise RuntimeError(entry)
 | 
			
		||||
                    msg = record['msg']
 | 
			
		||||
 | 
			
		||||
                    if "launching tcp listener for all services..." in msg:
 | 
			
		||||
                        started = True
 | 
			
		||||
                        break
 | 
			
		||||
 | 
			
		||||
                    await trio.sleep(0)
 | 
			
		||||
 | 
			
		||||
            if not started and cntr not in client.containers.list():
 | 
			
		||||
                raise RuntimeError(
 | 
			
		||||
                    'Failed to start `marketstore` check logs output for deats'
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
            await ctx.started()
 | 
			
		||||
            await trio.sleep_forever()
 | 
			
		||||
 | 
			
		||||
        finally:
 | 
			
		||||
            cntr.stop()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def main():
 | 
			
		||||
 | 
			
		||||
    async with tractor.open_nursery(
 | 
			
		||||
        loglevel='info',
 | 
			
		||||
        loglevel='runtime',
 | 
			
		||||
    ) as tn:
 | 
			
		||||
        portal = await tn.start_actor('ahab', enable_modules=[__name__])
 | 
			
		||||
 | 
			
		||||
        async with portal.open_context(
 | 
			
		||||
            open_marketstore_container
 | 
			
		||||
 | 
			
		||||
        ) as (first, ctx):
 | 
			
		||||
        async with (
 | 
			
		||||
            (
 | 
			
		||||
                await tn.start_actor('ahab', enable_modules=[__name__])
 | 
			
		||||
            ).open_context(
 | 
			
		||||
                open_marketstore_container
 | 
			
		||||
            ) as (ctx, first),
 | 
			
		||||
        ):
 | 
			
		||||
            assert not first
 | 
			
		||||
            await trio.sleep_forever()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == '__main__':
 | 
			
		||||
    trio.run(main)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue