diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py
index 52088e91..14e278a7 100644
--- a/piker/data/_ahab.py
+++ b/piker/data/_ahab.py
@@ -30,7 +30,7 @@ from trio_typing import TaskStatus
 import tractor
 import docker
 import json
-from docker.models.containers import Container
+from docker.models.containers import Container as DockerContainer
 from docker.errors import DockerException, APIError
 from requests.exceptions import ConnectionError, ReadTimeout
 
@@ -133,6 +133,136 @@ async def open_docker(
                 c.kill()
 
 
+class Container:
+    '''
+    Wrapper around a ``docker.models.containers.Container`` to include
+    log capture and relay through our native logging system and helper
+    method(s) for cancellation/teardown.
+
+    '''
+    def __init__(
+        self,
+        cntr: DockerContainer,
+    ) -> None:
+
+        self.cntr = cntr
+        # log msg de-duplication
+        self.seen_so_far = set()
+
+    async def process_logs_until(
+        self,
+        patt: str,
+        bp_on_msg: bool = False,
+    ) -> bool:
+        '''
+        Poll the container logs, relaying each new JSON entry through our
+        native logging system, and return ``True`` once ``patt`` is seen.
+
+        '''
+        seen_so_far = self.seen_so_far
+
+        while True:
+            logs = self.cntr.logs()
+            entries = logs.decode().split('\n')
+            for entry in entries:
+
+                # ignore null lines
+                if not entry:
+                    continue
+
+                try:
+                    record = json.loads(entry.strip())
+                except json.JSONDecodeError:
+                    if 'Error' in entry:
+                        raise RuntimeError(entry)
+                    raise
+
+                msg = record['msg']
+                level = record['level']
+                if msg and entry not in seen_so_far:
+                    seen_so_far.add(entry)
+                    if bp_on_msg:
+                        await tractor.breakpoint()
+
+                    getattr(log, level, log.error)(f'{msg}')
+
+                if patt in msg:
+                    return True
+
+                # do a checkpoint so we don't block if cancelled B)
+                await trio.sleep(0.01)
+
+        return False
+
+    def try_signal(
+        self,
+        signal: str = 'SIGINT',
+    ) -> bool:
+        '''Send ``signal`` to the container; ``True`` only if delivered.'''
+        try:
+            # XXX: market store doesn't seem to shutdown nicely all the
+            # time with this (maybe because there are still open grpc
+            # connections?) noticeably after client connections have been
+            # made or are in use/teardown. It works just fine if you
+            # just start and stop the container tho?..
+            log.cancel(f'SENDING {signal} to {self.cntr.id}')
+            self.cntr.kill(signal)
+            return True
+
+        except docker.errors.APIError as err:
+            if 'is not running' not in (err.explanation or ''):
+                log.error(f'Failed to send {signal}: {err}')
+            return False
+
+    async def cancel(
+        self,
+    ) -> None:
+        '''Stop the container with SIGINT, escalating to SIGKILL.'''
+        cid = self.cntr.id
+        self.try_signal('SIGINT')
+
+        with trio.move_on_after(0.5) as cs:
+            cs.shield = True
+            # relay logs until the container reports it is shutting down
+            await self.process_logs_until('initiating graceful shutdown')
+            # shutdown was reported, now wait for the final exit msg
+            await self.process_logs_until('exiting...',)
+
+        for _ in range(10):
+            with trio.move_on_after(0.5) as cs:
+                cs.shield = True
+                # give the container another window to report its exit
+                await self.process_logs_until('exiting...',)
+                # exit msg was seen so no escalation is needed
+                break
+
+            if cs.cancelled_caught:
+                # get out the big guns, bc apparently marketstore
+                # doesn't actually know how to terminate gracefully
+                # :eyeroll:...
+                self.try_signal('SIGKILL')
+
+                try:
+                    log.info(f'Waiting on container shutdown: {cid}')
+                    self.cntr.wait(
+                        timeout=0.1,
+                        condition='not-running',
+                    )
+                    break
+
+                except (
+                    ReadTimeout,
+                    ConnectionError,
+                ):
+                    log.error(f'failed to wait on container {cid}')
+                    raise
+
+        else:
+            raise RuntimeError(f'Failed to cancel container {cid}')
+
+        log.cancel(f'Container stopped: {cid}')
+
+
 @tractor.context
 async def open_marketstored(
     ctx: tractor.Context,
@@ -175,7 +305,7 @@ async def open_marketstored(
             type='bind',
         )
 
-        cntr: Container = client.containers.run(
+        dcntr: DockerContainer = client.containers.run(
             'alpacamarkets/marketstore:latest',
             # do we need this for cmds?
# '-i', @@ -191,77 +321,59 @@ async def open_marketstored( init=True, # remove=True, ) - try: - seen_so_far = set() + cntr = Container(dcntr) - async def process_logs_until( - match: str, - bp_on_msg: bool = False, - ): - logs = cntr.logs(stream=True) - for entry in logs: - entry = entry.decode() + with trio.move_on_after(1): + found = await cntr.process_logs_until( + "launching tcp listener for all services...", + ) - try: - record = json.loads(entry.strip()) - except json.JSONDecodeError: - if 'Error' in entry: - raise RuntimeError(entry) - - msg = record['msg'] - level = record['level'] - if msg and entry not in seen_so_far: - seen_so_far.add(entry) - if bp_on_msg: - await tractor.breakpoint() - getattr(log, level, log.error)(f'{msg}') - - # if "launching tcp listener for all services..." in msg: - if match in msg: - return True - - # do a checkpoint so we don't block if cancelled B) - await trio.sleep(0) - - return False - - with trio.move_on_after(0.5): - found = await process_logs_until( - "launching tcp listener for all services...", + if not found and cntr not in client.containers.list(): + raise RuntimeError( + 'Failed to start `marketstore` check logs deats' ) - if not found and cntr not in client.containers.list(): - raise RuntimeError( - 'Failed to start `marketstore` check logs deats' - ) + await ctx.started((cntr.cntr.id, os.getpid())) - await ctx.started(cntr.id) + # async with ctx.open_stream() as stream: - # block for the expected "teardown log msg".. - await process_logs_until('exiting...',) + try: + + # TODO: we might eventually want a proxy-style msg-prot here + # to allow remote control of containers without needing + # callers to have root perms? + await trio.sleep_forever() + + # await cntr.cancel() + # with trio.CancelScope(shield=True): + # # block for the expected "teardown log msg".. + # # await cntr.process_logs_until('exiting...',) + + # # only msg should be to signal killing the + # # container and this super daemon. 
+ # msg = await stream.receive() + # # print("GOT CANCEL MSG") + + # cid = msg['cancel'] + # log.cancel(f'Cancelling container {cid}') + + # # print("CANCELLING CONTAINER") + # await cntr.cancel() + + # # print("SENDING ACK") + # await stream.send('ack') except ( BaseException, # trio.Cancelled, # KeyboardInterrupt, ): - cntr.kill('SIGINT') - with trio.move_on_after(0.5) as cs: - cs.shield = True - await process_logs_until('exiting...',) - raise - finally: - try: - cntr.wait( - timeout=0.5, - condition='not-running', - ) - except ( - ReadTimeout, - ConnectionError, - ): - cntr.kill() + with trio.CancelScope(shield=True): + await cntr.cancel() + # await stream.send('ack') + + raise async def start_ahab( @@ -311,9 +423,18 @@ async def start_ahab( open_marketstored, ) as (ctx, first): - assert str(first) - # run till cancelled + cid, pid = first + await trio.sleep_forever() + # async with ctx.open_stream() as stream: + # try: + # # run till cancelled + # await trio.sleep_forever() + # finally: + # with trio.CancelScope(shield=True): + # # print('SENDING CANCEL TO MARKETSTORED') + # await stream.send({'cancel': (cid, pid)}) + # assert await stream.receive() == 'ack' # since we demoted root perms in this parent # we'll get a perms error on proc cleanup in