Map the grpc port and add graceful container teardown
Not sure how I missed mapping the 5995 grpc port 🤦; done now.
Also adds graceful teardown using SIGINT with included container
logging relayed to the piker console B).
l1_precision_fix
parent
2c51ad2a0d
commit
d9773217e9
|
@ -33,9 +33,9 @@ import docker
|
||||||
import json
|
import json
|
||||||
from docker.models.containers import Container
|
from docker.models.containers import Container
|
||||||
from docker.errors import DockerException, APIError
|
from docker.errors import DockerException, APIError
|
||||||
from requests.exceptions import ConnectionError
|
from requests.exceptions import ConnectionError, ReadTimeout
|
||||||
|
|
||||||
from ..log import get_logger # , get_console_log
|
from ..log import get_logger, get_console_log
|
||||||
from .. import config
|
from .. import config
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
@ -52,7 +52,7 @@ _config = '''
|
||||||
root_directory: data
|
root_directory: data
|
||||||
listen_port: 5993
|
listen_port: 5993
|
||||||
grpc_listen_port: 5995
|
grpc_listen_port: 5995
|
||||||
log_level: info
|
log_level: debug
|
||||||
queryable: true
|
queryable: true
|
||||||
stop_grace_period: 0
|
stop_grace_period: 0
|
||||||
wal_rotate_interval: 5
|
wal_rotate_interval: 5
|
||||||
|
@ -98,9 +98,6 @@ async def open_docker(
|
||||||
**kwargs
|
**kwargs
|
||||||
) if url else docker.from_env(**kwargs)
|
) if url else docker.from_env(**kwargs)
|
||||||
yield client
|
yield client
|
||||||
except ConnectionError:
|
|
||||||
# prolly no daemon started
|
|
||||||
raise DockerNotStarted('!?!?')
|
|
||||||
|
|
||||||
except (
|
except (
|
||||||
DockerException,
|
DockerException,
|
||||||
|
@ -111,15 +108,20 @@ async def open_docker(
|
||||||
args = getattr(err, 'args', None)
|
args = getattr(err, 'args', None)
|
||||||
if args:
|
if args:
|
||||||
return args
|
return args
|
||||||
|
else:
|
||||||
|
return str(err)
|
||||||
|
|
||||||
# could be more specific so let's check if it's just perms.
|
# could be more specific so let's check if it's just perms.
|
||||||
if err.args:
|
if err.args:
|
||||||
errs = err.args
|
errs = err.args
|
||||||
for err in errs:
|
for err in errs:
|
||||||
msg = unpack_msg(err)
|
msg = unpack_msg(err)
|
||||||
if msg and 'PermissionError' in msg:
|
if 'PermissionError' in msg:
|
||||||
raise DockerException('You dint run as root yo!')
|
raise DockerException('You dint run as root yo!')
|
||||||
|
|
||||||
|
elif 'FileNotFoundError' in msg:
|
||||||
|
raise DockerNotStarted('Did you start da service sister?')
|
||||||
|
|
||||||
# not perms?
|
# not perms?
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
@ -181,7 +183,7 @@ async def open_marketstore(
|
||||||
5993:5993 alpacamarkets/marketstore:latest
|
5993:5993 alpacamarkets/marketstore:latest
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# log = get_console_log('info', name=__name__)
|
log = get_console_log('info', name=__name__)
|
||||||
|
|
||||||
async with open_docker() as client:
|
async with open_docker() as client:
|
||||||
# create a mount from user's local piker config dir into container
|
# create a mount from user's local piker config dir into container
|
||||||
|
@ -197,46 +199,87 @@ async def open_marketstore(
|
||||||
# '-i',
|
# '-i',
|
||||||
|
|
||||||
# '-p 5993:5993',
|
# '-p 5993:5993',
|
||||||
ports={'5993/tcp': 5993},
|
ports={
|
||||||
|
'5993/tcp': 5993, # jsonrpc
|
||||||
|
'5995/tcp': 5995, # grpc
|
||||||
|
},
|
||||||
mounts=[config_dir_mnt],
|
mounts=[config_dir_mnt],
|
||||||
detach=True,
|
detach=True,
|
||||||
# stop_signal='SIGINT',
|
stop_signal='SIGINT',
|
||||||
# init=True,
|
init=True,
|
||||||
# remove=True,
|
# remove=True,
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
started: bool = False
|
seen_so_far = set()
|
||||||
logs = cntr.logs(stream=True)
|
|
||||||
|
|
||||||
with trio.move_on_after(0.5):
|
async def process_logs_until(
|
||||||
|
match: str,
|
||||||
|
bp_on_msg: bool = False,
|
||||||
|
):
|
||||||
|
logs = cntr.logs(stream=True)
|
||||||
for entry in logs:
|
for entry in logs:
|
||||||
entry = entry.decode()
|
entry = entry.decode()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
record = json.loads(entry.strip())
|
record = json.loads(entry.strip())
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
if 'Error' in entry:
|
if 'Error' in entry:
|
||||||
raise RuntimeError(entry)
|
raise RuntimeError(entry)
|
||||||
|
|
||||||
msg = record['msg']
|
msg = record['msg']
|
||||||
|
if msg and entry not in seen_so_far:
|
||||||
|
seen_so_far.add(entry)
|
||||||
|
if bp_on_msg:
|
||||||
|
await tractor.breakpoint()
|
||||||
|
log.info(f'{msg}')
|
||||||
|
|
||||||
if "launching tcp listener for all services..." in msg:
|
# if "launching tcp listener for all services..." in msg:
|
||||||
started = True
|
if match in msg:
|
||||||
break
|
return True
|
||||||
|
|
||||||
|
# do a checkpoint so we don't block if cancelled B)
|
||||||
await trio.sleep(0)
|
await trio.sleep(0)
|
||||||
|
|
||||||
if not started and cntr not in client.containers.list():
|
return False
|
||||||
|
|
||||||
|
with trio.move_on_after(0.5):
|
||||||
|
found = await process_logs_until(
|
||||||
|
"launching tcp listener for all services...",
|
||||||
|
)
|
||||||
|
|
||||||
|
if not found and cntr not in client.containers.list():
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
'Failed to start `marketstore` check logs output for deats'
|
'Failed to start `marketstore` check logs deats'
|
||||||
)
|
)
|
||||||
|
|
||||||
await ctx.started(cntr.id)
|
await ctx.started(cntr.id)
|
||||||
await trio.sleep_forever()
|
await process_logs_until('exiting...',)
|
||||||
|
|
||||||
|
except (
|
||||||
|
trio.Cancelled,
|
||||||
|
KeyboardInterrupt,
|
||||||
|
):
|
||||||
|
cntr.kill('SIGINT')
|
||||||
|
with trio.move_on_after(0.5) as cs:
|
||||||
|
cs.shield = True
|
||||||
|
await process_logs_until('exiting...',)
|
||||||
|
raise
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
cntr.stop()
|
try:
|
||||||
|
cntr.wait(
|
||||||
|
timeout=0.5,
|
||||||
|
condition='not-running',
|
||||||
|
)
|
||||||
|
except (
|
||||||
|
ReadTimeout,
|
||||||
|
ConnectionError,
|
||||||
|
):
|
||||||
|
cntr.kill()
|
||||||
|
|
||||||
|
|
||||||
async def start_ahab(
|
async def start_ahab(
|
||||||
|
service_name: str,
|
||||||
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -247,7 +290,7 @@ async def start_ahab(
|
||||||
) as tn:
|
) as tn:
|
||||||
|
|
||||||
portal = await tn.start_actor(
|
portal = await tn.start_actor(
|
||||||
'marketstored',
|
service_name,
|
||||||
enable_modules=[__name__]
|
enable_modules=[__name__]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue