Harden container cancel-and-wait supervisor loop
This should hopefully make teardown more reliable and includes better logic to fail over to a hard kill path after a 3 second timeout waiting for the instance to complete using the `docker-py` wait API. Also generalize the supervisor teardown loop by allowing the container config endpoint to return 2 msgs to expect: - a startup message that can be read from the container's internal process logging that indicates it is fully up and ready. - a teardown msg that can be polled for that indicates the container has gracefully terminated after a cancellation request which is passed to our container wrappers `.cancel()` method. Make the marketstore config endpoint return the 2 messages we previously had hard coded and use this new api.contain_mkts
parent
dcee0ddd55
commit
b8b76a32a6
|
@ -19,6 +19,7 @@ Supervisor for docker with included specific-image service helpers.
|
|||
|
||||
'''
|
||||
import os
|
||||
import time
|
||||
from typing import (
|
||||
Optional,
|
||||
Callable,
|
||||
|
@ -186,45 +187,62 @@ class Container:
|
|||
|
||||
async def cancel(
|
||||
self,
|
||||
stop_msg: str,
|
||||
) -> None:
|
||||
|
||||
cid = self.cntr.id
|
||||
# first try a graceful cancel
|
||||
log.cancel(
|
||||
f'SIGINT cancelling container: {cid}\n'
|
||||
f'waiting on stop msg: "{stop_msg}"'
|
||||
)
|
||||
self.try_signal('SIGINT')
|
||||
|
||||
with trio.move_on_after(0.5) as cs:
|
||||
cs.shield = True
|
||||
await self.process_logs_until('initiating graceful shutdown')
|
||||
await self.process_logs_until('exiting...',)
|
||||
start = time.time()
|
||||
for _ in range(30):
|
||||
|
||||
for _ in range(10):
|
||||
with trio.move_on_after(0.5) as cs:
|
||||
cs.shield = True
|
||||
await self.process_logs_until('exiting...',)
|
||||
await self.process_logs_until(stop_msg)
|
||||
|
||||
# if we aren't cancelled on above checkpoint then we
|
||||
# assume we read the expected stop msg and terminated.
|
||||
break
|
||||
|
||||
if cs.cancelled_caught:
|
||||
# get out the big guns, bc apparently marketstore
|
||||
# doesn't actually know how to terminate gracefully
|
||||
# :eyeroll:...
|
||||
self.try_signal('SIGKILL')
|
||||
try:
|
||||
log.info(f'Polling for container shutdown:\n{cid}')
|
||||
self.cntr.wait(
|
||||
timeout=0.1,
|
||||
condition='not-running',
|
||||
)
|
||||
break
|
||||
|
||||
try:
|
||||
log.info('Waiting on container shutdown: {cid}')
|
||||
self.cntr.wait(
|
||||
timeout=0.1,
|
||||
condition='not-running',
|
||||
)
|
||||
break
|
||||
|
||||
except (
|
||||
ReadTimeout,
|
||||
ConnectionError,
|
||||
):
|
||||
log.error(f'failed to wait on container {cid}')
|
||||
raise
|
||||
except (
|
||||
ReadTimeout,
|
||||
):
|
||||
log.info(f'Still waiting on container:\n{cid}')
|
||||
continue
|
||||
|
||||
except (
|
||||
docker.errors.APIError,
|
||||
ConnectionError,
|
||||
):
|
||||
log.exception(f'Docker connection failure')
|
||||
break
|
||||
else:
|
||||
raise RuntimeError('Failed to cancel container {cid}')
|
||||
delay = time.time() - start
|
||||
log.error(
|
||||
f'Failed to kill container {cid} after {delay}s\n'
|
||||
'sending SIGKILL..'
|
||||
)
|
||||
# get out the big guns, bc apparently marketstore
|
||||
# doesn't actually know how to terminate gracefully
|
||||
# :eyeroll:...
|
||||
self.try_signal('SIGKILL')
|
||||
self.cntr.wait(
|
||||
timeout=3,
|
||||
condition='not-running',
|
||||
)
|
||||
|
||||
log.cancel(f'Container stopped: {cid}')
|
||||
|
||||
|
@ -245,13 +263,18 @@ async def open_ahabd(
|
|||
# params, etc. passing to ``Containter.run()``?
|
||||
# call into endpoint for container config/init
|
||||
ep_func = NamespacePath(endpoint).load_ref()
|
||||
dcntr, cntr_config = ep_func(client)
|
||||
(
|
||||
dcntr,
|
||||
cntr_config,
|
||||
start_msg,
|
||||
stop_msg,
|
||||
) = ep_func(client)
|
||||
cntr = Container(dcntr)
|
||||
|
||||
with trio.move_on_after(1):
|
||||
found = await cntr.process_logs_until(
|
||||
"launching tcp listener for all services...",
|
||||
)
|
||||
found = await cntr.process_logs_until(start_msg)
|
||||
# "launching tcp listener for all services...",
|
||||
# )
|
||||
|
||||
if not found and cntr not in client.containers.list():
|
||||
raise RuntimeError(
|
||||
|
@ -278,7 +301,7 @@ async def open_ahabd(
|
|||
):
|
||||
|
||||
with trio.CancelScope(shield=True):
|
||||
await cntr.cancel()
|
||||
await cntr.cancel(stop_msg)
|
||||
|
||||
raise
|
||||
|
||||
|
|
|
@ -185,7 +185,14 @@ def start_marketstore(
|
|||
init=True,
|
||||
# remove=True,
|
||||
)
|
||||
return dcntr, _config
|
||||
return (
|
||||
dcntr,
|
||||
_config,
|
||||
|
||||
# expected startup and stop msgs
|
||||
"launching tcp listener for all services...",
|
||||
"exiting...",
|
||||
)
|
||||
|
||||
|
||||
_tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK')
|
||||
|
|
Loading…
Reference in New Issue