`ahabd`: Harden cancellation teardown (again XD)

Needed to move the startup sequence inside the `try:` block to guarantee
we always do the (now shielded) `.cancel()` call if we get a cancel
during startup.

Also, support an optional `started_afunc` field in the config if
backends want to just provide a one-off blocking async func to sync
container startup. Add a `drop_root_perms: bool` to allow persisting
sudo perms for testing or dyanmic container spawning purposes.
service_subpkg
Tyler Goodlet 2023-03-09 17:58:13 -05:00
parent 9a00c45923
commit 15064d94cb
1 changed files with 61 additions and 46 deletions

View File

@ -15,7 +15,8 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Supervisor for docker with included specific-image service helpers.
Supervisor for ``docker`` with included async and SC wrapping
to ensure a cancellable container lifetime system.
'''
from collections import ChainMap
@ -349,8 +350,8 @@ async def open_ahabd(
(
dcntr,
cntr_config,
start_lambda,
stop_lambda,
start_pred,
stop_pred,
) = ep_func(client)
cntr = Container(dcntr)
@ -375,48 +376,58 @@ async def open_ahabd(
# when read using:
# ``json.loads(entry for entry in DockerContainer.logs())``
'log_msg_key': 'msg',
# startup sync func, like `Nursery.started()`
'started_afunc': None,
},
)
with trio.move_on_after(conf['startup_timeout']) as cs:
async with trio.open_nursery() as tn:
tn.start_soon(
partial(
cntr.process_logs_until,
log_msg_key=conf['log_msg_key'],
patt_matcher=start_lambda,
checkpoint_period=conf['startup_query_period'],
)
)
# poll for container startup or timeout
while not cs.cancel_called:
if dcntr in client.containers.list():
break
await trio.sleep(conf['startup_query_period'])
# sync with remote caller actor-task but allow log
# processing to continue running in bg.
await ctx.started((
cntr.cntr.id,
os.getpid(),
cntr_config,
))
try:
# XXX: if we timeout on finding the "startup msg" we expect then
# we want to FOR SURE raise an error upwards!
if cs.cancelled_caught:
# if dcntr not in client.containers.list():
for entry in cntr.seen_so_far:
log.info(entry)
with trio.move_on_after(conf['startup_timeout']) as cs:
async with trio.open_nursery() as tn:
tn.start_soon(
partial(
cntr.process_logs_until,
log_msg_key=conf['log_msg_key'],
patt_matcher=start_pred,
checkpoint_period=conf['startup_query_period'],
)
)
raise DockerNotStarted(
f'Failed to start container: {cntr.cuid}\n'
f'due to startup_timeout={conf["startup_timeout"]}s\n\n'
"prolly you should check your container's logs for deats.."
)
# optional blocking routine
started = conf['started_afunc']
if started:
await started()
# poll for container startup or timeout
while not cs.cancel_called:
if dcntr in client.containers.list():
break
await trio.sleep(conf['startup_query_period'])
# sync with remote caller actor-task but allow log
# processing to continue running in bg.
await ctx.started((
cntr.cntr.id,
os.getpid(),
cntr_config,
))
# XXX: if we timeout on finding the "startup msg" we
# expect then we want to FOR SURE raise an error
# upwards!
if cs.cancelled_caught:
# if dcntr not in client.containers.list():
for entry in cntr.seen_so_far:
log.info(entry)
raise DockerNotStarted(
f'Failed to start container: {cntr.cuid}\n'
f'due to timeout={conf["startup_timeout"]}s\n\n'
"check ur container's logs!"
)
# TODO: we might eventually want a proxy-style msg-prot here
# to allow remote control of containers without needing
@ -430,17 +441,18 @@ async def open_ahabd(
# on ctl-c from user.. ideally we can avoid a cancel getting
# consumed and not propagating whilst still doing teardown
# logging..
# with trio.CancelScope(shield=True):
await cntr.cancel(
log_msg_key=conf['log_msg_key'],
stop_predicate=stop_lambda,
)
with trio.CancelScope(shield=True):
await cntr.cancel(
log_msg_key=conf['log_msg_key'],
stop_predicate=stop_pred,
)
async def start_ahab(
service_name: str,
endpoint: Callable[docker.DockerClient, DockerContainer],
loglevel: str | None = 'cancel',
drop_root_perms: bool = True,
task_status: TaskStatus[
tuple[
@ -477,7 +489,10 @@ async def start_ahab(
# de-escalate root perms to the original user
# after the docker supervisor actor is spawned.
if config._parent_user:
if (
drop_root_perms
and config._parent_user
):
import pwd
os.setuid(
pwd.getpwnam(