Add a super simple `marketstore` container supervisor
							parent
							
								
									1cdb94374c
								
							
						
					
					
						commit
						fbd3d1e308
					
				|  | @ -18,25 +18,33 @@ | |||
| Supervisor for docker with included specific-image service helpers. | ||||
| 
 | ||||
| ''' | ||||
| from typing import Optional | ||||
| from contextlib import contextmanager as cm | ||||
| from typing import ( | ||||
|     Optional, | ||||
|     # Any, | ||||
| ) | ||||
| from contextlib import asynccontextmanager as acm | ||||
| # import time | ||||
| 
 | ||||
| import trio | ||||
| import tractor | ||||
| import docker | ||||
| import json | ||||
| # from docker.containers import Container | ||||
| from requests import ConnectionError | ||||
| from docker.models.containers import Container | ||||
| 
 | ||||
| from ..log import get_logger, get_console_log | ||||
| from ..log import get_logger  # , get_console_log | ||||
| from ..config import _config_dir | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| _config = ''' | ||||
| # piker's ``marketstore`` config. | ||||
| 
 | ||||
| # mount this config using: | ||||
| # sudo docker run --mount type=bind,source="$HOME/.config/piker/",target="/etc" -i -p 5993:5993 alpacamarkets/marketstore:latest | ||||
| # sudo docker run --mount \ | ||||
| # type=bind,source="$HOME/.config/piker/",target="/etc" -i -p \ | ||||
| # 5993:5993 alpacamarkets/marketstore:latest | ||||
| 
 | ||||
| root_directory: data | ||||
| listen_port: 5993 | ||||
| grpc_listen_port: 5995 | ||||
|  | @ -68,18 +76,58 @@ triggers: | |||
| ''' | ||||
| 
 | ||||
| 
 | ||||
| @cm | ||||
| def open_docker( | ||||
| @acm | ||||
| async def open_docker( | ||||
|     url: Optional[str] = None, | ||||
|     **kwargs, | ||||
| 
 | ||||
| ) -> docker.DockerClient: | ||||
| 
 | ||||
|     # yield docker.Client( | ||||
|     #     base_url=url, | ||||
|     #     **kwargs | ||||
|     # ) if url else | ||||
|     yield docker.from_env(**kwargs) | ||||
|     client = docker.DockerClient( | ||||
|         base_url=url, | ||||
|         **kwargs | ||||
|     ) if url else docker.from_env(**kwargs) | ||||
| 
 | ||||
|     try: | ||||
|         yield client | ||||
|     finally: | ||||
|         # for c in client.containers.list(): | ||||
|         #     c.kill() | ||||
|         client.close() | ||||
|         # client.api._custom_adapter.close() | ||||
| 
 | ||||
| 
 | ||||
| # async def waitfor( | ||||
| #     cntr: Container, | ||||
| #     attr_path: tuple[str], | ||||
| #     expect=None, | ||||
| #     timeout: float = 0.5, | ||||
| 
 | ||||
| # ) -> Any: | ||||
| #     ''' | ||||
| #     Wait for a container's attr value to be set. If ``expect`` is | ||||
| #     provided wait for the value to be set to that value. | ||||
| 
 | ||||
| #     This is an async version of the helper from our ``pytest-dockerctl`` | ||||
| #     plugin. | ||||
| 
 | ||||
| #     ''' | ||||
| #     def get(val, path): | ||||
| #         for key in path: | ||||
| #             val = val[key] | ||||
| #         return val | ||||
| 
 | ||||
| #     start = time.time() | ||||
| #     while time.time() - start < timeout: | ||||
| #         cntr.reload() | ||||
| #         val = get(cntr.attrs, attr_path) | ||||
| #         if expect is None and val: | ||||
| #             return val | ||||
| #         elif val == expect: | ||||
| #             return val | ||||
| #     else: | ||||
| #         raise TimeoutError("{} failed to be {}, value: \"{}\"".format( | ||||
| #             attr_path, expect if expect else 'not None', val)) | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
|  | @ -88,64 +136,88 @@ async def open_marketstore_container( | |||
|     **kwargs, | ||||
| 
 | ||||
| ) -> None: | ||||
|     log = get_console_log('info', name=__name__) | ||||
|     # this cli should "just work" | ||||
|     # sudo docker run --mount | ||||
|     # type=bind,source="$HOME/.config/piker/",target="/etc" -i -p | ||||
|     # 5993:5993 alpacamarkets/marketstore:latest | ||||
|     client = docker.from_env(**kwargs) | ||||
|     ''' | ||||
|     Start and supervise a marketstore instance with its config bind-mounted | ||||
|     in from the piker config directory on the system. | ||||
| 
 | ||||
|     # with open_docker() as client: | ||||
|     ctnr = client.containers.run( | ||||
|         'alpacamarkets/marketstore:latest', | ||||
|         [ | ||||
|             '--mount', | ||||
|             'type=bind,source="$HOME/.config/piker/",target="/etc"', | ||||
|             '-i', | ||||
|             '-p 5993:5993', | ||||
|         ], | ||||
|         detach=True, | ||||
|     ) | ||||
|     started: bool = False | ||||
|     logs = ctnr.logs(stream=True) | ||||
|     The equivalent cli cmd to this code is: | ||||
| 
 | ||||
|     with trio.move_on_after(0.5): | ||||
|         for entry in logs: | ||||
|             entry = entry.decode() | ||||
|             try: | ||||
|                 record = json.loads(entry.strip()) | ||||
|             except json.JSONDecodeError: | ||||
|                 if 'Error' in entry: | ||||
|                     raise RuntimeError(entry) | ||||
|                 # await tractor.breakpoint() | ||||
|             msg = record['msg'] | ||||
|         sudo docker run --mount \ | ||||
|         type=bind,source="$HOME/.config/piker/",target="/etc" -i -p \ | ||||
|         5993:5993 alpacamarkets/marketstore:latest | ||||
| 
 | ||||
|             if "launching tcp listener for all services..." in msg: | ||||
|                 started = True | ||||
|                 break | ||||
|     ''' | ||||
|     # log = get_console_log('info', name=__name__) | ||||
| 
 | ||||
|             await trio.sleep(0) | ||||
| 
 | ||||
|     if not started and ctnr not in client.containers.list(): | ||||
|         raise RuntimeError( | ||||
|             'Failed to start `marketstore` check logs output for deats' | ||||
|     # client = docker.from_env(**kwargs) | ||||
|     async with open_docker() as client: | ||||
|         # create a mount from user's local piker config dir into container | ||||
|         config_dir_mnt = docker.types.Mount( | ||||
|             target='/etc', | ||||
|             source=_config_dir, | ||||
|             type='bind', | ||||
|         ) | ||||
| 
 | ||||
|     await ctx.started() | ||||
|     await tractor.breakpoint() | ||||
|         cntr: Container = client.containers.run( | ||||
|             'alpacamarkets/marketstore:latest', | ||||
|             # do we need this for cmds? | ||||
|             # '-i', | ||||
| 
 | ||||
|             # '-p 5993:5993', | ||||
|             ports={'5993/tcp': 5993}, | ||||
|             mounts=[config_dir_mnt], | ||||
|             detach=True, | ||||
|             # stop_signal='SIGINT', | ||||
|             # init=True, | ||||
|             # remove=True, | ||||
|         ) | ||||
|         try: | ||||
|             started: bool = False | ||||
|             logs = cntr.logs(stream=True) | ||||
| 
 | ||||
|             with trio.move_on_after(0.5): | ||||
|                 for entry in logs: | ||||
|                     entry = entry.decode() | ||||
|                     try: | ||||
|                         record = json.loads(entry.strip()) | ||||
|                     except json.JSONDecodeError: | ||||
|                         if 'Error' in entry: | ||||
|                             raise RuntimeError(entry) | ||||
|                     msg = record['msg'] | ||||
| 
 | ||||
|                     if "launching tcp listener for all services..." in msg: | ||||
|                         started = True | ||||
|                         break | ||||
| 
 | ||||
|                     await trio.sleep(0) | ||||
| 
 | ||||
|             if not started and cntr not in client.containers.list(): | ||||
|                 raise RuntimeError( | ||||
|                     'Failed to start `marketstore` check logs output for deats' | ||||
|                 ) | ||||
| 
 | ||||
|             await ctx.started() | ||||
|             await trio.sleep_forever() | ||||
| 
 | ||||
|         finally: | ||||
|             cntr.stop() | ||||
| 
 | ||||
| 
 | ||||
| async def main(): | ||||
| 
 | ||||
|     async with tractor.open_nursery( | ||||
|         loglevel='info', | ||||
|         loglevel='runtime', | ||||
|     ) as tn: | ||||
|         portal = await tn.start_actor('ahab', enable_modules=[__name__]) | ||||
| 
 | ||||
|         async with portal.open_context( | ||||
|             open_marketstore_container | ||||
| 
 | ||||
|         ) as (first, ctx): | ||||
|         async with ( | ||||
|             ( | ||||
|                 await tn.start_actor('ahab', enable_modules=[__name__]) | ||||
|             ).open_context( | ||||
|                 open_marketstore_container | ||||
|             ) as (ctx, first), | ||||
|         ): | ||||
|             assert not first | ||||
|             await trio.sleep_forever() | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|     trio.run(main) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue