From cc9a720af79b7688bd70e6c77bb72a886084b81f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 25 Jun 2021 00:44:02 -0400 Subject: [PATCH 1/3] Don't use a context stack for contexts --- piker/_daemon.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index 009adad5..ae3474c7 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -45,6 +45,7 @@ _root_modules = [ class Services(BaseModel): + actor_n: tractor._trionics.ActorNursery service_n: trio.Nursery debug_mode: bool # tractor sub-actor debug mode flag @@ -68,13 +69,20 @@ class Services(BaseModel): daemon and explicitly controlling their lifetimes. ''' - ctx, first = await self.ctx_stack.enter_async_context( - portal.open_context( + async def open_context_in_task(): + + async with portal.open_context( target, **kwargs, - ) - ) - return ctx + ) as (ctx, first): + + await ctx.result() + + await portal.result() + + self.service_n.start_soon(open_context_in_task) + + return 'yo, dis a daemon yo.' _services: Optional[Services] = None From 7b6e34aaf48bfc2b31df380884b08218872a6d4a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 26 Jun 2021 16:01:21 -0400 Subject: [PATCH 2/3] Better formalize `pikerd` service semantics An async exit stack around the new `@tractor.context` is problematic since a pushed context can't bubble errors unless the exit stack has been closed. But in that case why do you need the exit stack if you're going to push it and wait on it right away; it seems more correct to use a nursery and spawn a task in `pikerd` that waits on both the target context completion first (thus being able to bubble up any errors from the remote, and top level service task) and the sub-actor portal. (Sub)service Daemons are spawned with `.start_actor()` and thus will block forever until cancelled so, add a way to cancel them explicitly which we'll need eventually for restarts and dynamic feed management. 
The big lesson here is that async exit stacks are not conducive to spawning and monitoring service tasks, and especially so if a `@tractor.context` is used since if the `.open_context()` call isn't exited (only possible by the stack being closed), then there will be no way for `trio` to cancel the task that pushed that context (since it can't run a checkpoint while yielded inside the stack) without also cancelling all other contexts pushed on that stack. Presuming one `pikerd` task is used to do the original pushing (which it was) then any error would have to kill all service daemon tasks which obviously won't work. I see this mostly as the painz of tinkering out an SC service manager with `tractor` / `trio` for the first time, so try to go easy on the process ;P --- piker/_daemon.py | 140 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 100 insertions(+), 40 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index ae3474c7..e172fe6b 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -19,11 +19,12 @@ Structured, daemon tree service management. 
""" from typing import Optional, Union, Callable, Any -from contextlib import asynccontextmanager, AsyncExitStack +from contextlib import asynccontextmanager from collections import defaultdict from pydantic import BaseModel import trio +from trio_typing import TaskStatus import tractor from .log import get_logger, get_console_log @@ -49,40 +50,75 @@ class Services(BaseModel): actor_n: tractor._trionics.ActorNursery service_n: trio.Nursery debug_mode: bool # tractor sub-actor debug mode flag - ctx_stack: AsyncExitStack + service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {} class Config: arbitrary_types_allowed = True - async def open_remote_ctx( + async def start_service_task( self, + name: str, portal: tractor.Portal, target: Callable, **kwargs, - ) -> tractor.Context: + ) -> (trio.CancelScope, tractor.Context): ''' Open a context in a service sub-actor, add to a stack - that gets unwound at ``pikerd`` tearodwn. + that gets unwound at ``pikerd`` teardown. This allows for allocating long-running sub-services in our main daemon and explicitly controlling their lifetimes. ''' - async def open_context_in_task(): + async def open_context_in_task( + task_status: TaskStatus[ + trio.CancelScope] = trio.TASK_STATUS_IGNORED, - async with portal.open_context( - target, - **kwargs, - ) as (ctx, first): + ) -> Any: - await ctx.result() + with trio.CancelScope() as cs: - await portal.result() + async with portal.open_context( + target, + **kwargs, - self.service_n.start_soon(open_context_in_task) + ) as (ctx, first): - return 'yo, dis a daemon yo.' 
+ # unblock once the remote context has started + task_status.started((cs, first)) + + # wait on any context's return value + ctx_res = await ctx.result() + log.info( + f'`pikerd` service {name} started with value {ctx_res}' + ) + + # wait on any error from the sub-actor + # NOTE: this will block indefinitely until cancelled + # either by error from the target context function or + # by being cancelled here by the surrounding cancel + # scope + return await (portal.result(), ctx_res) + + cs, first = await self.service_n.start(open_context_in_task) + + # store the cancel scope and portal for later cancellation or + # restart if needed. + self.service_tasks[name] = (cs, portal) + + return cs, first + + async def cancel_service( + self, + name: str, + + ) -> Any: + + log.info(f'Cancelling `pikerd` service {name}') + cs, portal = self.service_tasks[name] + cs.cancel() + return await portal.cancel_actor() _services: Optional[Services] = None @@ -125,22 +161,22 @@ async def open_pikerd( # spawn other specialized daemons I think? 
enable_modules=_root_modules, ) as _, + tractor.open_nursery() as actor_nursery, ): async with trio.open_nursery() as service_nursery: - # setup service mngr singleton instance - async with AsyncExitStack() as stack: + # # setup service mngr singleton instance + # async with AsyncExitStack() as stack: - # assign globally for future daemon/task creation - _services = Services( - actor_n=actor_nursery, - service_n=service_nursery, - debug_mode=debug_mode, - ctx_stack=stack, - ) + # assign globally for future daemon/task creation + _services = Services( + actor_n=actor_nursery, + service_n=service_nursery, + debug_mode=debug_mode, + ) - yield _services + yield _services @asynccontextmanager @@ -182,16 +218,20 @@ async def maybe_open_pikerd( # subtle, we must have the runtime up here or portal lookup will fail async with maybe_open_runtime(loglevel, **kwargs): + async with tractor.find_actor(_root_dname) as portal: # assert portal is not None if portal is not None: yield portal return - # presume pikerd role + # presume pikerd role since no daemon could be found at + # configured address async with open_pikerd( + loglevel=loglevel, debug_mode=kwargs.get('debug_mode', False), + ) as _: # in the case where we're starting up the # tractor-piker runtime stack in **this** process @@ -217,7 +257,7 @@ class Brokerd: async def maybe_spawn_daemon( service_name: str, - spawn_func: Callable, + service_task_target: Callable, spawn_args: dict[str, Any], loglevel: Optional[str] = None, **kwargs, @@ -227,6 +267,13 @@ async def maybe_spawn_daemon( If no ``service_name`` daemon-actor can be found, spawn one in a local subactor and return a portal to it. + If this function is called from a non-pikerd actor, the + spawned service will persist as long as pikerd does or + it is requested to be cancelled. + + This can be seen as a service starting api for remote-actor + clients. 
+ """ if loglevel: get_console_log(loglevel) @@ -254,13 +301,24 @@ async def maybe_spawn_daemon( ) as pikerd_portal: if pikerd_portal is None: - # we are root so spawn brokerd directly in our tree - # the root nursery is accessed through process global state - await spawn_func(**spawn_args) + # we are the root and thus are `pikerd` + # so spawn the target service directly by calling + # the provided target routine. + # XXX: this assumes that the target is well formed and will + # do the right things to setup both a sub-actor **and** call + # the ``_Services`` api from above to start the top level + # service task for that actor. + await service_task_target(**spawn_args) else: + # tell the remote `pikerd` to start the target, + # the target can't return a non-serializable value + # since it is expected that service starting is + # non-blocking and the target task will persist running + # on `pikerd` after the client requesting its start + # disconnects. await pikerd_portal.run( - spawn_func, + service_task_target, **spawn_args, ) @@ -275,7 +333,7 @@ async def spawn_brokerd( loglevel: Optional[str] = None, **tractor_kwargs, -) -> tractor._portal.Portal: +) -> bool: log.info(f'Spawning {brokername} broker daemon') @@ -288,6 +346,8 @@ async def spawn_brokerd( global _services assert _services + # ask `pikerd` to spawn a new sub-actor and manage it under its + # actor nursery portal = await _services.actor_n.start_actor( dname, enable_modules=_data_mods + [brokermod.__name__], @@ -299,13 +359,13 @@ async def spawn_brokerd( # non-blocking setup of brokerd service nursery from .data import _setup_persistent_brokerd - await _services.open_remote_ctx( + await _services.start_service_task( + dname, portal, _setup_persistent_brokerd, brokername=brokername, ) - - return dname + return True @asynccontextmanager @@ -322,7 +382,7 @@ async def maybe_spawn_brokerd( async with maybe_spawn_daemon( f'brokerd.{brokername}', - spawn_func=spawn_brokerd, + 
service_task_target=spawn_brokerd, spawn_args={'brokername': brokername, 'loglevel': loglevel}, loglevel=loglevel, **kwargs, @@ -336,7 +396,7 @@ async def spawn_emsd( loglevel: Optional[str] = None, **extra_tractor_kwargs -) -> tractor._portal.Portal: +) -> bool: """ Start the clearing engine under ``pikerd``. @@ -360,12 +420,12 @@ async def spawn_emsd( # non-blocking setup of clearing service from .clearing._ems import _setup_persistent_emsd - await _services.open_remote_ctx( + await _services.start_service_task( + 'emsd', portal, _setup_persistent_emsd, ) - - return 'emsd' + return True @asynccontextmanager @@ -380,7 +440,7 @@ async def maybe_open_emsd( async with maybe_spawn_daemon( 'emsd', - spawn_func=spawn_emsd, + service_task_target=spawn_emsd, spawn_args={'loglevel': loglevel}, loglevel=loglevel, **kwargs, From 9c24bb6480c254fa539e531e80dc54241846033e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 5 Jul 2021 09:36:54 -0400 Subject: [PATCH 3/3] Make json resp log debug level --- piker/brokers/_util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/brokers/_util.py b/piker/brokers/_util.py index 4b4bd1cd..03125b20 100644 --- a/piker/brokers/_util.py +++ b/piker/brokers/_util.py @@ -53,6 +53,6 @@ def resproc( log.exception(f"Failed to process {resp}:\n{resp.text}") raise BrokerError(resp.text) else: - log.trace(f"Received json contents:\n{colorize_json(json)}") + log.debug(f"Received json contents:\n{colorize_json(json)}") return json if return_json else resp