diff --git a/tractor/hilevel/__init__.py b/tractor/hilevel/__init__.py
new file mode 100644
index 00000000..cf2741d8
--- /dev/null
+++ b/tractor/hilevel/__init__.py
@@ -0,0 +1,26 @@
+# tractor: structured concurrent "actors".
+# Copyright 2024-eternity Tyler Goodlet.
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+'''
+High level design patterns, APIs and runtime extensions built on top
+of the `tractor` runtime core.
+
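+A minimal usage sketch of the service-manager API re-exported below:
+
+.. code:: python
+
+    import tractor.hilevel as th
+
+    async with th.open_service_mngr() as mngr:
+        ...  # spawn and supervise services via `mngr`
+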
+'''
+from ._service import (
+ open_service_mngr as open_service_mngr,
+ get_service_mngr as get_service_mngr,
+ ServiceMngr as ServiceMngr,
+)
diff --git a/tractor/hilevel/_service.py b/tractor/hilevel/_service.py
new file mode 100644
index 00000000..70dddbdf
--- /dev/null
+++ b/tractor/hilevel/_service.py
@@ -0,0 +1,592 @@
+# tractor: structured concurrent "actors".
+# Copyright 2024-eternity Tyler Goodlet.
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+'''
+Daemon-subactor-as-service management and supervision primitives
+and APIs.
+
+'''
+from __future__ import annotations
+from contextlib import (
+ asynccontextmanager as acm,
+ # contextmanager as cm,
+)
+from collections import defaultdict
+from dataclasses import (
+ dataclass,
+ field,
+)
+import functools
+import inspect
+from typing import (
+ Callable,
+ Any,
+)
+
+import tractor
+import trio
+from trio import TaskStatus
+from tractor import (
+ log,
+ ActorNursery,
+ current_actor,
+ ContextCancelled,
+ Context,
+ Portal,
+)
+
+log = log.get_logger('tractor')
+
+
+# TODO: implement a `@singleton` deco-API for wrapping the below
+# factory's impl for general actor-singleton use?
+#
+# -[ ] go through the options peeps on SO did?
+# * https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python
+# * including @mikenerone's answer
+# |_https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python/39186313#39186313
+#
+# -[ ] put it in `tractor.lowlevel._globals` ?
+#   * fits with our outstanding actor-local/global feat req?
+# |_ https://github.com/goodboy/tractor/issues/55
+# * how can it relate to the `Actor.lifetime_stack` that was
+# silently patched in?
+# |_ we could implicitly call both of these in the same
+# spot in the runtime using the lifetime stack?
+# - `open_singleton_cm().__exit__()`
+#         - `del_singleton()`
+#       |_ gives SC fixture semantics to sync code oriented around
+# sub-process lifetime?
+# * what about with `trio.RunVar`?
+# |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.RunVar
+# - which we'll need for no-GIL cpython (right?) presuming
+# multiple `trio.run()` calls in process?
+#
+#
+# @singleton
+# async def open_service_mngr(
+# **init_kwargs,
+# ) -> ServiceMngr:
+# '''
+#     Note this function body is invoked IFF no existing singleton instance already
+# exists in this proc's memory.
+
+# '''
+# # setup
+# yield ServiceMngr(**init_kwargs)
+# # teardown
+
+
+# a deletion API for explicit instance de-allocation?
+# @open_service_mngr.deleter
+# def del_service_mngr() -> None:
+# mngr = open_service_mngr._singleton[0]
+# open_service_mngr._singleton[0] = None
+# del mngr
+
+
+# TODO: singleton factory API instead of a class API
+@acm
+async def open_service_mngr(
+ *,
+ debug_mode: bool = False,
+
+    # NOTE: since default values for keyword-args are effectively
+ # module-vars/globals as per the note from,
+ # https://docs.python.org/3/tutorial/controlflow.html#default-argument-values
+ #
+ # > "The default value is evaluated only once. This makes
+ # a difference when the default is a mutable object such as
+ # a list, dictionary, or instances of most classes"
+ #
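+    # a quick illustrative sketch of that persistence (not part of
+    # this API):
+    #
+    # >>> def f(_cache=[]):
+    # ...     _cache.append(1)
+    # ...     return len(_cache)
+    # >>> f(); f()
+    # 1
+    # 2
+    #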
+ _singleton: list[ServiceMngr|None] = [None],
+ **init_kwargs,
+
+) -> ServiceMngr:
+ '''
+ Open an actor-global "service-manager" for supervising a tree
+ of subactors and/or actor-global tasks.
+
+    The delivered `ServiceMngr` is a singleton instance for each
+    actor-process, that is, allocated on first open and never
+    de-allocated unless explicitly deleted by a call to
+    `del_service_mngr()`.
+
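+    A minimal usage sketch:
+
+    .. code:: python
+
+        async with open_service_mngr() as mngr:
+            # the same instance is also reachable anywhere in this
+            # actor via `get_service_mngr()`.
+            assert get_service_mngr() is mngr
+            ...
+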
+ '''
+    # TODO: factor this allocation into
+ # a `._mngr.open_service_mngr()` and put in the
+ # once-n-only-once setup/`.__aenter__()` part!
+ # -[ ] how to make this only happen on the `mngr == None` case?
+ # |_ use `.trionics.maybe_open_context()` (for generic
+ # async-with-style-only-once of the factory impl, though
+    #    what do we do for the allocation case?)
+ # / `.maybe_open_nursery()` (since for this specific case
+ # it's simpler?) to activate
+ async with (
+ tractor.open_nursery() as an,
+ trio.open_nursery() as tn,
+ ):
+ # impl specific obvi..
+ init_kwargs.update({
+ 'an': an,
+ 'tn': tn,
+ })
+
+ mngr: ServiceMngr|None
+ if (mngr := _singleton[0]) is None:
+
+ log.info('Allocating a new service mngr!')
+ mngr = _singleton[0] = ServiceMngr(**init_kwargs)
+
+ # TODO: put into `.__aenter__()` section of
+ # eventual `@singleton_acm` API wrapper.
+ #
+ # assign globally for future daemon/task creation
+ mngr.an = an
+ mngr.tn = tn
+
+ else:
+ assert (mngr.an and mngr.tn)
+ log.info(
+ 'Using extant service mngr!\n\n'
+ f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
+ )
+
+ try:
+ # NOTE: this is a singleton factory impl specific detail
+ # which should be supported in the condensed
+ # `@singleton_acm` API?
+ mngr.debug_mode = debug_mode
+
+ yield mngr
+ finally:
+ # TODO: is this more clever/efficient?
+ # if 'samplerd' in mngr.service_ctxs:
+ # await mngr.cancel_service('samplerd')
+ tn.cancel_scope.cancel()
+
+
+
+def get_service_mngr() -> ServiceMngr:
+ '''
+ Try to get the singleton service-mngr for this actor presuming it
+ has already been allocated using,
+
+ .. code:: python
+
+        async with open_<@singleton_acm(func)>() as mngr:
+ ... this block kept open ...
+
+    If not yet allocated, raise a `RuntimeError`.
+
+ '''
+ # https://stackoverflow.com/a/12627202
+ # https://docs.python.org/3/library/inspect.html#inspect.Signature
+ maybe_mngr: ServiceMngr|None = inspect.signature(
+ open_service_mngr
+ ).parameters['_singleton'].default[0]
+
+ if maybe_mngr is None:
+ raise RuntimeError(
+ 'Someone must allocate a `ServiceMngr` using\n\n'
+ '`async with open_service_mngr()` beforehand!!\n'
+ )
+
+ return maybe_mngr
+
+
+async def _open_and_supervise_service_ctx(
+ serman: ServiceMngr,
+ name: str,
+ ctx_fn: Callable, # TODO, type for `@tractor.context` requirement
+ portal: Portal,
+
+ allow_overruns: bool = False,
+ task_status: TaskStatus[
+ tuple[
+ trio.CancelScope,
+ Context,
+ trio.Event,
+ Any,
+ ]
+ ] = trio.TASK_STATUS_IGNORED,
+ **ctx_kwargs,
+
+) -> Any:
+ '''
+ Open a remote IPC-context defined by `ctx_fn` in the
+ (service) actor accessed via `portal` and supervise the
+ (local) parent task to termination at which point the remote
+ actor runtime is cancelled alongside it.
+
+ The main application is for allocating long-running
+ "sub-services" in a main daemon and explicitly controlling
+ their lifetimes from an actor-global singleton.
+
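+    This is normally scheduled via a `trio.Nursery.start()` call
+    rather than invoked directly, as in `ServiceMngr.start_service_ctx()`;
+    a sketch, assuming `mngr`, `portal` and a `@tractor.context`
+    endpoint `my_ep` are already in hand:
+
+    .. code:: python
+
+        cs, ipc_ctx, complete, started = await mngr.tn.start(
+            functools.partial(
+                _open_and_supervise_service_ctx,
+                serman=mngr,
+                name='my_service',
+                ctx_fn=my_ep,
+                portal=portal,
+            )
+        )
+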
+ '''
+ # TODO: use the ctx._scope directly here instead?
+ # -[ ] actually what semantics do we expect for this
+ # usage!?
+ with trio.CancelScope() as cs:
+ try:
+ async with portal.open_context(
+ ctx_fn,
+ allow_overruns=allow_overruns,
+ **ctx_kwargs,
+
+ ) as (ctx, started):
+
+ # unblock once the remote context has started
+ complete = trio.Event()
+ task_status.started((
+ cs,
+ ctx,
+ complete,
+ started,
+ ))
+ log.info(
+ f'`pikerd` service {name} started with value {started}'
+ )
+ # wait on any context's return value
+ # and any final portal result from the
+ # sub-actor.
+ ctx_res: Any = await ctx.wait_for_result()
+
+ # NOTE: blocks indefinitely until cancelled
+ # either by error from the target context
+ # function or by being cancelled here by the
+ # surrounding cancel scope.
+ return (
+ await portal.wait_for_result(),
+ ctx_res,
+ )
+
+ except ContextCancelled as ctxe:
+ canceller: tuple[str, str] = ctxe.canceller
+ our_uid: tuple[str, str] = current_actor().uid
+ if (
+ canceller != portal.chan.uid
+ and
+ canceller != our_uid
+ ):
+ log.cancel(
+ f'Actor-service `{name}` was remotely cancelled by a peer?\n'
+
+ # TODO: this would be a good spot to use
+ # a respawn feature Bo
+ f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
+
+ f'cancellee: {portal.chan.uid}\n'
+ f'canceller: {canceller}\n'
+ )
+ else:
+ raise
+
+ finally:
+ # NOTE: the ctx MUST be cancelled first if we
+ # don't want the above `ctx.wait_for_result()` to
+ # raise a self-ctxc. WHY, well since from the ctx's
+ # perspective the cancel request will have
+            # arrived out-of-band at the `Actor.cancel()`
+ # level, thus `Context.cancel_called == False`,
+ # meaning `ctx._is_self_cancelled() == False`.
+ # with trio.CancelScope(shield=True):
+ # await ctx.cancel()
+ await portal.cancel_actor() # terminate (remote) sub-actor
+ complete.set() # signal caller this task is done
+ serman.service_ctxs.pop(name) # remove mngr entry
+
+
+# TODO: we need remote wrapping and a general soln:
+# - factor this into a ``tractor.highlevel`` extension pack for the
+#   library.
+# - wrap a "remote api" wherein you can get a method proxy
+# to the pikerd actor for starting services remotely!
+# - prolly rename this to ActorServicesNursery since it spawns
+# new actors and supervises them to completion?
+@dataclass
+class ServiceMngr:
+ '''
+ A multi-subactor-as-service manager.
+
+ Spawn, supervise and monitor service/daemon subactors in a SC
+ process tree.
+
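+    A sketch of the local-task-only flow (the `do_work()` coro is
+    hypothetical):
+
+    .. code:: python
+
+        cs, complete = await mngr.start_service_task(
+            'my_task',
+            do_work,
+        )
+        ...
+        await mngr.cancel_service_task('my_task')
+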
+ '''
+ an: ActorNursery
+ tn: trio.Nursery
+ debug_mode: bool = False # tractor sub-actor debug mode flag
+
+ service_tasks: dict[
+ str,
+ tuple[
+ trio.CancelScope,
+ trio.Event,
+ ]
+ ] = field(default_factory=dict)
+
+ service_ctxs: dict[
+ str,
+ tuple[
+ trio.CancelScope,
+ Context,
+ Portal,
+ trio.Event,
+ ]
+ ] = field(default_factory=dict)
+
+    # internal per-service task mutexes
+ _locks = defaultdict(trio.Lock)
+
+ # TODO, unify this interface with our `TaskManager` PR!
+ #
+ #
+ async def start_service_task(
+ self,
+ name: str,
+ # TODO: typevar for the return type of the target and then
+ # use it below for `ctx_res`?
+ fn: Callable,
+
+ allow_overruns: bool = False,
+ **ctx_kwargs,
+
+    ) -> tuple[
+        trio.CancelScope,
+        trio.Event,
+    ]:
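+        '''
+        Spawn a (local) "service task" via the actor-global
+        `.tn: trio.Nursery`, tracking it by `name` for later
+        cancellation via `.cancel_service_task()`.
+
+        Deliver the task's `trio.CancelScope` and a `trio.Event`
+        which is set on task termination.
+
+        '''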
+ async def _task_manager_start(
+ task_status: TaskStatus[
+ tuple[
+ trio.CancelScope,
+ trio.Event,
+ ]
+ ] = trio.TASK_STATUS_IGNORED,
+ ) -> Any:
+
+ task_cs = trio.CancelScope()
+ task_complete = trio.Event()
+
+ with task_cs as cs:
+ task_status.started((
+ cs,
+ task_complete,
+ ))
+ try:
+ await fn()
+ except trio.Cancelled as taskc:
+ log.cancel(
+ f'Service task for `{name}` was cancelled!\n'
+ # TODO: this would be a good spot to use
+ # a respawn feature Bo
+ )
+ raise taskc
+                finally:
+                    task_complete.set()
+                    # mirror the ctx-variant teardown in
+                    # `_open_and_supervise_service_ctx()`: drop our
+                    # entry so `.cancel_service_task()` can verify
+                    # termination below.
+                    self.service_tasks.pop(name, None)
+ (
+ cs,
+ complete,
+ ) = await self.tn.start(_task_manager_start)
+
+        # store the cancel scope and completion event for later
+        # cancellation or restart if needed.
+ self.service_tasks[name] = (
+ cs,
+ complete,
+ )
+ return (
+ cs,
+ complete,
+ )
+
+ async def cancel_service_task(
+ self,
+ name: str,
+
+ ) -> Any:
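+        '''
+        Cancel the (local) service task previously scheduled via
+        `.start_service_task()` and wait for its termination.
+
+        '''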
+ log.info(f'Cancelling `pikerd` service {name}')
+ cs, complete = self.service_tasks[name]
+
+ cs.cancel()
+ await complete.wait()
+ # TODO, if we use the `TaskMngr` from #346
+ # we can also get the return value from the task!
+
+ if name in self.service_tasks:
+ # TODO: custom err?
+ # raise ServiceError(
+ raise RuntimeError(
+ f'Service task {name!r} not terminated!?\n'
+ )
+
+ async def start_service_ctx(
+ self,
+ name: str,
+ portal: Portal,
+ # TODO: typevar for the return type of the target and then
+ # use it below for `ctx_res`?
+ ctx_fn: Callable,
+ **ctx_kwargs,
+
+ ) -> tuple[
+ trio.CancelScope,
+ Context,
+ Any,
+ ]:
+ '''
+ Start a remote IPC-context defined by `ctx_fn` in a background
+ task and immediately return supervision primitives to manage it:
+
+ - a `cs: CancelScope` for the newly allocated bg task
+ - the `ipc_ctx: Context` to manage the remotely scheduled
+ `trio.Task`.
+ - the `started: Any` value returned by the remote endpoint
+ task's `Context.started()` call.
+
+        The bg task supervises the ctx such that, when it terminates,
+        the backing actor runtime is also cancelled; see
+        `_open_and_supervise_service_ctx()` for details.
+
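+        A minimal usage sketch, assuming `portal` references an
+        already-spawned subactor and `my_ep` is a (hypothetical)
+        `@tractor.context` endpoint:
+
+        .. code:: python
+
+            cs, ipc_ctx, started = await mngr.start_service_ctx(
+                name='my_service',
+                portal=portal,
+                ctx_fn=my_ep,
+            )
+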
+ '''
+ cs, ipc_ctx, complete, started = await self.tn.start(
+ functools.partial(
+ _open_and_supervise_service_ctx,
+ serman=self,
+ name=name,
+ ctx_fn=ctx_fn,
+ portal=portal,
+ **ctx_kwargs,
+ )
+ )
+
+ # store the cancel scope and portal for later cancellation or
+        # restart if needed.
+ self.service_ctxs[name] = (cs, ipc_ctx, portal, complete)
+ return (
+ cs,
+ ipc_ctx,
+ started,
+ )
+
+ async def start_service(
+ self,
+ daemon_name: str,
+        ctx_ep: Callable, # kwargs must be `partial`-ed in!
+ # ^TODO, type for `@tractor.context` deco-ed funcs!
+
+ debug_mode: bool = False,
+ **start_actor_kwargs,
+
+ ) -> Context:
+ '''
+        Start a new subactor and schedule a supervising "service task"
+ in it which explicitly defines the sub's lifetime.
+
+ "Service daemon subactors" are cancelled (and thus
+ terminated) using the paired `.cancel_service()`.
+
+ Effectively this API can be used to manage "service daemons"
+ spawned under a single parent actor with supervision
+ semantics equivalent to a one-cancels-one style actor-nursery
+ or "(subactor) task manager" where each subprocess's (and
+ thus its embedded actor runtime) lifetime is synced to that
+ of the remotely spawned task defined by `ctx_ep`.
+
+        The functionality can be likened to a "daemonized" version of
+ `.hilevel.worker.run_in_actor()` but with supervision
+ controls offered by `tractor.Context` where the main/root
+ remotely scheduled `trio.Task` invoking `ctx_ep` determines
+ the underlying subactor's lifetime.
+
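+        A minimal usage sketch, passing endpoint kwargs via
+        `functools.partial` (the `my_ep` endpoint is hypothetical):
+
+        .. code:: python
+
+            ctx: Context = await mngr.start_service(
+                daemon_name='my_service',
+                ctx_ep=functools.partial(
+                    my_ep,
+                    some_setting=42,
+                ),
+            )
+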
+ '''
+ entry: tuple|None = self.service_ctxs.get(daemon_name)
+ if entry:
+ (cs, sub_ctx, portal, complete) = entry
+ return sub_ctx
+
+        portal: Portal = await self.an.start_actor(
+            daemon_name,
+            debug_mode=(  # maybe set globally during allocate
+                debug_mode
+                or
+                self.debug_mode
+            ),
+            **start_actor_kwargs,
+        )
+ ctx_kwargs: dict[str, Any] = {}
+ if isinstance(ctx_ep, functools.partial):
+ ctx_kwargs: dict[str, Any] = ctx_ep.keywords
+ ctx_ep: Callable = ctx_ep.func
+
+ (
+ cs,
+ sub_ctx,
+ started,
+ ) = await self.start_service_ctx(
+ name=daemon_name,
+ portal=portal,
+ ctx_fn=ctx_ep,
+ **ctx_kwargs,
+ )
+
+ return sub_ctx
+
+ async def cancel_service(
+ self,
+ name: str,
+
+ ) -> Any:
+ '''
+ Cancel the service task and actor for the given ``name``.
+
+ '''
+ log.info(f'Cancelling `pikerd` service {name}')
+ cs, sub_ctx, portal, complete = self.service_ctxs[name]
+
+ # cs.cancel()
+ await sub_ctx.cancel()
+ await complete.wait()
+
+ if name in self.service_ctxs:
+ # TODO: custom err?
+ # raise ServiceError(
+ raise RuntimeError(
+ f'Service actor for {name} not terminated and/or unknown?'
+ )
+
+        # assert name not in self.service_ctxs, \
+        #     f'Service task for {name} not terminated?'