More reliable `marketstored` + container supervision
It turns out (i guess not so shockingly?) that `marketstore` doesn't always teardown "gracefully" under SIGINT (seems to hang if there are open client connections which are also in the midst of teardown?) so this instead first tries the SIGINT and then fails over to a SIGKILL (destroy loop) which seems to be much more reliable to ensure shutdown without any downside - in terms of a "hard kill". Originally i was thinking the issue was root perms related (which get relegated solely to the `marketstored` daemon actor after spawn) but actually it was indeed the signalling / application layer causing the hold-up/latency on teardown. There's a bunch of lingering (now commented) code which tried to solve this non-problem as well as a bunch logging/prints to help decipher the root of the issue - this will all get cleaned out shortly.incr_update_backup
parent
1abe7d87a5
commit
d9e2666e80
|
@ -30,7 +30,7 @@ from trio_typing import TaskStatus
|
||||||
import tractor
|
import tractor
|
||||||
import docker
|
import docker
|
||||||
import json
|
import json
|
||||||
from docker.models.containers import Container
|
from docker.models.containers import Container as DockerContainer
|
||||||
from docker.errors import DockerException, APIError
|
from docker.errors import DockerException, APIError
|
||||||
from requests.exceptions import ConnectionError, ReadTimeout
|
from requests.exceptions import ConnectionError, ReadTimeout
|
||||||
|
|
||||||
|
@ -133,6 +133,136 @@ async def open_docker(
|
||||||
c.kill()
|
c.kill()
|
||||||
|
|
||||||
|
|
||||||
|
class Container:
|
||||||
|
'''
|
||||||
|
Wrapper around a ``docker.models.containers.Container`` to include
|
||||||
|
log capture and relay through our native logging system and helper
|
||||||
|
method(s) for cancellation/teardown.
|
||||||
|
|
||||||
|
'''
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
cntr: DockerContainer,
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
self.cntr = cntr
|
||||||
|
# log msg de-duplication
|
||||||
|
self.seen_so_far = set()
|
||||||
|
|
||||||
|
async def process_logs_until(
|
||||||
|
self,
|
||||||
|
patt: str,
|
||||||
|
bp_on_msg: bool = False,
|
||||||
|
) -> bool:
|
||||||
|
'''
|
||||||
|
Attempt to capture container log messages and relay through our
|
||||||
|
native logging system.
|
||||||
|
|
||||||
|
'''
|
||||||
|
seen_so_far = self.seen_so_far
|
||||||
|
|
||||||
|
while True:
|
||||||
|
logs = self.cntr.logs()
|
||||||
|
entries = logs.decode().split('\n')
|
||||||
|
for entry in entries:
|
||||||
|
|
||||||
|
# ignore null lines
|
||||||
|
if not entry:
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
record = json.loads(entry.strip())
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
if 'Error' in entry:
|
||||||
|
raise RuntimeError(entry)
|
||||||
|
raise
|
||||||
|
|
||||||
|
msg = record['msg']
|
||||||
|
level = record['level']
|
||||||
|
if msg and entry not in seen_so_far:
|
||||||
|
seen_so_far.add(entry)
|
||||||
|
if bp_on_msg:
|
||||||
|
await tractor.breakpoint()
|
||||||
|
|
||||||
|
getattr(log, level, log.error)(f'{msg}')
|
||||||
|
|
||||||
|
if patt in msg:
|
||||||
|
return True
|
||||||
|
|
||||||
|
# do a checkpoint so we don't block if cancelled B)
|
||||||
|
await trio.sleep(0.01)
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
def try_signal(
|
||||||
|
self,
|
||||||
|
signal: str = 'SIGINT',
|
||||||
|
|
||||||
|
) -> bool:
|
||||||
|
try:
|
||||||
|
# XXX: market store doesn't seem to shutdown nicely all the
|
||||||
|
# time with this (maybe because there are still open grpc
|
||||||
|
# connections?) noticably after client connections have been
|
||||||
|
# made or are in use/teardown. It works just fine if you
|
||||||
|
# just start and stop the container tho?..
|
||||||
|
log.cancel(f'SENDING {signal} to {self.cntr.id}')
|
||||||
|
self.cntr.kill(signal)
|
||||||
|
return True
|
||||||
|
|
||||||
|
except docker.errors.APIError as err:
|
||||||
|
# _err = err
|
||||||
|
if 'is not running' in err.explanation:
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def cancel(
|
||||||
|
self,
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
cid = self.cntr.id
|
||||||
|
self.try_signal('SIGINT')
|
||||||
|
|
||||||
|
with trio.move_on_after(0.5) as cs:
|
||||||
|
cs.shield = True
|
||||||
|
# print('PROCESSINGN LOGS')
|
||||||
|
await self.process_logs_until('initiating graceful shutdown')
|
||||||
|
# print('SHUTDOWN REPORTED BY CONTAINER')
|
||||||
|
await self.process_logs_until('exiting...',)
|
||||||
|
|
||||||
|
for _ in range(10):
|
||||||
|
with trio.move_on_after(0.5) as cs:
|
||||||
|
cs.shield = True
|
||||||
|
# print('waiting on EXITING')
|
||||||
|
await self.process_logs_until('exiting...',)
|
||||||
|
# print('got EXITING')
|
||||||
|
break
|
||||||
|
|
||||||
|
if cs.cancelled_caught:
|
||||||
|
# get out the big guns, bc apparently marketstore
|
||||||
|
# doesn't actually know how to terminate gracefully
|
||||||
|
# :eyeroll:...
|
||||||
|
self.try_signal('SIGKILL')
|
||||||
|
|
||||||
|
try:
|
||||||
|
log.info('Waiting on container shutdown: {cid}')
|
||||||
|
self.cntr.wait(
|
||||||
|
timeout=0.1,
|
||||||
|
condition='not-running',
|
||||||
|
)
|
||||||
|
break
|
||||||
|
|
||||||
|
except (
|
||||||
|
ReadTimeout,
|
||||||
|
ConnectionError,
|
||||||
|
):
|
||||||
|
log.error(f'failed to wait on container {cid}')
|
||||||
|
raise
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise RuntimeError('Failed to cancel container {cid}')
|
||||||
|
|
||||||
|
log.cancel(f'Container stopped: {cid}')
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def open_marketstored(
|
async def open_marketstored(
|
||||||
ctx: tractor.Context,
|
ctx: tractor.Context,
|
||||||
|
@ -175,7 +305,7 @@ async def open_marketstored(
|
||||||
type='bind',
|
type='bind',
|
||||||
)
|
)
|
||||||
|
|
||||||
cntr: Container = client.containers.run(
|
dcntr: DockerContainer = client.containers.run(
|
||||||
'alpacamarkets/marketstore:latest',
|
'alpacamarkets/marketstore:latest',
|
||||||
# do we need this for cmds?
|
# do we need this for cmds?
|
||||||
# '-i',
|
# '-i',
|
||||||
|
@ -191,77 +321,59 @@ async def open_marketstored(
|
||||||
init=True,
|
init=True,
|
||||||
# remove=True,
|
# remove=True,
|
||||||
)
|
)
|
||||||
try:
|
cntr = Container(dcntr)
|
||||||
seen_so_far = set()
|
|
||||||
|
|
||||||
async def process_logs_until(
|
with trio.move_on_after(1):
|
||||||
match: str,
|
found = await cntr.process_logs_until(
|
||||||
bp_on_msg: bool = False,
|
"launching tcp listener for all services...",
|
||||||
):
|
)
|
||||||
logs = cntr.logs(stream=True)
|
|
||||||
for entry in logs:
|
|
||||||
entry = entry.decode()
|
|
||||||
|
|
||||||
try:
|
if not found and cntr not in client.containers.list():
|
||||||
record = json.loads(entry.strip())
|
raise RuntimeError(
|
||||||
except json.JSONDecodeError:
|
'Failed to start `marketstore` check logs deats'
|
||||||
if 'Error' in entry:
|
|
||||||
raise RuntimeError(entry)
|
|
||||||
|
|
||||||
msg = record['msg']
|
|
||||||
level = record['level']
|
|
||||||
if msg and entry not in seen_so_far:
|
|
||||||
seen_so_far.add(entry)
|
|
||||||
if bp_on_msg:
|
|
||||||
await tractor.breakpoint()
|
|
||||||
getattr(log, level, log.error)(f'{msg}')
|
|
||||||
|
|
||||||
# if "launching tcp listener for all services..." in msg:
|
|
||||||
if match in msg:
|
|
||||||
return True
|
|
||||||
|
|
||||||
# do a checkpoint so we don't block if cancelled B)
|
|
||||||
await trio.sleep(0)
|
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
||||||
with trio.move_on_after(0.5):
|
|
||||||
found = await process_logs_until(
|
|
||||||
"launching tcp listener for all services...",
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if not found and cntr not in client.containers.list():
|
await ctx.started((cntr.cntr.id, os.getpid()))
|
||||||
raise RuntimeError(
|
|
||||||
'Failed to start `marketstore` check logs deats'
|
|
||||||
)
|
|
||||||
|
|
||||||
await ctx.started(cntr.id)
|
# async with ctx.open_stream() as stream:
|
||||||
|
|
||||||
# block for the expected "teardown log msg"..
|
try:
|
||||||
await process_logs_until('exiting...',)
|
|
||||||
|
# TODO: we might eventually want a proxy-style msg-prot here
|
||||||
|
# to allow remote control of containers without needing
|
||||||
|
# callers to have root perms?
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
# await cntr.cancel()
|
||||||
|
# with trio.CancelScope(shield=True):
|
||||||
|
# # block for the expected "teardown log msg"..
|
||||||
|
# # await cntr.process_logs_until('exiting...',)
|
||||||
|
|
||||||
|
# # only msg should be to signal killing the
|
||||||
|
# # container and this super daemon.
|
||||||
|
# msg = await stream.receive()
|
||||||
|
# # print("GOT CANCEL MSG")
|
||||||
|
|
||||||
|
# cid = msg['cancel']
|
||||||
|
# log.cancel(f'Cancelling container {cid}')
|
||||||
|
|
||||||
|
# # print("CANCELLING CONTAINER")
|
||||||
|
# await cntr.cancel()
|
||||||
|
|
||||||
|
# # print("SENDING ACK")
|
||||||
|
# await stream.send('ack')
|
||||||
|
|
||||||
except (
|
except (
|
||||||
BaseException,
|
BaseException,
|
||||||
# trio.Cancelled,
|
# trio.Cancelled,
|
||||||
# KeyboardInterrupt,
|
# KeyboardInterrupt,
|
||||||
):
|
):
|
||||||
cntr.kill('SIGINT')
|
|
||||||
with trio.move_on_after(0.5) as cs:
|
|
||||||
cs.shield = True
|
|
||||||
await process_logs_until('exiting...',)
|
|
||||||
raise
|
|
||||||
|
|
||||||
finally:
|
with trio.CancelScope(shield=True):
|
||||||
try:
|
await cntr.cancel()
|
||||||
cntr.wait(
|
# await stream.send('ack')
|
||||||
timeout=0.5,
|
|
||||||
condition='not-running',
|
raise
|
||||||
)
|
|
||||||
except (
|
|
||||||
ReadTimeout,
|
|
||||||
ConnectionError,
|
|
||||||
):
|
|
||||||
cntr.kill()
|
|
||||||
|
|
||||||
|
|
||||||
async def start_ahab(
|
async def start_ahab(
|
||||||
|
@ -311,9 +423,18 @@ async def start_ahab(
|
||||||
open_marketstored,
|
open_marketstored,
|
||||||
) as (ctx, first):
|
) as (ctx, first):
|
||||||
|
|
||||||
assert str(first)
|
cid, pid = first
|
||||||
# run till cancelled
|
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
# async with ctx.open_stream() as stream:
|
||||||
|
# try:
|
||||||
|
# # run till cancelled
|
||||||
|
# await trio.sleep_forever()
|
||||||
|
# finally:
|
||||||
|
# with trio.CancelScope(shield=True):
|
||||||
|
# # print('SENDING CANCEL TO MARKETSTORED')
|
||||||
|
# await stream.send({'cancel': (cid, pid)})
|
||||||
|
# assert await stream.receive() == 'ack'
|
||||||
|
|
||||||
# since we demoted root perms in this parent
|
# since we demoted root perms in this parent
|
||||||
# we'll get a perms error on proc cleanup in
|
# we'll get a perms error on proc cleanup in
|
||||||
|
|
Loading…
Reference in New Issue