_ahab: use `Services` api to spawn docker tasks

Allows for using the `Services.cancel_service()` api for explicit
cancellation in tests and eventually for remote teardown. Change
`.start_ahab()` to an `@acm` `start_ahab_service()` and just yield back
the same values we were returning prior. Also fix the logging (level) to
actually reflect what's passed in - we weren't using the correct name
/ instance from the `.sevice` subpkg..
master
Tyler Goodlet 2023-05-23 11:31:58 -04:00
parent 611d1ee3fc
commit bd919f9d66
2 changed files with 47 additions and 21 deletions

View File

@ -19,6 +19,7 @@ Supervisor for ``docker`` with included async and SC wrapping
to ensure a cancellable container lifetime system. to ensure a cancellable container lifetime system.
''' '''
from __future__ import annotations
from collections import ChainMap from collections import ChainMap
from functools import partial from functools import partial
import os import os
@ -48,6 +49,7 @@ from requests.exceptions import (
ReadTimeout, ReadTimeout,
) )
from ._mngr import Services
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
get_console_log, get_console_log,
@ -187,7 +189,11 @@ class Container:
and entry not in seen_so_far and entry not in seen_so_far
): ):
seen_so_far.add(entry) seen_so_far.add(entry)
getattr(log, level.lower(), log.error)(f'{msg}') getattr(
log,
level.lower(),
log.error
)(f'{msg}')
if level == 'fatal': if level == 'fatal':
raise ApplicationLogError(msg) raise ApplicationLogError(msg)
@ -263,8 +269,10 @@ class Container:
start = time.time() start = time.time()
for _ in range(6): for _ in range(6):
with trio.move_on_after(0.5) as cs: with trio.move_on_after(1) as cs:
log.cancel('polling for CNTR logs...') log.cancel(
'polling for CNTR logs for {stop_predicate}..'
)
try: try:
await self.process_logs_until( await self.process_logs_until(
@ -328,16 +336,13 @@ class Container:
async def open_ahabd( async def open_ahabd(
ctx: tractor.Context, ctx: tractor.Context,
endpoint: str, # ns-pointer str-msg-type endpoint: str, # ns-pointer str-msg-type
loglevel: str | None = 'cancel', loglevel: str | None = None,
**kwargs, **ep_kwargs,
) -> None: ) -> None:
log = get_console_log( log = get_console_log(loglevel or 'cancel')
loglevel,
name=__name__,
)
async with open_docker() as client: async with open_docker() as client:
@ -350,7 +355,7 @@ async def open_ahabd(
cntr_config, cntr_config,
start_pred, start_pred,
stop_pred, stop_pred,
) = ep_func(client) ) = ep_func(client, **ep_kwargs)
cntr = Container(dcntr) cntr = Container(dcntr)
conf: ChainMap[str, Any] = ChainMap( conf: ChainMap[str, Any] = ChainMap(
@ -446,10 +451,17 @@ async def open_ahabd(
) )
async def start_ahab( @acm
async def start_ahab_service(
services: Services,
service_name: str, service_name: str,
# endpoint config passed as **kwargs
endpoint: Callable[docker.DockerClient, DockerContainer], endpoint: Callable[docker.DockerClient, DockerContainer],
ep_kwargs: dict,
loglevel: str | None = 'cancel', loglevel: str | None = 'cancel',
# supervisor config
drop_root_perms: bool = True, drop_root_perms: bool = True,
task_status: TaskStatus[ task_status: TaskStatus[
@ -470,6 +482,9 @@ async def start_ahab(
is started. is started.
''' '''
# global log
log = get_console_log(loglevel or 'cancel')
cn_ready = trio.Event() cn_ready = trio.Event()
try: try:
async with tractor.open_nursery() as an: async with tractor.open_nursery() as an:
@ -498,21 +513,28 @@ async def start_ahab(
)[2] # named user's uid )[2] # named user's uid
) )
async with portal.open_context( cs, first = await services.start_service_task(
open_ahabd, name=service_name,
portal=portal,
# rest: endpoint inputs
target=open_ahabd,
endpoint=str(NamespacePath.from_ref(endpoint)), endpoint=str(NamespacePath.from_ref(endpoint)),
loglevel='cancel', loglevel='cancel',
) as (ctx, first): **ep_kwargs,
)
cid, pid, cntr_config = first cid, pid, cntr_config = first
task_status.started(( try:
yield (
cn_ready, cn_ready,
cntr_config, cntr_config,
(cid, pid), (cid, pid),
)) )
finally:
await trio.sleep_forever() log.info(f'Cancelling ahab service `{service_name}`')
await services.cancel_service(service_name)
# since we demoted root perms in this parent # since we demoted root perms in this parent
# we'll get a perms error on proc cleanup in # we'll get a perms error on proc cleanup in

View File

@ -33,8 +33,11 @@ from ._util import (
) )
# TODO: factor this into a ``tractor.highlevel`` extension # TODO: we need remote wrapping and a general soln:
# pack for the library. # - factor this into a ``tractor.highlevel`` extension # pack for the
# library.
# - wrap a "remote api" wherein you can get a method proxy
# to the pikerd actor for starting services remotely!
class Services: class Services:
actor_n: tractor._supervise.ActorNursery actor_n: tractor._supervise.ActorNursery
@ -80,6 +83,7 @@ class Services:
) -> Any: ) -> Any:
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
async with portal.open_context( async with portal.open_context(
target, target,
allow_overruns=allow_overruns, allow_overruns=allow_overruns,