Map the grpc port and add graceful container teardown

Not sure how I missed mapping the 5995 grpc port 🤦; done now.
Also adds graceful teardown using SIGINT with included container
logging relayed to the piker console B).
marketstore_backup
Tyler Goodlet 2022-02-19 16:31:31 -05:00
parent abf7b14717
commit 4c5ea60035
1 changed files with 66 additions and 23 deletions

View File

@ -33,9 +33,9 @@ import docker
import json import json
from docker.models.containers import Container from docker.models.containers import Container
from docker.errors import DockerException, APIError from docker.errors import DockerException, APIError
from requests.exceptions import ConnectionError from requests.exceptions import ConnectionError, ReadTimeout
from ..log import get_logger # , get_console_log from ..log import get_logger, get_console_log
from .. import config from .. import config
log = get_logger(__name__) log = get_logger(__name__)
@ -52,7 +52,7 @@ _config = '''
root_directory: data root_directory: data
listen_port: 5993 listen_port: 5993
grpc_listen_port: 5995 grpc_listen_port: 5995
log_level: info log_level: debug
queryable: true queryable: true
stop_grace_period: 0 stop_grace_period: 0
wal_rotate_interval: 5 wal_rotate_interval: 5
@ -98,9 +98,6 @@ async def open_docker(
**kwargs **kwargs
) if url else docker.from_env(**kwargs) ) if url else docker.from_env(**kwargs)
yield client yield client
except ConnectionError:
# prolly no daemon started
raise DockerNotStarted('!?!?')
except ( except (
DockerException, DockerException,
@ -111,15 +108,20 @@ async def open_docker(
args = getattr(err, 'args', None) args = getattr(err, 'args', None)
if args: if args:
return args return args
else:
return str(err)
# could be more specific so let's check if it's just perms. # could be more specific so let's check if it's just perms.
if err.args: if err.args:
errs = err.args errs = err.args
for err in errs: for err in errs:
msg = unpack_msg(err) msg = unpack_msg(err)
if msg and 'PermissionError' in msg: if 'PermissionError' in msg:
raise DockerException('You dint run as root yo!') raise DockerException('You dint run as root yo!')
elif 'FileNotFoundError' in msg:
raise DockerNotStarted('Did you start da service sister?')
# not perms? # not perms?
raise raise
@ -181,7 +183,7 @@ async def open_marketstore(
5993:5993 alpacamarkets/marketstore:latest 5993:5993 alpacamarkets/marketstore:latest
''' '''
# log = get_console_log('info', name=__name__) log = get_console_log('info', name=__name__)
async with open_docker() as client: async with open_docker() as client:
# create a mount from user's local piker config dir into container # create a mount from user's local piker config dir into container
@ -197,46 +199,87 @@ async def open_marketstore(
# '-i', # '-i',
# '-p 5993:5993', # '-p 5993:5993',
ports={'5993/tcp': 5993}, ports={
'5993/tcp': 5993, # jsonrpc
'5995/tcp': 5995, # grpc
},
mounts=[config_dir_mnt], mounts=[config_dir_mnt],
detach=True, detach=True,
# stop_signal='SIGINT', stop_signal='SIGINT',
# init=True, init=True,
# remove=True, # remove=True,
) )
try: try:
started: bool = False seen_so_far = set()
logs = cntr.logs(stream=True)
with trio.move_on_after(0.5): async def process_logs_until(
match: str,
bp_on_msg: bool = False,
):
logs = cntr.logs(stream=True)
for entry in logs: for entry in logs:
entry = entry.decode() entry = entry.decode()
try: try:
record = json.loads(entry.strip()) record = json.loads(entry.strip())
except json.JSONDecodeError: except json.JSONDecodeError:
if 'Error' in entry: if 'Error' in entry:
raise RuntimeError(entry) raise RuntimeError(entry)
msg = record['msg'] msg = record['msg']
if msg and entry not in seen_so_far:
seen_so_far.add(entry)
if bp_on_msg:
await tractor.breakpoint()
log.info(f'{msg}')
if "launching tcp listener for all services..." in msg: # if "launching tcp listener for all services..." in msg:
started = True if match in msg:
break return True
# do a checkpoint so we don't block if cancelled B)
await trio.sleep(0) await trio.sleep(0)
if not started and cntr not in client.containers.list(): return False
with trio.move_on_after(0.5):
found = await process_logs_until(
"launching tcp listener for all services...",
)
if not found and cntr not in client.containers.list():
raise RuntimeError( raise RuntimeError(
'Failed to start `marketstore` check logs output for deats' 'Failed to start `marketstore` check logs deats'
) )
await ctx.started(cntr.id) await ctx.started(cntr.id)
await trio.sleep_forever() await process_logs_until('exiting...',)
except (
trio.Cancelled,
KeyboardInterrupt,
):
cntr.kill('SIGINT')
with trio.move_on_after(0.5) as cs:
cs.shield = True
await process_logs_until('exiting...',)
raise
finally: finally:
cntr.stop() try:
cntr.wait(
timeout=0.5,
condition='not-running',
)
except (
ReadTimeout,
ConnectionError,
):
cntr.kill()
async def start_ahab( async def start_ahab(
service_name: str,
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
@ -247,7 +290,7 @@ async def start_ahab(
) as tn: ) as tn:
portal = await tn.start_actor( portal = await tn.start_actor(
'marketstored', service_name,
enable_modules=[__name__] enable_modules=[__name__]
) )