From 46dbe6d2fcf33e21cd631cb54089c7d646658aaf Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Wed, 11 Dec 2024 12:38:35 -0500
Subject: [PATCH] Mv over `ServiceMngr` from `piker` with mods

Namely distinguishing service "IPC contexts" (opened in a subactor via
a `Portal`) from just local `trio.Task`s started and managed under the
`.service_n` (more or less wrapping them in the interface of a
"task-manager" style nursery - aka a one-cancels-one supervision
strategy).

API changes from the original (`piker`) impl,
- mk `.start_service_task()` do ONLY that, start a task with a wrapping
  cancel-scope and completion event.
  |_ ideally this gets factored-out/re-implemented using the
     task-manager/OCO-style-nursery from GH #363.
- change what was the impl of `.start_service_task()` to
  `.start_service_ctx()` since it more explicitly defines the
  functionality of entering `Portal.open_context()` with a wrapping cs
  and completion event inside a bg task (which syncs the ctx's lifetime
  with termination of the remote actor runtime).
- factor out what was a `.start_service_ctx()` closure to a new
  `_open_and_supervise_service_ctx()` mod-func holding the meat of the
  supervision logic.

`ServiceMngr` API brief,
- use `open_service_mngr()` and `get_service_mngr()` to acquire the
  actor-global singleton.
- `ServiceMngr.start_service()` and `.cancel_service()` which allow for
  straightforward mgmt of "service subactor daemons".
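Quick usage sketch (the `sleeper_ctx` endpoint below is hypothetical,
NOT part of this patch, and only illustrates the intended API shape):

    import trio
    import tractor
    from tractor.hilevel import open_service_mngr

    @tractor.context
    async def sleeper_ctx(ctx: tractor.Context) -> None:
        # hypothetical service endpoint: ack the ctx open then sleep
        # until cancelled by the service mngr.
        await ctx.started()
        await trio.sleep_forever()

    async def main():
        async with open_service_mngr() as mngr:
            # spawn a "service subactor daemon" and open `sleeper_ctx`
            # inside it; the opened `Context` is returned.
            ctx = await mngr.start_service(
                daemon_name='sleeper',
                ctx_ep=sleeper_ctx,
            )
            # ... do work while the service daemon runs ...
            await mngr.cancel_service('sleeper')

    if __name__ == '__main__':
        trio.run(main)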
---
 tractor/hilevel/__init__.py |  26 +++
 tractor/hilevel/_service.py | 448 ++++++++++++++++++++++++++++++++++--
 2 files changed, 453 insertions(+), 21 deletions(-)

diff --git a/tractor/hilevel/__init__.py b/tractor/hilevel/__init__.py
index e69de29..cf2741d 100644
--- a/tractor/hilevel/__init__.py
+++ b/tractor/hilevel/__init__.py
@@ -0,0 +1,26 @@
+# tractor: structured concurrent "actors".
+# Copyright 2024-eternity Tyler Goodlet.
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+'''
+High level design patterns, APIs and runtime extensions built on top
+of the `tractor` runtime core.
+
+'''
+from ._service import (
+    open_service_mngr as open_service_mngr,
+    get_service_mngr as get_service_mngr,
+    ServiceMngr as ServiceMngr,
+)
diff --git a/tractor/hilevel/_service.py b/tractor/hilevel/_service.py
index 1376c2a..869d486 100644
--- a/tractor/hilevel/_service.py
+++ b/tractor/hilevel/_service.py
@@ -21,18 +21,26 @@ and API.
 '''
 from __future__ import annotations
 from contextlib import (
-    # asynccontextmanager as acm,
-    contextmanager as cm,
+    asynccontextmanager as acm,
+    # contextmanager as cm,
 )
 from collections import defaultdict
+from dataclasses import (
+    dataclass,
+    field,
+)
+import functools
+import inspect
 from typing import (
     Callable,
     Any,
 )
 
+import tractor
 import trio
 from trio import TaskStatus
 from tractor import (
+    log,
     ActorNursery,
     current_actor,
     ContextCancelled,
@@ -40,9 +48,7 @@ from tractor import (
     Portal,
 )
 
-from ._util import (
-    log,  # sub-sys logger
-)
+log = log.get_logger('tractor')
 
 
 # TODO: implement a `@singleton` deco-API for wrapping the below
@@ -93,11 +99,30 @@
 
 
 
+# TODO: implement a singleton deco-API for wrapping the below
+# factory's impl for general actor-singleton use?
+#
+# @singleton
+# async def open_service_mngr(
+#     **init_kwargs,
+# ) -> ServiceMngr:
+#     '''
+#     Note this function body is invoked IFF no existing singleton instance already
+#     exists in this proc's memory.
+
+#     '''
+#     # setup
+#     yield ServiceMngr(**init_kwargs)
+#     # teardown
+
+
+
 # TODO: singleton factory API instead of a class API
-@cm
-def open_service_mngr(
+@acm
+async def open_service_mngr(
     *,
-    _singleton: list[ServiceMngr|None] = [None],
+    debug_mode: bool = False,
+
     # NOTE; since default values for keyword-args are effectively
     # module-vars/globals as per the note from,
     # https://docs.python.org/3/tutorial/controlflow.html#default-argument-values
@@ -106,27 +131,408 @@
     #   a difference when the default is a mutable object such as
     #   a list, dictionary, or instances of most classes"
     #
+    _singleton: list[ServiceMngr|None] = [None],
     **init_kwargs,
 
 ) -> ServiceMngr:
     '''
-    Open a multi-subactor-as-service-daemon tree supervisor.
+    Open an actor-global "service-manager" for supervising a tree
+    of subactors and/or actor-global tasks.
 
-    The delivered `ServiceMngr` is a singleton instance for each
-    actor-process and is allocated on first open and never
+    The delivered `ServiceMngr` is a singleton instance for each
+    actor-process, that is, allocated on first open and never
     de-allocated unless explicitly deleted by al call to
     `del_service_mngr()`.
 
     '''
-    mngr: ServiceMngr|None
-    if (mngr := _singleton[0]) is None:
-        log.info('Allocating a new service mngr!')
-        mngr = _singleton[0] = ServiceMngr(**init_kwargs)
-    else:
-        log.info(
-            'Using extant service mngr!\n\n'
-            f'{mngr!r}\n'  # it has a nice `.__repr__()` of services state
+    # TODO: factor this allocation into
+    # a `._mngr.open_service_mngr()` and put in the
+    # once-n-only-once setup/`.__aenter__()` part!
+    # -[ ] how to make this only happen on the `mngr == None` case?
+    #  |_ use `.trionics.maybe_open_context()` (for generic
+    #     async-with-style-only-once of the factory impl, though
+    #     what do we do for the allocation case?
+    #     / `.maybe_open_nursery()` (since for this specific case
+    #     it's simpler?) to activate
+    async with (
+        tractor.open_nursery() as an,
+        trio.open_nursery() as tn,
+    ):
+        # impl specific obvi..
+        init_kwargs.update({
+            'actor_n': an,
+            'service_n': tn,
+        })
+
+        mngr: ServiceMngr|None
+        if (mngr := _singleton[0]) is None:
+
+            log.info('Allocating a new service mngr!')
+            mngr = _singleton[0] = ServiceMngr(**init_kwargs)
+
+            # TODO: put into `.__aenter__()` section of
+            # eventual `@singleton_acm` API wrapper.
+            #
+            # assign globally for future daemon/task creation
+            mngr.actor_n = an
+            mngr.service_n = tn
+
+        else:
+            assert (
+                mngr.actor_n
+                and
+                mngr.service_n
+            )
+            log.info(
+                'Using extant service mngr!\n\n'
+                f'{mngr!r}\n'  # it has a nice `.__repr__()` of services state
+            )
+
+        try:
+            # NOTE: this is a singleton factory impl specific detail
+            # which should be supported in the condensed
+            # `@singleton_acm` API?
+            mngr.debug_mode = debug_mode
+
+            yield mngr
+        finally:
+            # TODO: is this more clever/efficient?
+            # if 'samplerd' in mngr.service_tasks:
+            #     await mngr.cancel_service('samplerd')
+            tn.cancel_scope.cancel()
+
+
+
+def get_service_mngr() -> ServiceMngr:
+    '''
+    Try to get the singleton service-mngr for this actor presuming it
+    has already been allocated using,
+
+    .. code:: python
+
+        async with open_<@singleton_acm(func)>() as mngr`
+            ... this block kept open ...
+
+    If not yet allocated raise a `RuntimeError`.
+
+    '''
+    # https://stackoverflow.com/a/12627202
+    # https://docs.python.org/3/library/inspect.html#inspect.Signature
+    maybe_mngr: ServiceMngr|None = inspect.signature(
+        open_service_mngr
+    ).parameters['_singleton'].default[0]
+
+    if maybe_mngr is None:
+        raise RuntimeError(
+            'Someone must allocate a `ServiceMngr` using\n\n'
+            '`async with open_service_mngr()` beforehand!!\n'
         )
-    with mngr:
-        yield mngr
+    return maybe_mngr
+
+
+# TODO: we need remote wrapping and a general soln:
+# - factor this into a ``tractor.highlevel`` extension pack for the
+#   library.
+# - wrap a "remote api" wherein you can get a method proxy
+#   to the pikerd actor for starting services remotely!
+# - prolly rename this to ActorServicesNursery since it spawns
+#   new actors and supervises them to completion?
+@dataclass
+class ServiceMngr:
+    '''
+    A multi-subactor-as-service manager.
+
+    Spawn, supervise and monitor service/daemon subactors in a SC
+    process tree.
+
+    '''
+    actor_n: ActorNursery
+    service_n: trio.Nursery
+    debug_mode: bool = False  # tractor sub-actor debug mode flag
+
+    service_tasks: dict[
+        str,
+        tuple[
+            trio.CancelScope,
+            Context,
+            Portal,
+            trio.Event,
+        ]
+    ] = field(default_factory=dict)
+
+    # internal per-service task mutexes
+    _locks = defaultdict(trio.Lock)
+
+    # TODO, unify this interface with our `TaskManager` PR!
+    #
+    #
+    async def start_service_task(
+        self,
+        name: str,
+        # TODO: typevar for the return type of the target and then
+        # use it below for `ctx_res`?
+        fn: Callable,
+
+        allow_overruns: bool = False,
+        **ctx_kwargs,
+
+    ) -> tuple[
+        trio.CancelScope,
+        Any,
+        trio.Event,
+    ]:
+        async def _task_manager_start(
+            task_status: TaskStatus[
+                tuple[
+                    trio.CancelScope,
+                    trio.Event,
+                ]
+            ] = trio.TASK_STATUS_IGNORED,
+        ) -> Any:
+
+            task_cs = trio.CancelScope()
+            task_complete = trio.Event()
+
+            with task_cs as cs:
+                task_status.started((
+                    cs,
+                    task_complete,
+                ))
+                try:
+                    await fn()
+                except trio.Cancelled as taskc:
+                    log.cancel(
+                        f'Service task for `{name}` was cancelled!\n'
+                        # TODO: this would be a good spot to use
+                        # a respawn feature Bo
+                    )
+                    raise taskc
+                finally:
+                    task_complete.set()
+        (
+            cs,
+            complete,
+        ) = await self.service_n.start(_task_manager_start)
+
+        # store the cancel scope and portal for later cancellation or
+        # restart if needed.
+        self.service_tasks[name] = (
+            cs,
+            None,
+            None,
+            complete,
+        )
+        return (
+            cs,
+            complete,
+        )
+
+    async def start_service_ctx(
+        self,
+        name: str,
+        portal: Portal,
+        # TODO: typevar for the return type of the target and then
+        # use it below for `ctx_res`?
+        target: Callable,
+        **ctx_kwargs,
+
+    ) -> tuple[
+        trio.CancelScope,
+        Context,
+        Any,
+    ]:
+        cs, sub_ctx, complete, started = await self.service_n.start(
+            functools.partial(
+                self._open_and_supervise_service_ctx,
+                name=name,
+                target=target,
+                portal=portal,
+                **ctx_kwargs,
+
+            )
+        )
+
+        # store the cancel scope and portal for later cancellation or
+        # restart if needed.
+        self.service_tasks[name] = (cs, sub_ctx, portal, complete)
+        return (
+            cs,
+            sub_ctx,
+            started,
+        )
+
+    async def _open_and_supervise_service_ctx(
+        self,
+        name: str,
+        target: Callable,  # TODO, type for `@tractor.context` requirement
+        portal: Portal,
+
+        allow_overruns: bool = False,
+        task_status: TaskStatus[
+            tuple[
+                trio.CancelScope,
+                Context,
+                trio.Event,
+                Any,
+            ]
+        ] = trio.TASK_STATUS_IGNORED,
+        **ctx_kwargs,
+
+    ) -> Any:
+        '''
+        Open a context in a service sub-actor, add to a stack
+        that gets unwound at ``pikerd`` teardown.
+
+        This allows for allocating long-running sub-services in our main
+        daemon and explicitly controlling their lifetimes.
+
+        '''
+        # TODO: use the ctx._scope directly here instead?
+        # -[ ] actually what semantics do we expect for this
+        #   usage!?
+        with trio.CancelScope() as cs:
+            try:
+                async with portal.open_context(
+                    target,
+                    allow_overruns=allow_overruns,
+                    **ctx_kwargs,
+
+                ) as (ctx, started):
+
+                    # unblock once the remote context has started
+                    complete = trio.Event()
+                    task_status.started((
+                        cs,
+                        ctx,
+                        complete,
+                        started,
+                    ))
+                    log.info(
+                        f'`pikerd` service {name} started with value {started}'
+                    )
+                    # wait on any context's return value
+                    # and any final portal result from the
+                    # sub-actor.
+                    ctx_res: Any = await ctx.wait_for_result()
+
+                    # NOTE: blocks indefinitely until cancelled
+                    # either by error from the target context
+                    # function or by being cancelled here by the
+                    # surrounding cancel scope.
+                    return (
+                        await portal.wait_for_result(),
+                        ctx_res,
+                    )
+
+            except ContextCancelled as ctxe:
+                canceller: tuple[str, str] = ctxe.canceller
+                our_uid: tuple[str, str] = current_actor().uid
+                if (
+                    canceller != portal.chan.uid
+                    and
+                    canceller != our_uid
+                ):
+                    log.cancel(
+                        f'Actor-service `{name}` was remotely cancelled by a peer?\n'
+
+                        # TODO: this would be a good spot to use
+                        # a respawn feature Bo
+                        f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
+
+                        f'cancellee: {portal.chan.uid}\n'
+                        f'canceller: {canceller}\n'
+                    )
+                else:
+                    raise
+
+            finally:
+                # NOTE: the ctx MUST be cancelled first if we
+                # don't want the above `ctx.wait_for_result()` to
+                # raise a self-ctxc. WHY, well since from the ctx's
+                # perspective the cancel request will have
+                # arrived out-of-band at the `Actor.cancel()`
+                # level, thus `Context.cancel_called == False`,
+                # meaning `ctx._is_self_cancelled() == False`.
+                # with trio.CancelScope(shield=True):
+                #     await ctx.cancel()
+                await portal.cancel_actor()  # terminate (remote) sub-actor
+                complete.set()  # signal caller this task is done
+                self.service_tasks.pop(name)  # remove mngr entry
+
+    async def cancel_service(
+        self,
+        name: str,
+
+    ) -> Any:
+        '''
+        Cancel the service task and actor for the given ``name``.
+
+        '''
+        log.info(f'Cancelling `pikerd` service {name}')
+        cs, sub_ctx, portal, complete = self.service_tasks[name]
+
+        # cs.cancel()
+        await sub_ctx.cancel()
+        await complete.wait()
+
+        if name in self.service_tasks:
+            # TODO: custom err?
+            # raise ServiceError(
+            raise RuntimeError(
+                f'Service actor for {name} not terminated and/or unknown?'
+            )
+
+        # assert name not in self.service_tasks, \
+        #     f'Service task for {name} not terminated?'
+
+    async def start_service(
+        self,
+        daemon_name: str,
+        ctx_ep: Callable,  # kwargs must be `partial`-ed in!
+        # ^TODO, type for `@tractor.context` deco-ed funcs!
+
+        debug_mode: bool = False,
+        **tractor_actor_kwargs,
+
+    ) -> Context:
+        '''
+        Start a "service" task in a (new) sub-actor and manage its
+        lifetime indefinitely until termination.
+
+        Service actors can be cancelled (and thus shut down) using
+        `.cancel_service()`.
+
+        '''
+        entry: tuple|None = self.service_tasks.get(daemon_name)
+        if entry:
+            (cs, sub_ctx, portal, complete) = entry
+            return sub_ctx
+
+        if daemon_name not in self.service_tasks:
+            portal: Portal = await self.actor_n.start_actor(
+                daemon_name,
+                debug_mode=(  # maybe set globally during allocate
+                    debug_mode
+                    or
+                    self.debug_mode
+                ),
+                **tractor_actor_kwargs,
+            )
+            ctx_kwargs: dict[str, Any] = {}
+            if isinstance(ctx_ep, functools.partial):
+                ctx_kwargs: dict[str, Any] = ctx_ep.keywords
+                ctx_ep: Callable = ctx_ep.func
+
+            (
+                cs,
+                sub_ctx,
+                started,
+            ) = await self.start_service_ctx(
+                name=daemon_name,
+                portal=portal,
+                target=ctx_ep,
+                **ctx_kwargs,
+            )
+
+            return sub_ctx
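
Usage note (editor's sketch, not part of the patch): once some caller
up the task tree holds `open_service_mngr()` open, any other task in
the same actor can retrieve the singleton and schedule plain `trio`
service tasks; the `heartbeat` fn here is hypothetical:

    from tractor.hilevel import ServiceMngr, get_service_mngr

    async def heartbeat() -> None:
        # hypothetical background service task body
        ...

    async def schedule_heartbeat() -> None:
        # raises RuntimeError if `open_service_mngr()` was never entered
        mngr: ServiceMngr = get_service_mngr()
        cs, complete = await mngr.start_service_task(
            'heartbeat',
            heartbeat,
        )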