From 91d7db9db8a0f2a4c034458d37f19e25170b44f2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 11 Feb 2025 10:34:34 -0500 Subject: [PATCH] Official service-mngr to `tractor.hilevel` move Such that we maintain that subsys in the actor-runtime repo (with hopefully an extensive test suite XD). Port deats, - rewrite `open_service_mngr()` as a thin wrapper that delegates into the new `tractor.hilevel.open_service_mngr()` but with maintenance of the `Services` class-singleton for now. - port `.service._daemon` usage to the new `ServiceMngr.start_service_ctx()` a rename from `.start_service_task()` which is now likely destined for the soon supported `tractor.trionics.TaskMngr` nursery extension. - ref the new `ServiceMngr.an: ActorNursery` instance var name. Other, - always enable the `tractor.pause_from_sync()` support via `greenback` whenever `debug_mode` is set at `pikerd` init. --- piker/service/_daemon.py | 18 +- piker/service/_mngr.py | 441 ++------------------------------------- 2 files changed, 31 insertions(+), 428 deletions(-) diff --git a/piker/service/_daemon.py b/piker/service/_daemon.py index 0cb57291..b881afc6 100644 --- a/piker/service/_daemon.py +++ b/piker/service/_daemon.py @@ -104,6 +104,12 @@ async def maybe_spawn_daemon( # service task for that actor. started: bool if pikerd_portal is None: + + # await tractor.pause() + if tractor_kwargs.get('debug_mode', False): + from tractor.devx._debug import maybe_init_greenback + await maybe_init_greenback() + started = await service_task_target( loglevel=loglevel, **spawn_args, @@ -208,7 +214,7 @@ async def spawn_emsd( log.info('Spawning emsd') smngr: ServiceMngr = get_service_mngr() - portal = await smngr.actor_n.start_actor( + portal = await smngr.an.start_actor( 'emsd', enable_modules=[ 'piker.clearing._ems', @@ -222,12 +228,10 @@ async def spawn_emsd( # non-blocking setup of clearing service from ..clearing._ems import _setup_persistent_emsd - await smngr.start_service_task( - 'emsd', - portal, - - # signature of target root-task endpoint - _setup_persistent_emsd, + await smngr.start_service_ctx( + name='emsd', + portal=portal, + ctx_fn=_setup_persistent_emsd, loglevel=loglevel, ) return True diff --git a/piker/service/_mngr.py b/piker/service/_mngr.py index 4a4c3938..9557a828 100644 --- a/piker/service/_mngr.py +++ b/piker/service/_mngr.py @@ -18,425 +18,16 @@ daemon-service management API. """ -from __future__ import annotations from contextlib import ( asynccontextmanager as acm, - # contextmanager as cm, -) -from collections import defaultdict -from dataclasses import ( - dataclass, - field, -) -import functools -import inspect -from typing import ( - Callable, - Any, ) -import msgspec import tractor -import trio -from trio import TaskStatus -from tractor import ( - ActorNursery, - current_actor, - ContextCancelled, - Context, - Portal, +from tractor.hilevel import ( + ServiceMngr, + # open_service_mngr as _open_service_mngr, + get_service_mngr as get_service_mngr, ) - -from ._util import ( - log, # sub-sys logger -) - - -# TODO: implement a singleton deco-API for wrapping the below -# factory's impl for general actor-singleton use? -# -# @singleton -# async def open_service_mngr( -# **init_kwargs, -# ) -> ServiceMngr: -# ''' -# Note this function body is invoke IFF no existing singleton instance already -# exists in this proc's memory. 
- -# ''' -# # setup -# yield ServiceMngr(**init_kwargs) -# # teardown - - - -# TODO: singleton factory API instead of a class API -@acm -async def open_service_mngr( - *, - debug_mode: bool = False, - - # impl deat which ensures a single global instance - _singleton: list[ServiceMngr|None] = [None], - **init_kwargs, - -) -> ServiceMngr: - ''' - Open a multi-subactor-as-service-daemon tree supervisor. - - The delivered `ServiceMngr` is a singleton instance for each - actor-process and is allocated on first open and never - de-allocated unless explicitly deleted by al call to - `del_service_mngr()`. - - ''' - # TODO: factor this an allocation into - # a `._mngr.open_service_mngr()` and put in the - # once-n-only-once setup/`.__aenter__()` part! - # -[ ] how to make this only happen on the `mngr == None` case? - # |_ use `.trionics.maybe_open_context()` (for generic - # async-with-style-only-once of the factory impl, though - # what do we do for the allocation case? - # / `.maybe_open_nursery()` (since for this specific case - # it's simpler?) to activate - async with ( - tractor.open_nursery() as an, - trio.open_nursery() as tn, - ): - # impl specific obvi.. - init_kwargs.update({ - 'actor_n': an, - 'service_n': tn, - }) - - mngr: ServiceMngr|None - if (mngr := _singleton[0]) is None: - - log.info('Allocating a new service mngr!') - mngr = _singleton[0] = ServiceMngr(**init_kwargs) - - # TODO: put into `.__aenter__()` section of - # eventual `@singleton_acm` API wrapper. - # - # assign globally for future daemon/task creation - mngr.actor_n = an - mngr.service_n = tn - - else: - assert ( - mngr.actor_n - and - mngr.service_tn - ) - log.info( - 'Using extant service mngr!\n\n' - f'{mngr!r}\n' # it has a nice `.__repr__()` of services state - ) - - try: - # NOTE: this is a singleton factory impl specific detail - # which should be supported in the condensed - # `@singleton_acm` API? - mngr.debug_mode = debug_mode - - yield mngr - finally: - # TODO: is this more clever/efficient? - # if 'samplerd' in mngr.service_tasks: - # await mngr.cancel_service('samplerd') - - # await tractor.pause(shield=True) - # ^XXX, if needed mk sure to shield it ;) - tn.cancel_scope.cancel() - - - -def get_service_mngr() -> ServiceMngr: - ''' - Try to get the singleton service-mngr for this actor presuming it - has already been allocated using, - - .. code:: python - - async with open_<@singleton_acm(func)>() as mngr` - ... this block kept open ... - - If not yet allocated raise a `ServiceError`. - - ''' - # https://stackoverflow.com/a/12627202 - # https://docs.python.org/3/library/inspect.html#inspect.Signature - maybe_mngr: ServiceMngr|None = inspect.signature( - open_service_mngr - ).parameters['_singleton'].default[0] - - if maybe_mngr is None: - raise RuntimeError( - 'Someone must allocate a `ServiceMngr` using\n\n' - '`async with open_service_mngr()` beforehand!!\n' - ) - - return maybe_mngr - - -# TODO: we need remote wrapping and a general soln: -# - factor this into a ``tractor.highlevel`` extension # pack for the -# library. -# - wrap a "remote api" wherein you can get a method proxy -# to the pikerd actor for starting services remotely! -# - prolly rename this to ActorServicesNursery since it spawns -# new actors and supervises them to completion? -@dataclass -class ServiceMngr: -# class ServiceMngr(msgspec.Struct): - ''' - A multi-subactor-as-service manager. - - Spawn, supervise and monitor service/daemon subactors in a SC - process tree. 
- - ''' - actor_n: ActorNursery - service_n: trio.Nursery - debug_mode: bool = False # tractor sub-actor debug mode flag - - service_tasks: dict[ - str, - tuple[ - trio.CancelScope, - Context, - Portal, - trio.Event, - ] - ] = field(default_factory=dict) - - # internal per-service task mutexs - _locks = defaultdict(trio.Lock) - - async def start_service_task( - self, - name: str, - portal: Portal, - - # TODO: typevar for the return type of the target and then - # use it below for `ctx_res`? - target: Callable, - - allow_overruns: bool = False, - **ctx_kwargs, - - ) -> (trio.CancelScope, Context, Any): - ''' - Open a context in a service sub-actor, add to a stack - that gets unwound at ``pikerd`` teardown. - - This allows for allocating long-running sub-services in our main - daemon and explicitly controlling their lifetimes. - - ''' - async def open_context_in_task( - task_status: TaskStatus[ - tuple[ - trio.CancelScope, - Context, - trio.Event, - Any, - ] - ] = trio.TASK_STATUS_IGNORED, - - ) -> tuple[ - trio.CancelScope, - Context, - Any, # started value from ctx - ]: - - # TODO: use the ctx._scope directly here instead? - # -[ ] actually what semantics do we expect for this - # usage!? - with trio.CancelScope() as cs: - try: - async with portal.open_context( - target, - allow_overruns=allow_overruns, - - # hide_tb=False, - # ^XXX^ HAWT TIPZ - - **ctx_kwargs, - - ) as (ctx, started): - - # unblock once the remote context has started - complete = trio.Event() - task_status.started(( - cs, - ctx, - complete, - started, - )) - log.info( - f'`pikerd` service {name} started with value {started}' - ) - # wait on any context's return value - # and any final portal result from the - # sub-actor. - ctx_res: Any = await ctx.wait_for_result( - # hide_tb=False, - ) - - # NOTE: blocks indefinitely until cancelled - # either by error from the target context - # function or by being cancelled here by the - # surrounding cancel scope. - return ( - await portal.wait_for_result(), - ctx_res, - ) - - except ContextCancelled as ctxe: - canceller: tuple[str, str] = ctxe.canceller - our_uid: tuple[str, str] = current_actor().uid - if ( - canceller != portal.chan.uid - and - canceller != our_uid - ): - log.cancel( - f'Actor-service `{name}` was remotely cancelled by a peer?\n' - - # TODO: this would be a good spot to use - # a respawn feature Bo - f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n' - - f'cancellee: {portal.chan.uid}\n' - f'canceller: {canceller}\n' - ) - else: - raise - - finally: - # NOTE: the ctx MUST be cancelled first if we - # don't want the above `ctx.wait_for_result()` to - # raise a self-ctxc. - # - # WHY, well since from the ctx's - # perspective the cancel request will have - # arrived out-out-of-band at the `Actor.cancel()` - # level (since pikerd will have called - # `Portal.cancel_actor()`), and thus - # `Context.cancel_called == False`, - # meaning `ctx._is_self_cancelled() == False`. - # - # HOWEVER, this should happen implicitly WITHOUT - # a manual `ctx.cancel()` call HERE since, - # - # - in the mngr shutdown case the surrounding - # `.service_n.cancel_scope` should be - # `.cancel_called == True` and the - # `Portal.open_context()` internals should take - # care of it. 
- # - # - in the specific-service cancellation case, - # `.cancel_service()` makes the manual - # `ctx.cancel()` call for us which SHOULD mean - # the ctxc is never raised above (since, again, - # it will be gracefully suppressed by - # `.open_context()` internals) and thus we only - # need to shut down the service actor. - await portal.cancel_actor() - self.service_tasks.pop(name) - complete.set() - - ( - cs, # internally allocated - sub_ctx, # RPC peer-actor ctx - complete, # termination syncing - started, # proxyed from internal `.open_context()` entry. - ) = await self.service_n.start( - open_context_in_task - ) - - # store the cancel scope and portal for later cancellation or - # retstart if needed. - self.service_tasks[name] = (cs, sub_ctx, portal, complete) - return ( - cs, - sub_ctx, - started, - ) - - async def cancel_service( - self, - name: str, - - ) -> Any: - ''' - Cancel the service task and actor for the given ``name``. - - ''' - log.info(f'Cancelling `pikerd` service {name}') - cs, sub_ctx, portal, complete = self.service_tasks[name] - - # cs.cancel() - await sub_ctx.cancel() - await complete.wait() - - if name in self.service_tasks: - raise RuntimeError( - f'Serice task for {name} not terminated?' - ) - # raise ServiceError( - # ^TODO? custom err type? - - # assert name not in self.service_tasks, \ - # f'Serice task for {name} not terminated?' - - async def start_service( - self, - daemon_name: str, - ctx_ep: Callable, # kwargs must `partial`-ed in! - - debug_mode: bool = False, - **tractor_actor_kwargs, - - ) -> Context: - ''' - Start a "service" task in a new sub-actor (daemon) and manage it's lifetime - indefinitely. - - Services can be cancelled/shutdown using `.cancel_service()`. - - ''' - entry: tuple|None = self.service_tasks.get(daemon_name) - if entry: - (cs, sub_ctx, portal, complete) = entry - return sub_ctx - - if daemon_name not in self.service_tasks: - portal = await self.actor_n.start_actor( - daemon_name, - debug_mode=( # maybe set globally during allocate - debug_mode - or - self.debug_mode - ), - **tractor_actor_kwargs, - ) - ctx_kwargs: dict[str, Any] = {} - if isinstance(ctx_ep, functools.partial): - ctx_kwargs: dict[str, Any] = ctx_ep.keywords - ctx_ep: Callable = ctx_ep.func - - (cs, sub_ctx, started) = await self.start_service_task( - daemon_name, - portal, - ctx_ep, - **ctx_kwargs, - ) - - return sub_ctx - - # TODO: # -[ ] factor all the common shit from `.data._sampling` # and `.brokers._daemon` into here / `ServiceMngr` @@ -444,11 +35,19 @@ class ServiceMngr: # "service-in-subactor" starting! # -[ ] move to `tractor.hilevel._service`, import and use here! # NOTE: purposely leaks the ref to the mod-scope Bo -# import tractor -# from tractor.hilevel import ( -# open_service_mngr, -# ServiceMngr, -# ) -# mngr: ServiceMngr|None = None -# with tractor.hilevel.open_service_mngr() as mngr: -# Services = proxy(mngr) + +Services: ServiceMngr|None = None + +@acm +async def open_service_mngr( + **kwargs, +) -> ServiceMngr: + + global Services + async with tractor.hilevel.open_service_mngr( + **kwargs, + ) as mngr: + # Services = proxy(mngr) + Services = mngr + yield mngr + Services = None
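
Usage sketch (not part of the patch): a minimal end-to-end example of how the
ported API composes, mirroring the `spawn_emsd()` hunk above. The `my_serviced`
actor name, its `enable_modules` entry and the `_setup_persistent_my_serviced`
endpoint are hypothetical stand-ins, and the `tractor.hilevel` surface
(`open_service_mngr()`, `get_service_mngr()`, `ServiceMngr.an`,
`.start_service_ctx()`) is assumed to match what this patch references.

import tractor
import trio

from piker.service._mngr import (
    ServiceMngr,
    open_service_mngr,
    get_service_mngr,
)


@tractor.context
async def _setup_persistent_my_serviced(
    ctx: tractor.Context,
    loglevel: str | None = None,
) -> None:
    # hypothetical root-task endpoint: signal readiness then serve
    # until cancelled by the service mngr. In real code this would
    # live inside the daemon's enabled module (defined inline here
    # only for brevity).
    await ctx.started()
    await trio.sleep_forever()


async def main() -> None:

    # thin wrapper from `._mngr` above: delegates into
    # `tractor.hilevel.open_service_mngr()` and keeps the module-scoped
    # `Services` ref assigned for the duration of the block.
    async with open_service_mngr() as mngr:

        # assumed to return the same per-actor singleton instance,
        # as used by `.service._daemon`.
        smngr: ServiceMngr = get_service_mngr()
        assert smngr is mngr

        # allocate the daemon sub-actor from the mngr's `ActorNursery`
        # (the `.an` instance var ref'd in the commit msg) ..
        portal = await smngr.an.start_actor(
            'my_serviced',  # hypothetical daemon-actor name
            enable_modules=['piker.my_service'],  # hypothetical module
            loglevel='info',
        )

        # .. then start its long-running service context via the
        # renamed `.start_service_ctx()` (was `.start_service_task()`).
        await smngr.start_service_ctx(
            name='my_serviced',
            portal=portal,
            ctx_fn=_setup_persistent_my_serviced,
            loglevel='info',
        )

        # exiting the block tears down the service tree and resets the
        # `Services` singleton back to `None`.


if __name__ == '__main__':
    trio.run(main)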