More reliable `marketstored` + container supervision

It turns out (i guess not so shockingly?) that `marketstore` doesn't
always teardown "gracefully" under SIGINT (seems to hang if there are
open client connections which are also in the midst of teardown?) so
this instead first tries the SIGINT and then fails over to a SIGKILL
(destroy loop) which seems to be much more reliable to ensure shutdown
without any downside - in terms of a "hard kill".

Originally i was thinking the issue was root perms related (which get
relegated solely to the `marketstored` daemon actor after spawn) but
actually it was indeed the signalling / application layer causing the
hold-up/latency on teardown. There's a bunch of lingering (now
commented) code which tried to solve this non-problem as well as a bunch
logging/prints to help decipher the root of the issue - this will all
get cleaned out shortly.
l1_precision_fix
Tyler Goodlet 2022-05-05 21:04:10 -04:00
parent a10dc4fe77
commit a6c5902437
1 changed files with 183 additions and 62 deletions

View File

@ -30,7 +30,7 @@ from trio_typing import TaskStatus
import tractor
import docker
import json
from docker.models.containers import Container
from docker.models.containers import Container as DockerContainer
from docker.errors import DockerException, APIError
from requests.exceptions import ConnectionError, ReadTimeout
@ -133,6 +133,136 @@ async def open_docker(
c.kill()
class Container:
'''
Wrapper around a ``docker.models.containers.Container`` to include
log capture and relay through our native logging system and helper
method(s) for cancellation/teardown.
'''
def __init__(
self,
cntr: DockerContainer,
) -> None:
self.cntr = cntr
# log msg de-duplication
self.seen_so_far = set()
async def process_logs_until(
self,
patt: str,
bp_on_msg: bool = False,
) -> bool:
'''
Attempt to capture container log messages and relay through our
native logging system.
'''
seen_so_far = self.seen_so_far
while True:
logs = self.cntr.logs()
entries = logs.decode().split('\n')
for entry in entries:
# ignore null lines
if not entry:
continue
try:
record = json.loads(entry.strip())
except json.JSONDecodeError:
if 'Error' in entry:
raise RuntimeError(entry)
raise
msg = record['msg']
level = record['level']
if msg and entry not in seen_so_far:
seen_so_far.add(entry)
if bp_on_msg:
await tractor.breakpoint()
getattr(log, level, log.error)(f'{msg}')
if patt in msg:
return True
# do a checkpoint so we don't block if cancelled B)
await trio.sleep(0.01)
return False
def try_signal(
self,
signal: str = 'SIGINT',
) -> bool:
try:
# XXX: market store doesn't seem to shutdown nicely all the
# time with this (maybe because there are still open grpc
# connections?) noticably after client connections have been
# made or are in use/teardown. It works just fine if you
# just start and stop the container tho?..
log.cancel(f'SENDING {signal} to {self.cntr.id}')
self.cntr.kill(signal)
return True
except docker.errors.APIError as err:
# _err = err
if 'is not running' in err.explanation:
return False
async def cancel(
self,
) -> None:
cid = self.cntr.id
self.try_signal('SIGINT')
with trio.move_on_after(0.5) as cs:
cs.shield = True
# print('PROCESSINGN LOGS')
await self.process_logs_until('initiating graceful shutdown')
# print('SHUTDOWN REPORTED BY CONTAINER')
await self.process_logs_until('exiting...',)
for _ in range(10):
with trio.move_on_after(0.5) as cs:
cs.shield = True
# print('waiting on EXITING')
await self.process_logs_until('exiting...',)
# print('got EXITING')
break
if cs.cancelled_caught:
# get out the big guns, bc apparently marketstore
# doesn't actually know how to terminate gracefully
# :eyeroll:...
self.try_signal('SIGKILL')
try:
log.info('Waiting on container shutdown: {cid}')
self.cntr.wait(
timeout=0.1,
condition='not-running',
)
break
except (
ReadTimeout,
ConnectionError,
):
log.error(f'failed to wait on container {cid}')
raise
else:
raise RuntimeError('Failed to cancel container {cid}')
log.cancel(f'Container stopped: {cid}')
@tractor.context
async def open_marketstored(
ctx: tractor.Context,
@ -175,7 +305,7 @@ async def open_marketstored(
type='bind',
)
cntr: Container = client.containers.run(
dcntr: DockerContainer = client.containers.run(
'alpacamarkets/marketstore:latest',
# do we need this for cmds?
# '-i',
@ -191,77 +321,59 @@ async def open_marketstored(
init=True,
# remove=True,
)
try:
seen_so_far = set()
cntr = Container(dcntr)
async def process_logs_until(
match: str,
bp_on_msg: bool = False,
):
logs = cntr.logs(stream=True)
for entry in logs:
entry = entry.decode()
with trio.move_on_after(1):
found = await cntr.process_logs_until(
"launching tcp listener for all services...",
)
try:
record = json.loads(entry.strip())
except json.JSONDecodeError:
if 'Error' in entry:
raise RuntimeError(entry)
msg = record['msg']
level = record['level']
if msg and entry not in seen_so_far:
seen_so_far.add(entry)
if bp_on_msg:
await tractor.breakpoint()
getattr(log, level, log.error)(f'{msg}')
# if "launching tcp listener for all services..." in msg:
if match in msg:
return True
# do a checkpoint so we don't block if cancelled B)
await trio.sleep(0)
return False
with trio.move_on_after(0.5):
found = await process_logs_until(
"launching tcp listener for all services...",
if not found and cntr not in client.containers.list():
raise RuntimeError(
'Failed to start `marketstore` check logs deats'
)
if not found and cntr not in client.containers.list():
raise RuntimeError(
'Failed to start `marketstore` check logs deats'
)
await ctx.started((cntr.cntr.id, os.getpid()))
await ctx.started(cntr.id)
# async with ctx.open_stream() as stream:
# block for the expected "teardown log msg"..
await process_logs_until('exiting...',)
try:
# TODO: we might eventually want a proxy-style msg-prot here
# to allow remote control of containers without needing
# callers to have root perms?
await trio.sleep_forever()
# await cntr.cancel()
# with trio.CancelScope(shield=True):
# # block for the expected "teardown log msg"..
# # await cntr.process_logs_until('exiting...',)
# # only msg should be to signal killing the
# # container and this super daemon.
# msg = await stream.receive()
# # print("GOT CANCEL MSG")
# cid = msg['cancel']
# log.cancel(f'Cancelling container {cid}')
# # print("CANCELLING CONTAINER")
# await cntr.cancel()
# # print("SENDING ACK")
# await stream.send('ack')
except (
BaseException,
# trio.Cancelled,
# KeyboardInterrupt,
):
cntr.kill('SIGINT')
with trio.move_on_after(0.5) as cs:
cs.shield = True
await process_logs_until('exiting...',)
raise
finally:
try:
cntr.wait(
timeout=0.5,
condition='not-running',
)
except (
ReadTimeout,
ConnectionError,
):
cntr.kill()
with trio.CancelScope(shield=True):
await cntr.cancel()
# await stream.send('ack')
raise
async def start_ahab(
@ -311,9 +423,18 @@ async def start_ahab(
open_marketstored,
) as (ctx, first):
assert str(first)
# run till cancelled
cid, pid = first
await trio.sleep_forever()
# async with ctx.open_stream() as stream:
# try:
# # run till cancelled
# await trio.sleep_forever()
# finally:
# with trio.CancelScope(shield=True):
# # print('SENDING CANCEL TO MARKETSTORED')
# await stream.send({'cancel': (cid, pid)})
# assert await stream.receive() == 'ack'
# since we demoted root perms in this parent
# we'll get a perms error on proc cleanup in