diff --git a/tractor/hilevel/__init__.py b/tractor/hilevel/__init__.py
new file mode 100644
index 00000000..cf2741d8
--- /dev/null
+++ b/tractor/hilevel/__init__.py
@@ -0,0 +1,26 @@
+# tractor: structured concurrent "actors".
+# Copyright 2024-eternity Tyler Goodlet.
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+'''
+High level design patterns, APIs and runtime extensions built on top
+of the `tractor` runtime core.
+
+'''
+from ._service import (
+    open_service_mngr as open_service_mngr,
+    get_service_mngr as get_service_mngr,
+    ServiceMngr as ServiceMngr,
+)
diff --git a/tractor/hilevel/_service.py b/tractor/hilevel/_service.py
new file mode 100644
index 00000000..70dddbdf
--- /dev/null
+++ b/tractor/hilevel/_service.py
@@ -0,0 +1,592 @@
+# tractor: structured concurrent "actors".
+# Copyright 2024-eternity Tyler Goodlet.
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+'''
+Daemon subactor-as-service management and supervision primitives
+and API.
+
+'''
+from __future__ import annotations
+from contextlib import (
+    asynccontextmanager as acm,
+    # contextmanager as cm,
+)
+from collections import defaultdict
+from dataclasses import (
+    dataclass,
+    field,
+)
+import functools
+import inspect
+from typing import (
+    Callable,
+    Any,
+)
+
+import tractor
+import trio
+from trio import TaskStatus
+from tractor import (
+    log,
+    ActorNursery,
+    current_actor,
+    ContextCancelled,
+    Context,
+    Portal,
+)
+
+log = log.get_logger('tractor')
+
+
+# TODO: implement a `@singleton` deco-API for wrapping the below
+# factory's impl for general actor-singleton use?
+#
+# -[ ] go through the options peeps on SO did?
+#    * https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python
+#      * including @mikenerone's answer
+#      |_https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python/39186313#39186313
+#
+# -[ ] put it in `tractor.lowlevel._globals` ?
+#    * fits with our outstanding actor-local/global feat req?
+#      |_ https://github.com/goodboy/tractor/issues/55
+#    * how can it relate to the `Actor.lifetime_stack` that was
+#      silently patched in?
+#      |_ we could implicitly call both of these in the same
+#         spot in the runtime using the lifetime stack?
+#        - `open_singleton_cm().__exit__()`
+#        - `del_singleton()`
+#       |_ gives SC fixture semantics to sync code oriented around
+#          sub-process lifetime?
+#    * what about with `trio.RunVar`?
+#      |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.RunVar
+#        - which we'll need for no-GIL cpython (right?) presuming
+#          multiple `trio.run()` calls in process?
+#
+#
+# @singleton
+# async def open_service_mngr(
+#     **init_kwargs,
+# ) -> ServiceMngr:
+#     '''
+#     Note this function body is invoked IFF no existing singleton instance already
+#     exists in this proc's memory.
+
+#     '''
+#     # setup
+#     yield ServiceMngr(**init_kwargs)
+#     # teardown
+
+
+# a deletion API for explicit instance de-allocation?
+# @open_service_mngr.deleter
+# def del_service_mngr() -> None:
+#     mngr = open_service_mngr._singleton[0]
+#     open_service_mngr._singleton[0] = None
+#     del mngr
+
+
+# TODO: implement a singleton deco-API for wrapping the below
+# factory's impl for general actor-singleton use?
+#
+# @singleton
+# async def open_service_mngr(
+#     **init_kwargs,
+# ) -> ServiceMngr:
+#     '''
+#     Note this function body is invoked IFF no existing singleton instance already
+#     exists in this proc's memory.
+
+#     '''
+#     # setup
+#     yield ServiceMngr(**init_kwargs)
+#     # teardown
+
+
+# TODO: singleton factory API instead of a class API
+@acm
+async def open_service_mngr(
+    *,
+    debug_mode: bool = False,
+
+    # NOTE: since default values for keyword-args are effectively
+    # module-vars/globals as per the note from,
+    # https://docs.python.org/3/tutorial/controlflow.html#default-argument-values
+    #
+    # > "The default value is evaluated only once. This makes
+    #   a difference when the default is a mutable object such as
+    #   a list, dictionary, or instances of most classes"
+    #
+    _singleton: list[ServiceMngr|None] = [None],
+    **init_kwargs,
+
+) -> ServiceMngr:
+    '''
+    Open an actor-global "service-manager" for supervising a tree
+    of subactors and/or actor-global tasks.
+
+    The delivered `ServiceMngr` is a singleton instance for each
+    actor-process, that is, allocated on first open and never
+    de-allocated unless explicitly deleted by a call to
+    `del_service_mngr()`.
+
+    '''
+    # TODO: factor this allocation into
+    # a `._mngr.open_service_mngr()` and put in the
+    # once-n-only-once setup/`.__aenter__()` part!
+    # -[ ] how to make this only happen on the `mngr == None` case?
+    #  |_ use `.trionics.maybe_open_context()` (for generic
+    #     async-with-style-only-once of the factory impl, though
+    #     what do we do for the allocation case?)
+    #    / `.maybe_open_nursery()` (since for this specific case
+    #      it's simpler?) to activate
+    async with (
+        tractor.open_nursery() as an,
+        trio.open_nursery() as tn,
+    ):
+        # impl specific obvi..
+        init_kwargs.update({
+            'an': an,
+            'tn': tn,
+        })
+
+        mngr: ServiceMngr|None
+        if (mngr := _singleton[0]) is None:
+
+            log.info('Allocating a new service mngr!')
+            mngr = _singleton[0] = ServiceMngr(**init_kwargs)
+
+            # TODO: put into `.__aenter__()` section of
+            # eventual `@singleton_acm` API wrapper.
+            #
+            # assign globally for future daemon/task creation
+            mngr.an = an
+            mngr.tn = tn
+
+        else:
+            assert (mngr.an and mngr.tn)
+            log.info(
+                'Using extant service mngr!\n\n'
+                f'{mngr!r}\n'  # it has a nice `.__repr__()` of services state
+            )
+
+        try:
+            # NOTE: this is a singleton factory impl specific detail
+            # which should be supported in the condensed
+            # `@singleton_acm` API?
+            mngr.debug_mode = debug_mode
+
+            yield mngr
+        finally:
+            # TODO: is this more clever/efficient?
+            #   if 'samplerd' in mngr.service_ctxs:
+            #       await mngr.cancel_service('samplerd')
+            tn.cancel_scope.cancel()
+
+
+def get_service_mngr() -> ServiceMngr:
+    '''
+    Try to get the singleton service-mngr for this actor presuming it
+    has already been allocated using,
+
+    .. code:: python
+
+        async with open_<@singleton_acm(func)>() as mngr:
+            ... this block kept open ...
+
+    If not yet allocated, raise a `ServiceError`.
+
+    '''
+    # https://stackoverflow.com/a/12627202
+    # https://docs.python.org/3/library/inspect.html#inspect.Signature
+    maybe_mngr: ServiceMngr|None = inspect.signature(
+        open_service_mngr
+    ).parameters['_singleton'].default[0]
+
+    if maybe_mngr is None:
+        raise RuntimeError(
+            'Someone must allocate a `ServiceMngr` using\n\n'
+            '`async with open_service_mngr()` beforehand!!\n'
+        )
+
+    return maybe_mngr
+
+
+async def _open_and_supervise_service_ctx(
+    serman: ServiceMngr,
+    name: str,
+    ctx_fn: Callable,  # TODO, type for `@tractor.context` requirement
+    portal: Portal,
+
+    allow_overruns: bool = False,
+    task_status: TaskStatus[
+        tuple[
+            trio.CancelScope,
+            Context,
+            trio.Event,
+            Any,
+        ]
+    ] = trio.TASK_STATUS_IGNORED,
+    **ctx_kwargs,
+
+) -> Any:
+    '''
+    Open a remote IPC-context defined by `ctx_fn` in the
+    (service) actor accessed via `portal` and supervise it from the
+    (local) parent task until termination, at which point the remote
+    actor runtime is cancelled alongside it.
+
+    The main application is for allocating long-running
+    "sub-services" in a main daemon and explicitly controlling
+    their lifetimes from an actor-global singleton.
+
+    '''
+    # TODO: use the ctx._scope directly here instead?
+    # -[ ] actually what semantics do we expect for this
+    #   usage!?
+    with trio.CancelScope() as cs:
+        try:
+            async with portal.open_context(
+                ctx_fn,
+                allow_overruns=allow_overruns,
+                **ctx_kwargs,
+
+            ) as (ctx, started):
+
+                # unblock once the remote context has started
+                complete = trio.Event()
+                task_status.started((
+                    cs,
+                    ctx,
+                    complete,
+                    started,
+                ))
+                log.info(
+                    f'`pikerd` service {name} started with value {started}'
+                )
+                # wait on any context's return value
+                # and any final portal result from the
+                # sub-actor.
+                ctx_res: Any = await ctx.wait_for_result()
+
+                # NOTE: blocks indefinitely until cancelled
+                # either by error from the target context
+                # function or by being cancelled here by the
+                # surrounding cancel scope.
+                return (
+                    await portal.wait_for_result(),
+                    ctx_res,
+                )
+
+        except ContextCancelled as ctxe:
+            canceller: tuple[str, str] = ctxe.canceller
+            our_uid: tuple[str, str] = current_actor().uid
+            if (
+                canceller != portal.chan.uid
+                and
+                canceller != our_uid
+            ):
+                log.cancel(
+                    f'Actor-service `{name}` was remotely cancelled by a peer?\n'
+
+                    # TODO: this would be a good spot to use
+                    # a respawn feature Bo
+                    f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
+
+                    f'cancellee: {portal.chan.uid}\n'
+                    f'canceller: {canceller}\n'
+                )
+            else:
+                raise
+
+        finally:
+            # NOTE: the ctx MUST be cancelled first if we
+            # don't want the above `ctx.wait_for_result()` to
+            # raise a self-ctxc. WHY, well since from the ctx's
+            # perspective the cancel request will have
+            # arrived out-of-band at the `Actor.cancel()`
+            # level, thus `Context.cancel_called == False`,
+            # meaning `ctx._is_self_cancelled() == False`.
+            # with trio.CancelScope(shield=True):
+            #     await ctx.cancel()
+            await portal.cancel_actor()  # terminate (remote) sub-actor
+            complete.set()  # signal caller this task is done
+            serman.service_ctxs.pop(name)  # remove mngr entry
+
+
+# TODO: we need remote wrapping and a general soln:
+# - factor this into a ``tractor.hilevel`` extension pack for the
+#   library.
+# - wrap a "remote api" wherein you can get a method proxy
+#   to the pikerd actor for starting services remotely!
+# - prolly rename this to ActorServicesNursery since it spawns
+#   new actors and supervises them to completion?
+@dataclass
+class ServiceMngr:
+    '''
+    A multi-subactor-as-service manager.
+
+    Spawn, supervise and monitor service/daemon subactors in a SC
+    process tree.
+
+    '''
+    an: ActorNursery
+    tn: trio.Nursery
+    debug_mode: bool = False  # tractor sub-actor debug mode flag
+
+    service_tasks: dict[
+        str,
+        tuple[
+            trio.CancelScope,
+            trio.Event,
+        ]
+    ] = field(default_factory=dict)
+
+    service_ctxs: dict[
+        str,
+        tuple[
+            trio.CancelScope,
+            Context,
+            Portal,
+            trio.Event,
+        ]
+    ] = field(default_factory=dict)
+
+    # internal per-service task mutexes
+    _locks = defaultdict(trio.Lock)
+
+    # TODO, unify this interface with our `TaskManager` PR!
+    #
+    #
+    async def start_service_task(
+        self,
+        name: str,
+        # TODO: typevar for the return type of the target and then
+        # use it below for `ctx_res`?
+        fn: Callable,
+
+        allow_overruns: bool = False,
+        **ctx_kwargs,
+
+    ) -> tuple[
+        trio.CancelScope,
+        trio.Event,
+    ]:
+        async def _task_manager_start(
+            task_status: TaskStatus[
+                tuple[
+                    trio.CancelScope,
+                    trio.Event,
+                ]
+            ] = trio.TASK_STATUS_IGNORED,
+        ) -> Any:
+
+            task_cs = trio.CancelScope()
+            task_complete = trio.Event()
+
+            with task_cs as cs:
+                task_status.started((
+                    cs,
+                    task_complete,
+                ))
+                try:
+                    await fn()
+                except trio.Cancelled as taskc:
+                    log.cancel(
+                        f'Service task for `{name}` was cancelled!\n'
+                        # TODO: this would be a good spot to use
+                        # a respawn feature Bo
+                    )
+                    raise taskc
+                finally:
+                    # mirror `_open_and_supervise_service_ctx()`: drop
+                    # our mngr entry so `.cancel_service_task()` can
+                    # verify termination below.
+                    self.service_tasks.pop(name, None)
+                    task_complete.set()
+        (
+            cs,
+            complete,
+        ) = await self.tn.start(_task_manager_start)
+
+        # store the cancel scope and completion event for later
+        # cancellation or restart if needed.
+        self.service_tasks[name] = (
+            cs,
+            complete,
+        )
+        return (
+            cs,
+            complete,
+        )
+
+    async def cancel_service_task(
+        self,
+        name: str,
+
+    ) -> Any:
+        log.info(f'Cancelling `pikerd` service {name}')
+        cs, complete = self.service_tasks[name]
+
+        cs.cancel()
+        await complete.wait()
+        # TODO, if we use the `TaskMngr` from #346
+        # we can also get the return value from the task!
+
+        if name in self.service_tasks:
+            # TODO: custom err?
+            # raise ServiceError(
+            raise RuntimeError(
+                f'Service task {name!r} not terminated!?\n'
+            )
+
+    async def start_service_ctx(
+        self,
+        name: str,
+        portal: Portal,
+        # TODO: typevar for the return type of the target and then
+        # use it below for `ctx_res`?
+        ctx_fn: Callable,
+        **ctx_kwargs,
+
+    ) -> tuple[
+        trio.CancelScope,
+        Context,
+        Any,
+    ]:
+        '''
+        Start a remote IPC-context defined by `ctx_fn` in a background
+        task and immediately return supervision primitives to manage it:
+
+        - a `cs: CancelScope` for the newly allocated bg task
+        - the `ipc_ctx: Context` to manage the remotely scheduled
+          `trio.Task`.
+        - the `started: Any` value returned by the remote endpoint
+          task's `Context.started()` call.
+
+        The bg task supervises the ctx such that when it terminates the
+        supporting actor runtime is also cancelled; see
+        `_open_and_supervise_service_ctx()` for details.
+
+        '''
+        cs, ipc_ctx, complete, started = await self.tn.start(
+            functools.partial(
+                _open_and_supervise_service_ctx,
+                serman=self,
+                name=name,
+                ctx_fn=ctx_fn,
+                portal=portal,
+                **ctx_kwargs,
+            )
+        )
+
+        # store the cancel scope and portal for later cancellation or
+        # restart if needed.
+        self.service_ctxs[name] = (cs, ipc_ctx, portal, complete)
+        return (
+            cs,
+            ipc_ctx,
+            started,
+        )
+
+    async def start_service(
+        self,
+        daemon_name: str,
+        ctx_ep: Callable,  # kwargs must be `partial`-ed in!
+        # ^TODO, type for `@tractor.context` deco-ed funcs!
+
+        debug_mode: bool = False,
+        **start_actor_kwargs,
+
+    ) -> Context:
+        '''
+        Start a new subactor and schedule a supervising "service task"
+        in it which explicitly defines the sub's lifetime.
+
+        "Service daemon subactors" are cancelled (and thus
+        terminated) using the paired `.cancel_service()`.
+
+        Effectively this API can be used to manage "service daemons"
+        spawned under a single parent actor with supervision
+        semantics equivalent to a one-cancels-one style actor-nursery
+        or "(subactor) task manager" where each subprocess's (and
+        thus its embedded actor runtime) lifetime is synced to that
+        of the remotely spawned task defined by `ctx_ep`.
+
+        The functionality can be likened to a "daemonized" version of
+        `.hilevel.worker.run_in_actor()` but with supervision
+        controls offered by `tractor.Context` where the main/root
+        remotely scheduled `trio.Task` invoking `ctx_ep` determines
+        the underlying subactor's lifetime.
+
+        '''
+        entry: tuple|None = self.service_ctxs.get(daemon_name)
+        if entry:
+            (cs, sub_ctx, portal, complete) = entry
+            return sub_ctx
+
+        if daemon_name not in self.service_ctxs:
+            portal: Portal = await self.an.start_actor(
+                daemon_name,
+                debug_mode=(  # maybe set globally during allocate
+                    debug_mode
+                    or
+                    self.debug_mode
+                ),
+                **start_actor_kwargs,
+            )
+            ctx_kwargs: dict[str, Any] = {}
+            if isinstance(ctx_ep, functools.partial):
+                ctx_kwargs: dict[str, Any] = ctx_ep.keywords
+                ctx_ep: Callable = ctx_ep.func
+
+            (
+                cs,
+                sub_ctx,
+                started,
+            ) = await self.start_service_ctx(
+                name=daemon_name,
+                portal=portal,
+                ctx_fn=ctx_ep,
+                **ctx_kwargs,
+            )
+
+            return sub_ctx
+
+    async def cancel_service(
+        self,
+        name: str,
+
+    ) -> Any:
+        '''
+        Cancel the service task and actor for the given ``name``.
+
+        '''
+        log.info(f'Cancelling `pikerd` service {name}')
+        cs, sub_ctx, portal, complete = self.service_ctxs[name]
+
+        # cs.cancel()
+        await sub_ctx.cancel()
+        await complete.wait()
+
+        if name in self.service_ctxs:
+            # TODO: custom err?
+            # raise ServiceError(
+            raise RuntimeError(
+                f'Service actor for {name} not terminated and/or unknown?'
+            )
+
+        # assert name not in self.service_ctxs, \
+        #     f'Service task for {name} not terminated?'
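For reference, a minimal usage sketch of the daemon-service flow added above. It is not part of the diff; the `sleeper` endpoint, the `sleeper_daemon` name and the single-script layout are illustrative assumptions. A root actor opens the singleton mngr (other tasks in the same actor can later fetch it via `get_service_mngr()`), spawns a subactor whose lifetime is tied to the remotely scheduled `sleeper()` task, then tears it down with `.cancel_service()`:

.. code:: python

    import tractor
    import trio
    from tractor.hilevel import open_service_mngr


    @tractor.context
    async def sleeper(
        ctx: tractor.Context,
    ) -> None:
        # signal the parent that this "service" is up and running
        await ctx.started('ready')
        # park until cancelled by the service mngr
        await trio.sleep_forever()


    async def main() -> None:
        async with open_service_mngr() as mngr:
            # spawn a subactor and schedule `sleeper()` in it as the
            # daemon's main task; its lifetime defines the sub's lifetime.
            sub_ctx: tractor.Context = await mngr.start_service(
                daemon_name='sleeper_daemon',
                ctx_ep=sleeper,
                enable_modules=[__name__],  # endpoint's module must be importable in the sub
            )
            # ... interact with the daemon via `sub_ctx` / its portal ...

            # explicitly terminate the daemon subactor
            await mngr.cancel_service('sleeper_daemon')


    if __name__ == '__main__':
        trio.run(main)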
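Similarly, a sketch of the actor-local variant (again illustrative; the `tick` fn and `ticker` name are not from this diff): `start_service_task()` schedules a plain async fn in the mngr's own `trio.Nursery` instead of spawning a subactor, and `.cancel_service_task()` cancels it and waits for its completion event:

.. code:: python

    import trio
    from tractor.hilevel import open_service_mngr


    async def tick() -> None:
        # a trivial in-process "service" task; it runs in the
        # mngr's own nursery (`ServiceMngr.tn`)
        while True:
            print('tick')
            await trio.sleep(1)


    async def main() -> None:
        async with open_service_mngr() as mngr:
            cs, complete = await mngr.start_service_task(
                name='ticker',
                fn=tick,
            )
            await trio.sleep(3)
            # cancel the task's scope and wait for `complete` to be set
            await mngr.cancel_service_task('ticker')


    trio.run(main)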