diff --git a/piker/_daemon.py b/piker/_daemon.py
index ae3474c7..e172fe6b 100644
--- a/piker/_daemon.py
+++ b/piker/_daemon.py
@@ -19,11 +19,12 @@ Structured, daemon tree service management.
 
 """
 from typing import Optional, Union, Callable, Any
-from contextlib import asynccontextmanager, AsyncExitStack
+from contextlib import asynccontextmanager
 from collections import defaultdict
 
 from pydantic import BaseModel
 import trio
+from trio_typing import TaskStatus
 import tractor
 
 from .log import get_logger, get_console_log
@@ -49,40 +50,75 @@ class Services(BaseModel):
     actor_n: tractor._trionics.ActorNursery
     service_n: trio.Nursery
     debug_mode: bool  # tractor sub-actor debug mode flag
-    ctx_stack: AsyncExitStack
+    service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}
 
     class Config:
         arbitrary_types_allowed = True
 
-    async def open_remote_ctx(
+    async def start_service_task(
         self,
+        name: str,
         portal: tractor.Portal,
         target: Callable,
         **kwargs,
 
-    ) -> tractor.Context:
+    ) -> (trio.CancelScope, tractor.Context):
         '''
         Open a context in a service sub-actor, add to a stack
-        that gets unwound at ``pikerd`` tearodwn.
+        that gets unwound at ``pikerd`` teardown.
 
         This allows for allocating long-running sub-services in our main
         daemon and explicitly controlling their lifetimes.
 
         '''
-        async def open_context_in_task():
+        async def open_context_in_task(
+            task_status: TaskStatus[
+                trio.CancelScope] = trio.TASK_STATUS_IGNORED,
 
-            async with portal.open_context(
-                target,
-                **kwargs,
-            ) as (ctx, first):
+        ) -> Any:
 
-                await ctx.result()
+            with trio.CancelScope() as cs:
 
-            await portal.result()
+                async with portal.open_context(
+                    target,
+                    **kwargs,
 
-        self.service_n.start_soon(open_context_in_task)
+                ) as (ctx, first):
 
-        return 'yo, dis a daemon yo.'
+                    # unblock once the remote context has started
+                    task_status.started((cs, first))
+
+                    # wait on any context's return value
+                    ctx_res = await ctx.result()
+                    log.info(
+                        f'`pikerd` service {name} started with value {ctx_res}'
+                    )
+
+                    # wait on any error from the sub-actor
+                    # NOTE: this will block indefinitely until cancelled
+                    # either by error from the target context function or
+                    # by being cancelled here by the surrounding cancel
+                    # scope
+                    return (await portal.result(), ctx_res)
+
+        cs, first = await self.service_n.start(open_context_in_task)
+
+        # store the cancel scope and portal for later cancellation or
+        # restart if needed.
+        self.service_tasks[name] = (cs, portal)
+
+        return cs, first
+
+    async def cancel_service(
+        self,
+        name: str,
+
+    ) -> Any:
+
+        log.info(f'Cancelling `pikerd` service {name}')
+        cs, portal = self.service_tasks[name]
+        cs.cancel()
+        return await portal.cancel_actor()
 
 
 _services: Optional[Services] = None
@@ -125,22 +161,22 @@ async def open_pikerd(
 
         # spawn other specialized daemons I think?
             enable_modules=_root_modules,
         ) as _,
+        tractor.open_nursery() as actor_nursery,
     ):
         async with trio.open_nursery() as service_nursery:
 
-            # setup service mngr singleton instance
-            async with AsyncExitStack() as stack:
+            # # setup service mngr singleton instance
+            # async with AsyncExitStack() as stack:
 
-                # assign globally for future daemon/task creation
-                _services = Services(
-                    actor_n=actor_nursery,
-                    service_n=service_nursery,
-                    debug_mode=debug_mode,
-                    ctx_stack=stack,
-                )
+            # assign globally for future daemon/task creation
+            _services = Services(
+                actor_n=actor_nursery,
+                service_n=service_nursery,
+                debug_mode=debug_mode,
+            )
 
-                yield _services
+            yield _services
 
 
 @asynccontextmanager
@@ -182,16 +218,20 @@ async def maybe_open_pikerd(
 
     # subtle, we must have the runtime up here or portal lookup will fail
     async with maybe_open_runtime(loglevel, **kwargs):
+
         async with tractor.find_actor(_root_dname) as portal:
             # assert portal is not None
             if portal is not None:
                 yield portal
                 return
 
-        # presume pikerd role
+        # presume pikerd role since no daemon could be found at
+        # configured address
         async with open_pikerd(
+            loglevel=loglevel,
             debug_mode=kwargs.get('debug_mode', False),
+
         ) as _:
             # in the case where we're starting up the
             # tractor-piker runtime stack in **this** process
@@ -217,7 +257,7 @@ class Brokerd:
 async def maybe_spawn_daemon(
 
     service_name: str,
-    spawn_func: Callable,
+    service_task_target: Callable,
     spawn_args: dict[str, Any],
     loglevel: Optional[str] = None,
     **kwargs,
@@ -227,6 +267,13 @@ async def maybe_spawn_daemon(
     If no ``service_name`` daemon-actor can be found,
     spawn one in a local subactor and return a portal to it.
 
+    If this function is called from a non-pikerd actor, the
+    spawned service will persist as long as pikerd does or
+    until it is requested to be cancelled.
+
+    This can be seen as a service-starting API for remote-actor
+    clients.
+
     """
     if loglevel:
         get_console_log(loglevel)
@@ -254,13 +301,24 @@ async def maybe_spawn_daemon(
     ) as pikerd_portal:
 
         if pikerd_portal is None:
-            # we are root so spawn brokerd directly in our tree
-            # the root nursery is accessed through process global state
-            await spawn_func(**spawn_args)
+            # we are the root and thus are `pikerd`
+            # so spawn the target service directly by calling
+            # the provided target routine.
+            # XXX: this assumes that the target is well formed and will
+            # do the right things to set up both a sub-actor **and** call
+            # the ``_Services`` api from above to start the top level
+            # service task for that actor.
+            await service_task_target(**spawn_args)
 
         else:
+            # tell the remote `pikerd` to start the target;
+            # the target can't return a non-serializable value
+            # since it is expected that service starting is
+            # non-blocking and the target task will persist running
+            # on `pikerd` after the client requesting its start
+            # disconnects.
             await pikerd_portal.run(
-                spawn_func,
+                service_task_target,
                 **spawn_args,
             )
 
@@ -275,7 +333,7 @@ async def spawn_brokerd(
     loglevel: Optional[str] = None,
     **tractor_kwargs,
 
-) -> tractor._portal.Portal:
+) -> bool:
 
     log.info(f'Spawning {brokername} broker daemon')
 
@@ -288,6 +346,8 @@ async def spawn_brokerd(
     global _services
     assert _services
 
+    # ask `pikerd` to spawn a new sub-actor and manage it under its
+    # actor nursery
     portal = await _services.actor_n.start_actor(
         dname,
         enable_modules=_data_mods + [brokermod.__name__],
@@ -299,13 +359,13 @@ async def spawn_brokerd(
 
     # non-blocking setup of brokerd service nursery
     from .data import _setup_persistent_brokerd
-    await _services.open_remote_ctx(
+    await _services.start_service_task(
+        dname,
         portal,
         _setup_persistent_brokerd,
         brokername=brokername,
     )
-
-    return dname
+    return True
 
 
 @asynccontextmanager
@@ -322,7 +382,7 @@ async def maybe_spawn_brokerd(
     async with maybe_spawn_daemon(
 
         f'brokerd.{brokername}',
-        spawn_func=spawn_brokerd,
+        service_task_target=spawn_brokerd,
         spawn_args={'brokername': brokername, 'loglevel': loglevel},
         loglevel=loglevel,
         **kwargs,
@@ -336,7 +396,7 @@ async def spawn_emsd(
     loglevel: Optional[str] = None,
     **extra_tractor_kwargs
 
-) -> tractor._portal.Portal:
+) -> bool:
     """
     Start the clearing engine under ``pikerd``.
 
@@ -360,12 +420,12 @@ async def spawn_emsd(
 
     # non-blocking setup of clearing service
     from .clearing._ems import _setup_persistent_emsd
-    await _services.open_remote_ctx(
+    await _services.start_service_task(
+        'emsd',
         portal,
        _setup_persistent_emsd,
     )
-
-    return 'emsd'
+    return True
 
 
 @asynccontextmanager
@@ -380,7 +440,7 @@ async def maybe_open_emsd(
     async with maybe_spawn_daemon(
 
         'emsd',
-        spawn_func=spawn_emsd,
+        service_task_target=spawn_emsd,
        spawn_args={'loglevel': loglevel},
         loglevel=loglevel,
         **kwargs,
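
Reviewer note (not part of the patch): a minimal sketch of how the new
`Services` api added above is intended to be driven, using the `emsd`
names from these hunks. The wrapper coroutine and the `enable_modules`
list here are illustrative assumptions, not code from this diff:

```python
# hypothetical usage sketch: assumes a running `pikerd` tree in which
# the module-global `_services` singleton has already been assigned
# inside ``open_pikerd()``.
from piker import _daemon
from piker.clearing._ems import _setup_persistent_emsd


async def demo_emsd_lifecycle() -> None:
    services = _daemon._services
    assert services is not None

    # spawn the sub-actor under pikerd's actor nursery (the module
    # list is an assumption mirroring ``spawn_emsd()``)
    portal = await services.actor_n.start_actor(
        'emsd',
        enable_modules=['piker.clearing._ems'],
    )

    # start the long-lived service task: this blocks only until the
    # remote context signals startup, then hands back the task's
    # cancel scope and the context's first-delivered value
    cs, first = await services.start_service_task(
        'emsd',
        portal,
        _setup_persistent_emsd,
    )

    # ... later, tear down just this service by name: cancels the
    # supervising task via its stored cancel scope and then cancels
    # the sub-actor through the stored portal
    await services.cancel_service('emsd')
```

The key design change over the replaced `AsyncExitStack` approach is
granularity: each service now registers its own `(CancelScope, Portal)`
pair in `Services.service_tasks`, so an individual service can be
cancelled (or restarted) by name instead of only being unwound en masse
at `pikerd` teardown.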