Map the grpc port and add graceful container teardown

Not sure how I missed mapping the 5995 grpc port 🤦; done now.
Also adds graceful teardown using SIGINT with included container
logging relayed to the piker console B).
marketstore
Tyler Goodlet 2022-02-19 16:31:31 -05:00
parent a5c1febd12
commit 3eeb1caf32
1 changed files with 66 additions and 23 deletions

View File

@ -33,9 +33,9 @@ import docker
import json
from docker.models.containers import Container
from docker.errors import DockerException, APIError
from requests.exceptions import ConnectionError
from requests.exceptions import ConnectionError, ReadTimeout
from ..log import get_logger # , get_console_log
from ..log import get_logger, get_console_log
from .. import config
log = get_logger(__name__)
@ -52,7 +52,7 @@ _config = '''
root_directory: data
listen_port: 5993
grpc_listen_port: 5995
log_level: info
log_level: debug
queryable: true
stop_grace_period: 0
wal_rotate_interval: 5
@ -98,9 +98,6 @@ async def open_docker(
**kwargs
) if url else docker.from_env(**kwargs)
yield client
except ConnectionError:
# prolly no daemon started
raise DockerNotStarted('!?!?')
except (
DockerException,
@ -111,15 +108,20 @@ async def open_docker(
args = getattr(err, 'args', None)
if args:
return args
else:
return str(err)
# could be more specific so let's check if it's just perms.
if err.args:
errs = err.args
for err in errs:
msg = unpack_msg(err)
if msg and 'PermissionError' in msg:
if 'PermissionError' in msg:
raise DockerException('You dint run as root yo!')
elif 'FileNotFoundError' in msg:
raise DockerNotStarted('Did you start da service sister?')
# not perms?
raise
@ -181,7 +183,7 @@ async def open_marketstore(
5993:5993 alpacamarkets/marketstore:latest
'''
# log = get_console_log('info', name=__name__)
log = get_console_log('info', name=__name__)
async with open_docker() as client:
# create a mount from user's local piker config dir into container
@ -197,46 +199,87 @@ async def open_marketstore(
# '-i',
# '-p 5993:5993',
ports={'5993/tcp': 5993},
ports={
'5993/tcp': 5993, # jsonrpc
'5995/tcp': 5995, # grpc
},
mounts=[config_dir_mnt],
detach=True,
# stop_signal='SIGINT',
# init=True,
stop_signal='SIGINT',
init=True,
# remove=True,
)
try:
started: bool = False
logs = cntr.logs(stream=True)
seen_so_far = set()
with trio.move_on_after(0.5):
async def process_logs_until(
match: str,
bp_on_msg: bool = False,
):
logs = cntr.logs(stream=True)
for entry in logs:
entry = entry.decode()
try:
record = json.loads(entry.strip())
except json.JSONDecodeError:
if 'Error' in entry:
raise RuntimeError(entry)
msg = record['msg']
if msg and entry not in seen_so_far:
seen_so_far.add(entry)
if bp_on_msg:
await tractor.breakpoint()
log.info(f'{msg}')
if "launching tcp listener for all services..." in msg:
started = True
break
# if "launching tcp listener for all services..." in msg:
if match in msg:
return True
# do a checkpoint so we don't block if cancelled B)
await trio.sleep(0)
if not started and cntr not in client.containers.list():
return False
with trio.move_on_after(0.5):
found = await process_logs_until(
"launching tcp listener for all services...",
)
if not found and cntr not in client.containers.list():
raise RuntimeError(
'Failed to start `marketstore` check logs output for deats'
'Failed to start `marketstore` check logs deats'
)
await ctx.started(cntr.id)
await trio.sleep_forever()
await process_logs_until('exiting...',)
except (
trio.Cancelled,
KeyboardInterrupt,
):
cntr.kill('SIGINT')
with trio.move_on_after(0.5) as cs:
cs.shield = True
await process_logs_until('exiting...',)
raise
finally:
cntr.stop()
try:
cntr.wait(
timeout=0.5,
condition='not-running',
)
except (
ReadTimeout,
ConnectionError,
):
cntr.kill()
async def start_ahab(
service_name: str,
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
) -> None:
@ -247,7 +290,7 @@ async def start_ahab(
) as tn:
portal = await tn.start_actor(
'marketstored',
service_name,
enable_modules=[__name__]
)