diff --git a/piker/service/_ahab.py b/piker/service/_ahab.py index 38d4a9e7..7c3133e1 100644 --- a/piker/service/_ahab.py +++ b/piker/service/_ahab.py @@ -15,7 +15,8 @@ # along with this program. If not, see . ''' -Supervisor for docker with included specific-image service helpers. +Supervisor for ``docker`` with included async and SC wrapping +to ensure a cancellable container lifetime system. ''' from collections import ChainMap @@ -349,8 +350,8 @@ async def open_ahabd( ( dcntr, cntr_config, - start_lambda, - stop_lambda, + start_pred, + stop_pred, ) = ep_func(client) cntr = Container(dcntr) @@ -375,48 +376,58 @@ async def open_ahabd( # when read using: # ``json.loads(entry for entry in DockerContainer.logs())`` 'log_msg_key': 'msg', + + + # startup sync func, like `Nursery.started()` + 'started_afunc': None, }, ) - with trio.move_on_after(conf['startup_timeout']) as cs: - async with trio.open_nursery() as tn: - tn.start_soon( - partial( - cntr.process_logs_until, - log_msg_key=conf['log_msg_key'], - patt_matcher=start_lambda, - checkpoint_period=conf['startup_query_period'], - ) - ) - - # poll for container startup or timeout - while not cs.cancel_called: - if dcntr in client.containers.list(): - break - - await trio.sleep(conf['startup_query_period']) - - # sync with remote caller actor-task but allow log - # processing to continue running in bg. - await ctx.started(( - cntr.cntr.id, - os.getpid(), - cntr_config, - )) - try: - # XXX: if we timeout on finding the "startup msg" we expect then - # we want to FOR SURE raise an error upwards! - if cs.cancelled_caught: - # if dcntr not in client.containers.list(): - for entry in cntr.seen_so_far: - log.info(entry) + with trio.move_on_after(conf['startup_timeout']) as cs: + async with trio.open_nursery() as tn: + tn.start_soon( + partial( + cntr.process_logs_until, + log_msg_key=conf['log_msg_key'], + patt_matcher=start_pred, + checkpoint_period=conf['startup_query_period'], + ) + ) - raise DockerNotStarted( - f'Failed to start container: {cntr.cuid}\n' - f'due to startup_timeout={conf["startup_timeout"]}s\n\n' - "prolly you should check your container's logs for deats.." - ) + # optional blocking routine + started = conf['started_afunc'] + if started: + await started() + + # poll for container startup or timeout + while not cs.cancel_called: + if dcntr in client.containers.list(): + break + + await trio.sleep(conf['startup_query_period']) + + # sync with remote caller actor-task but allow log + # processing to continue running in bg. + await ctx.started(( + cntr.cntr.id, + os.getpid(), + cntr_config, + )) + + # XXX: if we timeout on finding the "startup msg" we + # expect then we want to FOR SURE raise an error + # upwards! + if cs.cancelled_caught: + # if dcntr not in client.containers.list(): + for entry in cntr.seen_so_far: + log.info(entry) + + raise DockerNotStarted( + f'Failed to start container: {cntr.cuid}\n' + f'due to timeout={conf["startup_timeout"]}s\n\n' + "check ur container's logs!" + ) # TODO: we might eventually want a proxy-style msg-prot here # to allow remote control of containers without needing @@ -430,17 +441,18 @@ async def open_ahabd( # on ctl-c from user.. ideally we can avoid a cancel getting # consumed and not propagating whilst still doing teardown # logging.. - # with trio.CancelScope(shield=True): - await cntr.cancel( - log_msg_key=conf['log_msg_key'], - stop_predicate=stop_lambda, - ) + with trio.CancelScope(shield=True): + await cntr.cancel( + log_msg_key=conf['log_msg_key'], + stop_predicate=stop_pred, + ) async def start_ahab( service_name: str, endpoint: Callable[docker.DockerClient, DockerContainer], loglevel: str | None = 'cancel', + drop_root_perms: bool = True, task_status: TaskStatus[ tuple[ @@ -477,7 +489,10 @@ async def start_ahab( # de-escalate root perms to the original user # after the docker supervisor actor is spawned. - if config._parent_user: + if ( + drop_root_perms + and config._parent_user + ): import pwd os.setuid( pwd.getpwnam(