diff --git a/piker/service/_ahab.py b/piker/service/_ahab.py index 0629aeda..49d72de6 100644 --- a/piker/service/_ahab.py +++ b/piker/service/_ahab.py @@ -19,6 +19,7 @@ Supervisor for ``docker`` with included async and SC wrapping to ensure a cancellable container lifetime system. ''' +from __future__ import annotations from collections import ChainMap from functools import partial import os @@ -48,6 +49,7 @@ from requests.exceptions import ( ReadTimeout, ) +from ._mngr import Services from ._util import ( log, # sub-sys logger get_console_log, @@ -187,7 +189,11 @@ class Container: and entry not in seen_so_far ): seen_so_far.add(entry) - getattr(log, level.lower(), log.error)(f'{msg}') + getattr( + log, + level.lower(), + log.error + )(f'{msg}') if level == 'fatal': raise ApplicationLogError(msg) @@ -263,8 +269,10 @@ class Container: start = time.time() for _ in range(6): - with trio.move_on_after(0.5) as cs: - log.cancel('polling for CNTR logs...') + with trio.move_on_after(1) as cs: + log.cancel( + 'polling for CNTR logs for {stop_predicate}..' + ) try: await self.process_logs_until( @@ -328,16 +336,13 @@ class Container: async def open_ahabd( ctx: tractor.Context, endpoint: str, # ns-pointer str-msg-type - loglevel: str | None = 'cancel', + loglevel: str | None = None, - **kwargs, + **ep_kwargs, ) -> None: - log = get_console_log( - loglevel, - name=__name__, - ) + log = get_console_log(loglevel or 'cancel') async with open_docker() as client: @@ -350,7 +355,7 @@ async def open_ahabd( cntr_config, start_pred, stop_pred, - ) = ep_func(client) + ) = ep_func(client, **ep_kwargs) cntr = Container(dcntr) conf: ChainMap[str, Any] = ChainMap( @@ -446,10 +451,17 @@ async def open_ahabd( ) -async def start_ahab( +@acm +async def start_ahab_service( + services: Services, service_name: str, + + # endpoint config passed as **kwargs endpoint: Callable[docker.DockerClient, DockerContainer], + ep_kwargs: dict, loglevel: str | None = 'cancel', + + # supervisor config drop_root_perms: bool = True, task_status: TaskStatus[ @@ -470,6 +482,9 @@ async def start_ahab( is started. ''' + # global log + log = get_console_log(loglevel or 'cancel') + cn_ready = trio.Event() try: async with tractor.open_nursery() as an: @@ -498,21 +513,28 @@ async def start_ahab( )[2] # named user's uid ) - async with portal.open_context( - open_ahabd, + cs, first = await services.start_service_task( + name=service_name, + portal=portal, + + # rest: endpoint inputs + target=open_ahabd, endpoint=str(NamespacePath.from_ref(endpoint)), loglevel='cancel', - ) as (ctx, first): + **ep_kwargs, + ) - cid, pid, cntr_config = first + cid, pid, cntr_config = first - task_status.started(( + try: + yield ( cn_ready, cntr_config, (cid, pid), - )) - - await trio.sleep_forever() + ) + finally: + log.info(f'Cancelling ahab service `{service_name}`') + await services.cancel_service(service_name) # since we demoted root perms in this parent # we'll get a perms error on proc cleanup in diff --git a/piker/service/_mngr.py b/piker/service/_mngr.py index e37bb7ec..70771593 100644 --- a/piker/service/_mngr.py +++ b/piker/service/_mngr.py @@ -33,8 +33,11 @@ from ._util import ( ) -# TODO: factor this into a ``tractor.highlevel`` extension -# pack for the library. +# TODO: we need remote wrapping and a general soln: +# - factor this into a ``tractor.highlevel`` extension # pack for the +# library. +# - wrap a "remote api" wherein you can get a method proxy +# to the pikerd actor for starting services remotely! class Services: actor_n: tractor._supervise.ActorNursery @@ -80,6 +83,7 @@ class Services: ) -> Any: with trio.CancelScope() as cs: + async with portal.open_context( target, allow_overruns=allow_overruns,