From 896b2c73f45b8e8f4d95c6d97e461b07e811c8ab Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 27 Nov 2024 20:29:16 -0500 Subject: [PATCH 01/12] Initial idea-notes dump and @singleton factory idea from `trio`-gitter --- tractor/hilevel/__init__.py | 0 tractor/hilevel/_service.py | 132 ++++++++++++++++++++++++++++++++++++ 2 files changed, 132 insertions(+) create mode 100644 tractor/hilevel/__init__.py create mode 100644 tractor/hilevel/_service.py diff --git a/tractor/hilevel/__init__.py b/tractor/hilevel/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tractor/hilevel/_service.py b/tractor/hilevel/_service.py new file mode 100644 index 00000000..1376c2a1 --- /dev/null +++ b/tractor/hilevel/_service.py @@ -0,0 +1,132 @@ +# tractor: structured concurrent "actors". +# Copyright 2024-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Daemon subactor as service(s) management and supervision primitives +and API. + +''' +from __future__ import annotations +from contextlib import ( + # asynccontextmanager as acm, + contextmanager as cm, +) +from collections import defaultdict +from typing import ( + Callable, + Any, +) + +import trio +from trio import TaskStatus +from tractor import ( + ActorNursery, + current_actor, + ContextCancelled, + Context, + Portal, +) + +from ._util import ( + log, # sub-sys logger +) + + +# TODO: implement a `@singleton` deco-API for wrapping the below +# factory's impl for general actor-singleton use? +# +# -[ ] go through the options peeps on SO did? +# * https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python +# * including @mikenerone's answer +# |_https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python/39186313#39186313 +# +# -[ ] put it in `tractor.lowlevel._globals` ? +# * fits with our oustanding actor-local/global feat req? +# |_ https://github.com/goodboy/tractor/issues/55 +# * how can it relate to the `Actor.lifetime_stack` that was +# silently patched in? +# |_ we could implicitly call both of these in the same +# spot in the runtime using the lifetime stack? +# - `open_singleton_cm().__exit__()` +# -`del_singleton()` +# |_ gives SC fixtue semantics to sync code oriented around +# sub-process lifetime? +# * what about with `trio.RunVar`? +# |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.RunVar +# - which we'll need for no-GIL cpython (right?) presuming +# multiple `trio.run()` calls in process? +# +# +# @singleton +# async def open_service_mngr( +# **init_kwargs, +# ) -> ServiceMngr: +# ''' +# Note this function body is invoke IFF no existing singleton instance already +# exists in this proc's memory. + +# ''' +# # setup +# yield ServiceMngr(**init_kwargs) +# # teardown + + +# a deletion API for explicit instance de-allocation? 
+# @open_service_mngr.deleter +# def del_service_mngr() -> None: +# mngr = open_service_mngr._singleton[0] +# open_service_mngr._singleton[0] = None +# del mngr + + + +# TODO: singleton factory API instead of a class API +@cm +def open_service_mngr( + *, + _singleton: list[ServiceMngr|None] = [None], + # NOTE; since default values for keyword-args are effectively + # module-vars/globals as per the note from, + # https://docs.python.org/3/tutorial/controlflow.html#default-argument-values + # + # > "The default value is evaluated only once. This makes + # a difference when the default is a mutable object such as + # a list, dictionary, or instances of most classes" + # + **init_kwargs, + +) -> ServiceMngr: + ''' + Open a multi-subactor-as-service-daemon tree supervisor. + + The delivered `ServiceMngr` is a singleton instance for each + actor-process and is allocated on first open and never + de-allocated unless explicitly deleted by al call to + `del_service_mngr()`. + + ''' + mngr: ServiceMngr|None + if (mngr := _singleton[0]) is None: + log.info('Allocating a new service mngr!') + mngr = _singleton[0] = ServiceMngr(**init_kwargs) + else: + log.info( + 'Using extant service mngr!\n\n' + f'{mngr!r}\n' # it has a nice `.__repr__()` of services state + ) + + with mngr: + yield mngr -- 2.34.1 From dfa2914c1d0f872ea4e0ecf1adfdb19555e609a6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 11 Dec 2024 12:38:35 -0500 Subject: [PATCH 02/12] Mv over `ServiceMngr` from `piker` with mods Namely distinguishing service "IPC contexts" (opened in a subactor via a `Portal`) from just local `trio.Task`s started and managed under the `.service_n` (more or less wrapping in the interface of a "task-manager" style nursery - aka a one-cancels-one supervision start). API changes from original (`piker`) impl, - mk `.start_service_task()` do ONLY that, start a task with a wrapping cancel-scope and completion event. |_ ideally this gets factored-out/re-implemented using the task-manager/OCO-style-nursery from GH #363. - change what was the impl of `.start_service_task()` to `.start_service_ctx()` since it more explicitly defines the functionality of entering `Portal.open_context()` with a wrapping cs and completion event inside a bg task (which syncs the ctx's lifetime with termination of the remote actor runtime). - factor out what was a `.start_service_ctx()` closure to a new `_open_and_supervise_service_ctx()` mod-func holding the meat of the supervision logic. `ServiceMngr` API brief, - use `open_service_mngr()` and `get_service_mngr()` to acquire the actor-global singleton. - `ServiceMngr.start_service()` and `.cancel_service()` which allow for straight forward mgmt of "service subactor daemons". --- tractor/hilevel/__init__.py | 26 +++ tractor/hilevel/_service.py | 448 ++++++++++++++++++++++++++++++++++-- 2 files changed, 453 insertions(+), 21 deletions(-) diff --git a/tractor/hilevel/__init__.py b/tractor/hilevel/__init__.py index e69de29b..cf2741d8 100644 --- a/tractor/hilevel/__init__.py +++ b/tractor/hilevel/__init__.py @@ -0,0 +1,26 @@ +# tractor: structured concurrent "actors". +# Copyright 2024-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
+ +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +High level design patterns, APIs and runtime extensions built on top +of the `tractor` runtime core. + +''' +from ._service import ( + open_service_mngr as open_service_mngr, + get_service_mngr as get_service_mngr, + ServiceMngr as ServiceMngr, +) diff --git a/tractor/hilevel/_service.py b/tractor/hilevel/_service.py index 1376c2a1..869d4861 100644 --- a/tractor/hilevel/_service.py +++ b/tractor/hilevel/_service.py @@ -21,18 +21,26 @@ and API. ''' from __future__ import annotations from contextlib import ( - # asynccontextmanager as acm, - contextmanager as cm, + asynccontextmanager as acm, + # contextmanager as cm, ) from collections import defaultdict +from dataclasses import ( + dataclass, + field, +) +import functools +import inspect from typing import ( Callable, Any, ) +import tractor import trio from trio import TaskStatus from tractor import ( + log, ActorNursery, current_actor, ContextCancelled, @@ -40,9 +48,7 @@ from tractor import ( Portal, ) -from ._util import ( - log, # sub-sys logger -) +log = log.get_logger('tractor') # TODO: implement a `@singleton` deco-API for wrapping the below @@ -93,11 +99,30 @@ from ._util import ( +# TODO: implement a singleton deco-API for wrapping the below +# factory's impl for general actor-singleton use? +# +# @singleton +# async def open_service_mngr( +# **init_kwargs, +# ) -> ServiceMngr: +# ''' +# Note this function body is invoke IFF no existing singleton instance already +# exists in this proc's memory. + +# ''' +# # setup +# yield ServiceMngr(**init_kwargs) +# # teardown + + + # TODO: singleton factory API instead of a class API -@cm -def open_service_mngr( +@acm +async def open_service_mngr( *, - _singleton: list[ServiceMngr|None] = [None], + debug_mode: bool = False, + # NOTE; since default values for keyword-args are effectively # module-vars/globals as per the note from, # https://docs.python.org/3/tutorial/controlflow.html#default-argument-values @@ -106,27 +131,408 @@ def open_service_mngr( # a difference when the default is a mutable object such as # a list, dictionary, or instances of most classes" # + _singleton: list[ServiceMngr|None] = [None], **init_kwargs, ) -> ServiceMngr: ''' - Open a multi-subactor-as-service-daemon tree supervisor. + Open an actor-global "service-manager" for supervising a tree + of subactors and/or actor-global tasks. - The delivered `ServiceMngr` is a singleton instance for each - actor-process and is allocated on first open and never + The delivered `ServiceMngr` is singleton instance for each + actor-process, that is, allocated on first open and never de-allocated unless explicitly deleted by al call to `del_service_mngr()`. ''' - mngr: ServiceMngr|None - if (mngr := _singleton[0]) is None: - log.info('Allocating a new service mngr!') - mngr = _singleton[0] = ServiceMngr(**init_kwargs) - else: - log.info( - 'Using extant service mngr!\n\n' - f'{mngr!r}\n' # it has a nice `.__repr__()` of services state + # TODO: factor this an allocation into + # a `._mngr.open_service_mngr()` and put in the + # once-n-only-once setup/`.__aenter__()` part! + # -[ ] how to make this only happen on the `mngr == None` case? 
+ # |_ use `.trionics.maybe_open_context()` (for generic + # async-with-style-only-once of the factory impl, though + # what do we do for the allocation case? + # / `.maybe_open_nursery()` (since for this specific case + # it's simpler?) to activate + async with ( + tractor.open_nursery() as an, + trio.open_nursery() as tn, + ): + # impl specific obvi.. + init_kwargs.update({ + 'actor_n': an, + 'service_n': tn, + }) + + mngr: ServiceMngr|None + if (mngr := _singleton[0]) is None: + + log.info('Allocating a new service mngr!') + mngr = _singleton[0] = ServiceMngr(**init_kwargs) + + # TODO: put into `.__aenter__()` section of + # eventual `@singleton_acm` API wrapper. + # + # assign globally for future daemon/task creation + mngr.actor_n = an + mngr.service_n = tn + + else: + assert ( + mngr.actor_n + and + mngr.service_tn + ) + log.info( + 'Using extant service mngr!\n\n' + f'{mngr!r}\n' # it has a nice `.__repr__()` of services state + ) + + try: + # NOTE: this is a singleton factory impl specific detail + # which should be supported in the condensed + # `@singleton_acm` API? + mngr.debug_mode = debug_mode + + yield mngr + finally: + # TODO: is this more clever/efficient? + # if 'samplerd' in mngr.service_tasks: + # await mngr.cancel_service('samplerd') + tn.cancel_scope.cancel() + + + +def get_service_mngr() -> ServiceMngr: + ''' + Try to get the singleton service-mngr for this actor presuming it + has already been allocated using, + + .. code:: python + + async with open_<@singleton_acm(func)>() as mngr` + ... this block kept open ... + + If not yet allocated raise a `ServiceError`. + + ''' + # https://stackoverflow.com/a/12627202 + # https://docs.python.org/3/library/inspect.html#inspect.Signature + maybe_mngr: ServiceMngr|None = inspect.signature( + open_service_mngr + ).parameters['_singleton'].default[0] + + if maybe_mngr is None: + raise RuntimeError( + 'Someone must allocate a `ServiceMngr` using\n\n' + '`async with open_service_mngr()` beforehand!!\n' ) - with mngr: - yield mngr + return maybe_mngr + + +# TODO: we need remote wrapping and a general soln: +# - factor this into a ``tractor.highlevel`` extension # pack for the +# library. +# - wrap a "remote api" wherein you can get a method proxy +# to the pikerd actor for starting services remotely! +# - prolly rename this to ActorServicesNursery since it spawns +# new actors and supervises them to completion? +@dataclass +class ServiceMngr: + ''' + A multi-subactor-as-service manager. + + Spawn, supervise and monitor service/daemon subactors in a SC + process tree. + + ''' + actor_n: ActorNursery + service_n: trio.Nursery + debug_mode: bool = False # tractor sub-actor debug mode flag + + service_tasks: dict[ + str, + tuple[ + trio.CancelScope, + Context, + Portal, + trio.Event, + ] + ] = field(default_factory=dict) + + # internal per-service task mutexs + _locks = defaultdict(trio.Lock) + + # TODO, unify this interface with our `TaskManager` PR! + # + # + async def start_service_task( + self, + name: str, + # TODO: typevar for the return type of the target and then + # use it below for `ctx_res`? 
+ fn: Callable, + + allow_overruns: bool = False, + **ctx_kwargs, + + ) -> tuple[ + trio.CancelScope, + Any, + trio.Event, + ]: + async def _task_manager_start( + task_status: TaskStatus[ + tuple[ + trio.CancelScope, + trio.Event, + ] + ] = trio.TASK_STATUS_IGNORED, + ) -> Any: + + task_cs = trio.CancelScope() + task_complete = trio.Event() + + with task_cs as cs: + task_status.started(( + cs, + task_complete, + )) + try: + await fn() + except trio.Cancelled as taskc: + log.cancel( + f'Service task for `{name}` was cancelled!\n' + # TODO: this would be a good spot to use + # a respawn feature Bo + ) + raise taskc + finally: + task_complete.set() + ( + cs, + complete, + ) = await self.service_n.start(_task_manager_start) + + # store the cancel scope and portal for later cancellation or + # retstart if needed. + self.service_tasks[name] = ( + cs, + None, + None, + complete, + ) + return ( + cs, + complete, + ) + + async def start_service_ctx( + self, + name: str, + portal: Portal, + # TODO: typevar for the return type of the target and then + # use it below for `ctx_res`? + target: Callable, + **ctx_kwargs, + + ) -> tuple[ + trio.CancelScope, + Context, + Any, + ]: + cs, sub_ctx, complete, started = await self.service_n.start( + functools.partial( + self._open_and_supervise_service_ctx, + name=name, + target=target, + portal=portal, + **ctx_kwargs, + + ) + ) + + # store the cancel scope and portal for later cancellation or + # retstart if needed. + self.service_tasks[name] = (cs, sub_ctx, portal, complete) + return ( + cs, + sub_ctx, + started, + ) + + async def _open_and_supervise_service_ctx( + self, + name: str, + target: Callable, # TODO, type for `@tractor.context` requirement + portal: Portal, + + allow_overruns: bool = False, + task_status: TaskStatus[ + tuple[ + trio.CancelScope, + Context, + trio.Event, + Any, + ] + ] = trio.TASK_STATUS_IGNORED, + **ctx_kwargs, + + ) -> Any: + ''' + Open a context in a service sub-actor, add to a stack + that gets unwound at ``pikerd`` teardown. + + This allows for allocating long-running sub-services in our main + daemon and explicitly controlling their lifetimes. + + ''' + # TODO: use the ctx._scope directly here instead? + # -[ ] actually what semantics do we expect for this + # usage!? + with trio.CancelScope() as cs: + try: + async with portal.open_context( + target, + allow_overruns=allow_overruns, + **ctx_kwargs, + + ) as (ctx, started): + + # unblock once the remote context has started + complete = trio.Event() + task_status.started(( + cs, + ctx, + complete, + started, + )) + log.info( + f'`pikerd` service {name} started with value {started}' + ) + # wait on any context's return value + # and any final portal result from the + # sub-actor. + ctx_res: Any = await ctx.wait_for_result() + + # NOTE: blocks indefinitely until cancelled + # either by error from the target context + # function or by being cancelled here by the + # surrounding cancel scope. 
+ return ( + await portal.wait_for_result(), + ctx_res, + ) + + except ContextCancelled as ctxe: + canceller: tuple[str, str] = ctxe.canceller + our_uid: tuple[str, str] = current_actor().uid + if ( + canceller != portal.chan.uid + and + canceller != our_uid + ): + log.cancel( + f'Actor-service `{name}` was remotely cancelled by a peer?\n' + + # TODO: this would be a good spot to use + # a respawn feature Bo + f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n' + + f'cancellee: {portal.chan.uid}\n' + f'canceller: {canceller}\n' + ) + else: + raise + + finally: + # NOTE: the ctx MUST be cancelled first if we + # don't want the above `ctx.wait_for_result()` to + # raise a self-ctxc. WHY, well since from the ctx's + # perspective the cancel request will have + # arrived out-out-of-band at the `Actor.cancel()` + # level, thus `Context.cancel_called == False`, + # meaning `ctx._is_self_cancelled() == False`. + # with trio.CancelScope(shield=True): + # await ctx.cancel() + await portal.cancel_actor() # terminate (remote) sub-actor + complete.set() # signal caller this task is done + self.service_tasks.pop(name) # remove mngr entry + + async def cancel_service( + self, + name: str, + + ) -> Any: + ''' + Cancel the service task and actor for the given ``name``. + + ''' + log.info(f'Cancelling `pikerd` service {name}') + cs, sub_ctx, portal, complete = self.service_tasks[name] + + # cs.cancel() + await sub_ctx.cancel() + await complete.wait() + + if name in self.service_tasks: + # TODO: custom err? + # raise ServiceError( + raise RuntimeError( + f'Service actor for {name} not terminated and/or unknown?' + ) + + # assert name not in self.service_tasks, \ + # f'Serice task for {name} not terminated?' + + async def start_service( + self, + daemon_name: str, + ctx_ep: Callable, # kwargs must `partial`-ed in! + # ^TODO, type for `@tractor.context` deco-ed funcs! + + debug_mode: bool = False, + **tractor_actor_kwargs, + + ) -> Context: + ''' + Start a "service" task in a (new) sub-actor and manage its + lifetime indefinitely until termination. + + Service actors can be cancelled (and thus shutdown) using + `.cancel_service()`. + + ''' + entry: tuple|None = self.service_tasks.get(daemon_name) + if entry: + (cs, sub_ctx, portal, complete) = entry + return sub_ctx + + if daemon_name not in self.service_tasks: + portal: Portal = await self.actor_n.start_actor( + daemon_name, + debug_mode=( # maybe set globally during allocate + debug_mode + or + self.debug_mode + ), + **tractor_actor_kwargs, + ) + ctx_kwargs: dict[str, Any] = {} + if isinstance(ctx_ep, functools.partial): + ctx_kwargs: dict[str, Any] = ctx_ep.keywords + ctx_ep: Callable = ctx_ep.func + + ( + cs, + sub_ctx, + started, + ) = await self.start_service_ctx( + name=daemon_name, + portal=portal, + target=ctx_ep, + **ctx_kwargs, + ) + + return sub_ctx -- 2.34.1 From 106dca531a908d4ab81bd02d53b248c7c3ed65bf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 11 Dec 2024 14:24:49 -0500 Subject: [PATCH 03/12] Better separate service tasks vs. ctxs via methods Namely splitting the handles for each in 2 separate tables and adding a `.cancel_service_task()`. Also, - move `_open_and_supervise_service_ctx()` to mod level. - rename `target` -> `ctx_fn` params througout. - fill out method doc strings. 
--- tractor/hilevel/_service.py | 344 +++++++++++++++++++++--------------- 1 file changed, 201 insertions(+), 143 deletions(-) diff --git a/tractor/hilevel/_service.py b/tractor/hilevel/_service.py index 869d4861..c94e4212 100644 --- a/tractor/hilevel/_service.py +++ b/tractor/hilevel/_service.py @@ -197,7 +197,7 @@ async def open_service_mngr( yield mngr finally: # TODO: is this more clever/efficient? - # if 'samplerd' in mngr.service_tasks: + # if 'samplerd' in mngr.service_ctxs: # await mngr.cancel_service('samplerd') tn.cancel_scope.cancel() @@ -231,6 +231,108 @@ def get_service_mngr() -> ServiceMngr: return maybe_mngr +async def _open_and_supervise_service_ctx( + serman: ServiceMngr, + name: str, + ctx_fn: Callable, # TODO, type for `@tractor.context` requirement + portal: Portal, + + allow_overruns: bool = False, + task_status: TaskStatus[ + tuple[ + trio.CancelScope, + Context, + trio.Event, + Any, + ] + ] = trio.TASK_STATUS_IGNORED, + **ctx_kwargs, + +) -> Any: + ''' + Open a remote IPC-context defined by `ctx_fn` in the + (service) actor accessed via `portal` and supervise the + (local) parent task to termination at which point the remote + actor runtime is cancelled alongside it. + + The main application is for allocating long-running + "sub-services" in a main daemon and explicitly controlling + their lifetimes from an actor-global singleton. + + ''' + # TODO: use the ctx._scope directly here instead? + # -[ ] actually what semantics do we expect for this + # usage!? + with trio.CancelScope() as cs: + try: + async with portal.open_context( + ctx_fn, + allow_overruns=allow_overruns, + **ctx_kwargs, + + ) as (ctx, started): + + # unblock once the remote context has started + complete = trio.Event() + task_status.started(( + cs, + ctx, + complete, + started, + )) + log.info( + f'`pikerd` service {name} started with value {started}' + ) + # wait on any context's return value + # and any final portal result from the + # sub-actor. + ctx_res: Any = await ctx.wait_for_result() + + # NOTE: blocks indefinitely until cancelled + # either by error from the target context + # function or by being cancelled here by the + # surrounding cancel scope. + return ( + await portal.wait_for_result(), + ctx_res, + ) + + except ContextCancelled as ctxe: + canceller: tuple[str, str] = ctxe.canceller + our_uid: tuple[str, str] = current_actor().uid + if ( + canceller != portal.chan.uid + and + canceller != our_uid + ): + log.cancel( + f'Actor-service `{name}` was remotely cancelled by a peer?\n' + + # TODO: this would be a good spot to use + # a respawn feature Bo + f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n' + + f'cancellee: {portal.chan.uid}\n' + f'canceller: {canceller}\n' + ) + else: + raise + + finally: + # NOTE: the ctx MUST be cancelled first if we + # don't want the above `ctx.wait_for_result()` to + # raise a self-ctxc. WHY, well since from the ctx's + # perspective the cancel request will have + # arrived out-out-of-band at the `Actor.cancel()` + # level, thus `Context.cancel_called == False`, + # meaning `ctx._is_self_cancelled() == False`. + # with trio.CancelScope(shield=True): + # await ctx.cancel() + await portal.cancel_actor() # terminate (remote) sub-actor + complete.set() # signal caller this task is done + serman.service_ctxs.pop(name) # remove mngr entry + + # TODO: we need remote wrapping and a general soln: # - factor this into a ``tractor.highlevel`` extension # pack for the # library. 
@@ -252,6 +354,14 @@ class ServiceMngr: debug_mode: bool = False # tractor sub-actor debug mode flag service_tasks: dict[ + str, + tuple[ + trio.CancelScope, + trio.Event, + ] + ] = field(default_factory=dict) + + service_ctxs: dict[ str, tuple[ trio.CancelScope, @@ -319,8 +429,6 @@ class ServiceMngr: # retstart if needed. self.service_tasks[name] = ( cs, - None, - None, complete, ) return ( @@ -328,13 +436,33 @@ class ServiceMngr: complete, ) + async def cancel_service_task( + self, + name: str, + + ) -> Any: + log.info(f'Cancelling `pikerd` service {name}') + cs, complete = self.service_tasks[name] + + cs.cancel() + await complete.wait() + # TODO, if we use the `TaskMngr` from #346 + # we can also get the return value from the task! + + if name in self.service_tasks: + # TODO: custom err? + # raise ServiceError( + raise RuntimeError( + f'Service task {name!r} not terminated!?\n' + ) + async def start_service_ctx( self, name: str, portal: Portal, # TODO: typevar for the return type of the target and then # use it below for `ctx_res`? - target: Callable, + ctx_fn: Callable, **ctx_kwargs, ) -> tuple[ @@ -342,150 +470,41 @@ class ServiceMngr: Context, Any, ]: - cs, sub_ctx, complete, started = await self.service_n.start( + ''' + Start a remote IPC-context defined by `ctx_fn` in a background + task and immediately return supervision primitives to manage it: + + - a `cs: CancelScope` for the newly allocated bg task + - the `ipc_ctx: Context` to manage the remotely scheduled + `trio.Task`. + - the `started: Any` value returned by the remote endpoint + task's `Context.started()` call. + + The bg task supervises the ctx such that when it terminates the supporting + actor runtime is also cancelled, see `_open_and_supervise_service_ctx()` + for details. + + ''' + cs, ipc_ctx, complete, started = await self.service_n.start( functools.partial( - self._open_and_supervise_service_ctx, + _open_and_supervise_service_ctx, + serman=self, name=name, - target=target, + ctx_fn=ctx_fn, portal=portal, **ctx_kwargs, - ) ) # store the cancel scope and portal for later cancellation or # retstart if needed. - self.service_tasks[name] = (cs, sub_ctx, portal, complete) + self.service_ctxs[name] = (cs, ipc_ctx, portal, complete) return ( cs, - sub_ctx, + ipc_ctx, started, ) - async def _open_and_supervise_service_ctx( - self, - name: str, - target: Callable, # TODO, type for `@tractor.context` requirement - portal: Portal, - - allow_overruns: bool = False, - task_status: TaskStatus[ - tuple[ - trio.CancelScope, - Context, - trio.Event, - Any, - ] - ] = trio.TASK_STATUS_IGNORED, - **ctx_kwargs, - - ) -> Any: - ''' - Open a context in a service sub-actor, add to a stack - that gets unwound at ``pikerd`` teardown. - - This allows for allocating long-running sub-services in our main - daemon and explicitly controlling their lifetimes. - - ''' - # TODO: use the ctx._scope directly here instead? - # -[ ] actually what semantics do we expect for this - # usage!? - with trio.CancelScope() as cs: - try: - async with portal.open_context( - target, - allow_overruns=allow_overruns, - **ctx_kwargs, - - ) as (ctx, started): - - # unblock once the remote context has started - complete = trio.Event() - task_status.started(( - cs, - ctx, - complete, - started, - )) - log.info( - f'`pikerd` service {name} started with value {started}' - ) - # wait on any context's return value - # and any final portal result from the - # sub-actor. 
- ctx_res: Any = await ctx.wait_for_result() - - # NOTE: blocks indefinitely until cancelled - # either by error from the target context - # function or by being cancelled here by the - # surrounding cancel scope. - return ( - await portal.wait_for_result(), - ctx_res, - ) - - except ContextCancelled as ctxe: - canceller: tuple[str, str] = ctxe.canceller - our_uid: tuple[str, str] = current_actor().uid - if ( - canceller != portal.chan.uid - and - canceller != our_uid - ): - log.cancel( - f'Actor-service `{name}` was remotely cancelled by a peer?\n' - - # TODO: this would be a good spot to use - # a respawn feature Bo - f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n' - - f'cancellee: {portal.chan.uid}\n' - f'canceller: {canceller}\n' - ) - else: - raise - - finally: - # NOTE: the ctx MUST be cancelled first if we - # don't want the above `ctx.wait_for_result()` to - # raise a self-ctxc. WHY, well since from the ctx's - # perspective the cancel request will have - # arrived out-out-of-band at the `Actor.cancel()` - # level, thus `Context.cancel_called == False`, - # meaning `ctx._is_self_cancelled() == False`. - # with trio.CancelScope(shield=True): - # await ctx.cancel() - await portal.cancel_actor() # terminate (remote) sub-actor - complete.set() # signal caller this task is done - self.service_tasks.pop(name) # remove mngr entry - - async def cancel_service( - self, - name: str, - - ) -> Any: - ''' - Cancel the service task and actor for the given ``name``. - - ''' - log.info(f'Cancelling `pikerd` service {name}') - cs, sub_ctx, portal, complete = self.service_tasks[name] - - # cs.cancel() - await sub_ctx.cancel() - await complete.wait() - - if name in self.service_tasks: - # TODO: custom err? - # raise ServiceError( - raise RuntimeError( - f'Service actor for {name} not terminated and/or unknown?' - ) - - # assert name not in self.service_tasks, \ - # f'Serice task for {name} not terminated?' - async def start_service( self, daemon_name: str, @@ -493,23 +512,36 @@ class ServiceMngr: # ^TODO, type for `@tractor.context` deco-ed funcs! debug_mode: bool = False, - **tractor_actor_kwargs, + **start_actor_kwargs, ) -> Context: ''' - Start a "service" task in a (new) sub-actor and manage its - lifetime indefinitely until termination. + Start new subactor and schedule a supervising "service task" + in it which explicitly defines the sub's lifetime. - Service actors can be cancelled (and thus shutdown) using - `.cancel_service()`. + "Service daemon subactors" are cancelled (and thus + terminated) using the paired `.cancel_service()`. + + Effectively this API can be used to manage "service daemons" + spawned under a single parent actor with supervision + semantics equivalent to a one-cancels-one style actor-nursery + or "(subactor) task manager" where each subprocess's (and + thus its embedded actor runtime) lifetime is synced to that + of the remotely spawned task defined by `ctx_ep`. + + The funcionality can be likened to a "daemonized" version of + `.hilevel.worker.run_in_actor()` but with supervision + controls offered by `tractor.Context` where the main/root + remotely scheduled `trio.Task` invoking `ctx_ep` determines + the underlying subactor's lifetime. 
''' - entry: tuple|None = self.service_tasks.get(daemon_name) + entry: tuple|None = self.service_ctxs.get(daemon_name) if entry: (cs, sub_ctx, portal, complete) = entry return sub_ctx - if daemon_name not in self.service_tasks: + if daemon_name not in self.service_ctxs: portal: Portal = await self.actor_n.start_actor( daemon_name, debug_mode=( # maybe set globally during allocate @@ -517,7 +549,7 @@ class ServiceMngr: or self.debug_mode ), - **tractor_actor_kwargs, + **start_actor_kwargs, ) ctx_kwargs: dict[str, Any] = {} if isinstance(ctx_ep, functools.partial): @@ -531,8 +563,34 @@ class ServiceMngr: ) = await self.start_service_ctx( name=daemon_name, portal=portal, - target=ctx_ep, + ctx_fn=ctx_ep, **ctx_kwargs, ) return sub_ctx + + async def cancel_service( + self, + name: str, + + ) -> Any: + ''' + Cancel the service task and actor for the given ``name``. + + ''' + log.info(f'Cancelling `pikerd` service {name}') + cs, sub_ctx, portal, complete = self.service_ctxs[name] + + # cs.cancel() + await sub_ctx.cancel() + await complete.wait() + + if name in self.service_ctxs: + # TODO: custom err? + # raise ServiceError( + raise RuntimeError( + f'Service actor for {name} not terminated and/or unknown?' + ) + + # assert name not in self.service_ctxs, \ + # f'Serice task for {name} not terminated?' -- 2.34.1 From 0279bb3311b4f9ad5a40378a5b35df4f9016c2af Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 11 Dec 2024 20:26:13 -0500 Subject: [PATCH 04/12] Use shorthand nursery var-names per convention in codebase --- tractor/hilevel/_service.py | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/tractor/hilevel/_service.py b/tractor/hilevel/_service.py index c94e4212..70dddbdf 100644 --- a/tractor/hilevel/_service.py +++ b/tractor/hilevel/_service.py @@ -160,8 +160,8 @@ async def open_service_mngr( ): # impl specific obvi.. init_kwargs.update({ - 'actor_n': an, - 'service_n': tn, + 'an': an, + 'tn': tn, }) mngr: ServiceMngr|None @@ -174,15 +174,11 @@ async def open_service_mngr( # eventual `@singleton_acm` API wrapper. # # assign globally for future daemon/task creation - mngr.actor_n = an - mngr.service_n = tn + mngr.an = an + mngr.tn = tn else: - assert ( - mngr.actor_n - and - mngr.service_tn - ) + assert (mngr.an and mngr.tn) log.info( 'Using extant service mngr!\n\n' f'{mngr!r}\n' # it has a nice `.__repr__()` of services state @@ -349,8 +345,8 @@ class ServiceMngr: process tree. ''' - actor_n: ActorNursery - service_n: trio.Nursery + an: ActorNursery + tn: trio.Nursery debug_mode: bool = False # tractor sub-actor debug mode flag service_tasks: dict[ @@ -423,7 +419,7 @@ class ServiceMngr: ( cs, complete, - ) = await self.service_n.start(_task_manager_start) + ) = await self.tn.start(_task_manager_start) # store the cancel scope and portal for later cancellation or # retstart if needed. @@ -485,7 +481,7 @@ class ServiceMngr: for details. 
''' - cs, ipc_ctx, complete, started = await self.service_n.start( + cs, ipc_ctx, complete, started = await self.tn.start( functools.partial( _open_and_supervise_service_ctx, serman=self, @@ -542,7 +538,7 @@ class ServiceMngr: return sub_ctx if daemon_name not in self.service_ctxs: - portal: Portal = await self.actor_n.start_actor( + portal: Portal = await self.an.start_actor( daemon_name, debug_mode=( # maybe set globally during allocate debug_mode -- 2.34.1 From 78f51a3fd86cbac49b9bd7b83f00c953011953e5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 17 May 2023 14:31:06 -0400 Subject: [PATCH 05/12] Initial prototype for a one-cancels-one style supervisor, nursery thing.. --- tractor/trionics/_supervisor.py | 256 ++++++++++++++++++++++++++++++++ 1 file changed, 256 insertions(+) create mode 100644 tractor/trionics/_supervisor.py diff --git a/tractor/trionics/_supervisor.py b/tractor/trionics/_supervisor.py new file mode 100644 index 00000000..faeb7616 --- /dev/null +++ b/tractor/trionics/_supervisor.py @@ -0,0 +1,256 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Erlang-style (ish) "one-cancels-one" nursery. + +''' +from __future__ import annotations +from contextlib import ( + asynccontextmanager as acm, + contextmanager as cm, + nullcontext, +) +from typing import ContextManager + +from outcome import ( + Outcome, + acapture, +) +import pdbp +from msgspec import Struct +import trio +from trio._core._run import ( + Task, + CancelScope, + Nursery, +) + +class MaybeOutcome(Struct): + + _ready: Event = trio.Event() + _outcome: Outcome | None = None + _result: Any | None = None + + @property + def result(self) -> Any: + ''' + Either Any or None depending on whether the Outcome has compeleted. + + ''' + if self._outcome is None: + raise RuntimeError( + # f'Task {task.name} is not complete.\n' + f'Outcome is not complete.\n' + 'wait on `await MaybeOutcome.unwrap()` first!' + ) + return self._result + + def _set_outcome( + self, + outcome: Outcome, + ): + self._outcome = outcome + self._result = outcome.unwrap() + self._ready.set() + + # TODO: maybe a better name like, + # - .wait_and_unwrap() + # - .wait_unwrap() + # - .aunwrap() ? 
+ async def unwrap(self) -> Any: + if self._ready.is_set(): + return self._result + + await self._ready.wait() + + out = self._outcome + if out is None: + raise ValueError(f'{out} is not an outcome!?') + + return self.result + + +class TaskHandle(Struct): + task: Task + cs: CancelScope + exited: Event | None = None + _outcome: Outcome | None = None + + +class ScopePerTaskNursery(Struct): + _n: Nursery + _scopes: dict[ + Task, + tuple[CancelScope, Outcome] + ] = {} + + scope_manager: ContextManager | None = None + + async def start_soon( + self, + async_fn, + *args, + + name=None, + scope_manager: ContextManager | None = None, + + ) -> tuple[CancelScope, Task]: + + # NOTE: internals of a nursery don't let you know what + # the most recently spawned task is by order.. so we'd + # have to either change that or do set ops. + # pre_start_tasks: set[Task] = n._children.copy() + # new_tasks = n._children - pre_start_Tasks + # assert len(new_tasks) == 1 + # task = new_tasks.pop() + + n: Nursery = self._n + cs = CancelScope() + new_task: Task | None = None + to_return: tuple[Any] | None = None + maybe_outcome = MaybeOutcome() + + sm = self.scope_manager + if sm is None: + mngr = nullcontext([cs]) + else: + mngr = sm( + nursery=n, + scope=cs, + maybe_outcome=maybe_outcome, + ) + + async def _start_wrapped_in_scope( + task_status: TaskStatus[ + tuple[CancelScope, Task] + ] = trio.TASK_STATUS_IGNORED, + + ) -> None: + nonlocal maybe_outcome + nonlocal to_return + + with cs: + + task = trio.lowlevel.current_task() + self._scopes[cs] = task + + # TODO: instead we should probably just use + # `Outcome.send(mngr)` here no and inside a custom + # decorator `@trio.cancel_scope_manager` enforce + # that it's a single yield generator? + with mngr as to_return: + + # TODO: relay through whatever the + # started task passes back via `.started()` ? + # seems like that won't work with also returning + # a "task handle"? + task_status.started() + + # invoke underlying func now that cs is entered. + outcome = await acapture(async_fn, *args) + + # TODO: instead, mngr.send(outcome) so that we don't + # tie this `.start_soon()` impl to the + # `MaybeOutcome` type? Use `Outcome.send(mngr)` + # right? + maybe_outcome._set_outcome(outcome) + + await n.start(_start_wrapped_in_scope) + assert to_return is not None + + # TODO: better way to concat the values delivered by the user + # provided `.scope_manager` and the outcome? + return tuple([maybe_outcome] + to_return) + + +# TODO: maybe just make this a generator with a single yield that also +# delivers a value (of some std type) from the yield expression? +# @trio.cancel_scope_manager +@cm +def add_task_handle_and_crash_handling( + nursery: Nursery, + scope: CancelScope, + maybe_outcome: MaybeOutcome, + +) -> Generator[None, list[Any]]: + + cs: CancelScope = CancelScope() + + # if you need it you can ask trio for the task obj + task: Task = trio.lowlevel.current_task() + print(f'Spawning task: {task.name}') + + try: + # yields back when task is terminated, cancelled, returns? + with cs: + # the yielded values here are what are returned to the + # nursery's `.start_soon()` caller + + # TODO: actually make this work so that `MaybeOutcome` isn't + # tied to the impl of `.start_soon()` on our custom nursery! + task_outcome: Outcome = yield [cs] + + except Exception as err: + # Adds "crash handling" from `pdbp` by entering + # a REPL on std errors. 
+ pdbp.xpm() + raise + + +@acm +async def open_nursery( + scope_manager = None, + **kwargs, +): + async with trio.open_nursery(**kwargs) as nurse: + yield ScopePerTaskNursery( + nurse, + scope_manager=scope_manager, + ) + + +async def sleep_then_err(): + await trio.sleep(1) + assert 0 + + +async def sleep_then_return_val(val: str): + await trio.sleep(0.2) + return val + + +if __name__ == '__main__': + + async def main(): + async with open_nursery( + scope_manager=add_task_handle_and_crash_handling, + ) as sn: + for _ in range(3): + outcome, cs = await sn.start_soon(trio.sleep_forever) + + # extra task we want to engage in debugger post mortem. + err_outcome, *_ = await sn.start_soon(sleep_then_err) + + val: str = 'yoyoyo' + val_outcome, cs = await sn.start_soon(sleep_then_return_val, val) + res = await val_outcome.unwrap() + assert res == val + print(f'GOT EXPECTED TASK VALUE: {res}') + + print('WAITING FOR CRASH..') + + trio.run(main) -- 2.34.1 From 5e25cf739990a734104745666ee5a73e69257ad2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 17 May 2023 15:27:29 -0400 Subject: [PATCH 06/12] Alias to `@acm` in broadcaster mod --- tractor/trionics/_broadcast.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tractor/trionics/_broadcast.py b/tractor/trionics/_broadcast.py index 2286e70d..124685ed 100644 --- a/tractor/trionics/_broadcast.py +++ b/tractor/trionics/_broadcast.py @@ -22,7 +22,7 @@ https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html from __future__ import annotations from abc import abstractmethod from collections import deque -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager as acm from functools import partial from operator import ne from typing import ( @@ -398,7 +398,7 @@ class BroadcastReceiver(ReceiveChannel): return await self._receive_from_underlying(key, state) - @asynccontextmanager + @acm async def subscribe( self, raise_on_lag: bool = True, -- 2.34.1 From 7b4accf53f82377e5f12b4b4e152c789a03069ad Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 19 May 2023 13:13:21 -0400 Subject: [PATCH 07/12] Do renaming, implement lowlevel `Outcome` sending As was listed in the many todos, this changes the `.start_soon()` impl to instead (manually) `.send()` into the user defined `@task_scope_manager` an `Outcome` from the spawned task. In this case the task manager wraps that in a user defined (and renamed) `TaskOutcome` and delivers that + a containing `trio.CancelScope` to the `.start_soon()` caller. Here the user defined `TaskOutcome` defines a `.wait_for_result()` method that can be used to await the task's exit and handle it's underlying returned value or raised error; the implementation could be different and subject to the user's own whims. Note that by default, if this was added to `trio`'s core, the `@task_scope_manager` would simply be implemented as either a `None` yielding single-yield-generator but more likely just entirely ignored by the runtime (as in no manual task outcome collecting, generator calling and sending is done at all) by default if the user does not provide the `task_scope_manager` to the nursery at open time. 
--- tractor/trionics/_supervisor.py | 145 +++++++++++++++++++------------- 1 file changed, 87 insertions(+), 58 deletions(-) diff --git a/tractor/trionics/_supervisor.py b/tractor/trionics/_supervisor.py index faeb7616..2a664a9f 100644 --- a/tractor/trionics/_supervisor.py +++ b/tractor/trionics/_supervisor.py @@ -39,11 +39,17 @@ from trio._core._run import ( Nursery, ) -class MaybeOutcome(Struct): - _ready: Event = trio.Event() - _outcome: Outcome | None = None - _result: Any | None = None +class TaskOutcome(Struct): + ''' + The outcome of a scheduled ``trio`` task which includes an interface + for synchronizing to the completion of the task's runtime and access + to the eventual boxed result/value or raised exception. + + ''' + _exited: Event = trio.Event() # as per `trio.Runner.task_exited()` + _outcome: Outcome | None = None # as per `outcome.Outcome` + _result: Any | None = None # the eventual maybe-returned-value @property def result(self) -> Any: @@ -55,7 +61,7 @@ class MaybeOutcome(Struct): raise RuntimeError( # f'Task {task.name} is not complete.\n' f'Outcome is not complete.\n' - 'wait on `await MaybeOutcome.unwrap()` first!' + 'wait on `await TaskOutcome.wait_for_result()` first!' ) return self._result @@ -63,19 +69,27 @@ class MaybeOutcome(Struct): self, outcome: Outcome, ): + ''' + Set the ``Outcome`` for this task. + + This method should only ever be called by the task's supervising + nursery implemenation. + + ''' self._outcome = outcome self._result = outcome.unwrap() - self._ready.set() + self._exited.set() - # TODO: maybe a better name like, - # - .wait_and_unwrap() - # - .wait_unwrap() - # - .aunwrap() ? - async def unwrap(self) -> Any: - if self._ready.is_set(): + async def wait_for_result(self) -> Any: + ''' + Unwind the underlying task's ``Outcome`` by async waiting for + the task to first complete and then unwrap it's result-value. + + ''' + if self._exited.is_set(): return self._result - await self._ready.wait() + await self._exited.wait() out = self._outcome if out is None: @@ -84,13 +98,6 @@ class MaybeOutcome(Struct): return self.result -class TaskHandle(Struct): - task: Task - cs: CancelScope - exited: Event | None = None - _outcome: Outcome | None = None - - class ScopePerTaskNursery(Struct): _n: Nursery _scopes: dict[ @@ -122,17 +129,14 @@ class ScopePerTaskNursery(Struct): cs = CancelScope() new_task: Task | None = None to_return: tuple[Any] | None = None - maybe_outcome = MaybeOutcome() sm = self.scope_manager if sm is None: mngr = nullcontext([cs]) else: - mngr = sm( - nursery=n, - scope=cs, - maybe_outcome=maybe_outcome, - ) + # NOTE: what do we enforce as a signature for the + # `@task_scope_manager` here? + mngr = sm(nursery=n, scope=cs) async def _start_wrapped_in_scope( task_status: TaskStatus[ @@ -140,55 +144,81 @@ class ScopePerTaskNursery(Struct): ] = trio.TASK_STATUS_IGNORED, ) -> None: - nonlocal maybe_outcome - nonlocal to_return + + # TODO: this was working before?! + # nonlocal to_return with cs: task = trio.lowlevel.current_task() self._scopes[cs] = task - # TODO: instead we should probably just use - # `Outcome.send(mngr)` here no and inside a custom - # decorator `@trio.cancel_scope_manager` enforce - # that it's a single yield generator? - with mngr as to_return: + # execute up to the first yield + try: + to_return: tuple[Any] = next(mngr) + except StopIteration: + raise RuntimeError("task manager didn't yield") from None - # TODO: relay through whatever the - # started task passes back via `.started()` ? 
- # seems like that won't work with also returning - # a "task handle"? - task_status.started() + # TODO: how do we support `.start()` style? + # - relay through whatever the + # started task passes back via `.started()` ? + # seems like that won't work with also returning + # a "task handle"? + # - we were previously binding-out this `to_return` to + # the parent's lexical scope, why isn't that working + # now? + task_status.started(to_return) - # invoke underlying func now that cs is entered. - outcome = await acapture(async_fn, *args) + # invoke underlying func now that cs is entered. + outcome = await acapture(async_fn, *args) - # TODO: instead, mngr.send(outcome) so that we don't - # tie this `.start_soon()` impl to the - # `MaybeOutcome` type? Use `Outcome.send(mngr)` - # right? - maybe_outcome._set_outcome(outcome) + # execute from the 1st yield to return and expect + # generator-mngr `@task_scope_manager` thinger to + # terminate! + try: + mngr.send(outcome) - await n.start(_start_wrapped_in_scope) + # NOTE: this will instead send the underlying + # `.value`? Not sure if that's better or not? + # I would presume it's better to have a handle to + # the `Outcome` entirely? This method sends *into* + # the mngr this `Outcome.value`; seems like kinda + # weird semantics for our purposes? + # outcome.send(mngr) + + except StopIteration: + return + else: + raise RuntimeError(f"{mngr} didn't stop!") + + to_return = await n.start(_start_wrapped_in_scope) assert to_return is not None - # TODO: better way to concat the values delivered by the user - # provided `.scope_manager` and the outcome? - return tuple([maybe_outcome] + to_return) + # TODO: use the fancy type-check-time type signature stuff from + # mypy i guess..to like, relay the type of whatever the + # generator yielded through? betcha that'll be un-grokable XD + return to_return + + + +# TODO: you could wrap your output task handle in this? +# class TaskHandle(Struct): +# task: Task +# cs: CancelScope +# outcome: TaskOutcome # TODO: maybe just make this a generator with a single yield that also # delivers a value (of some std type) from the yield expression? -# @trio.cancel_scope_manager -@cm +# @trio.task_scope_manager def add_task_handle_and_crash_handling( nursery: Nursery, scope: CancelScope, - maybe_outcome: MaybeOutcome, ) -> Generator[None, list[Any]]: cs: CancelScope = CancelScope() + task_outcome = TaskOutcome() # if you need it you can ask trio for the task obj task: Task = trio.lowlevel.current_task() @@ -197,12 +227,11 @@ def add_task_handle_and_crash_handling( try: # yields back when task is terminated, cancelled, returns? with cs: - # the yielded values here are what are returned to the - # nursery's `.start_soon()` caller - # TODO: actually make this work so that `MaybeOutcome` isn't - # tied to the impl of `.start_soon()` on our custom nursery! 
- task_outcome: Outcome = yield [cs] + # the yielded value(s) here are what are returned to the + # nursery's `.start_soon()` caller B) + lowlevel_outcome: Outcome = yield (task_outcome, cs) + task_outcome._set_outcome(lowlevel_outcome) except Exception as err: # Adds "crash handling" from `pdbp` by entering @@ -247,7 +276,7 @@ if __name__ == '__main__': val: str = 'yoyoyo' val_outcome, cs = await sn.start_soon(sleep_then_return_val, val) - res = await val_outcome.unwrap() + res = await val_outcome.wait_for_result() assert res == val print(f'GOT EXPECTED TASK VALUE: {res}') -- 2.34.1 From 3613b6019c6fe814b4bb5931826bfaf99fa3f1fb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 19 May 2023 13:34:00 -0400 Subject: [PATCH 08/12] Facepalm, don't pass in unecessary cancel scope --- tractor/trionics/_supervisor.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tractor/trionics/_supervisor.py b/tractor/trionics/_supervisor.py index 2a664a9f..dcd20502 100644 --- a/tractor/trionics/_supervisor.py +++ b/tractor/trionics/_supervisor.py @@ -136,7 +136,7 @@ class ScopePerTaskNursery(Struct): else: # NOTE: what do we enforce as a signature for the # `@task_scope_manager` here? - mngr = sm(nursery=n, scope=cs) + mngr = sm(nursery=n) async def _start_wrapped_in_scope( task_status: TaskStatus[ @@ -213,11 +213,9 @@ class ScopePerTaskNursery(Struct): # @trio.task_scope_manager def add_task_handle_and_crash_handling( nursery: Nursery, - scope: CancelScope, ) -> Generator[None, list[Any]]: - cs: CancelScope = CancelScope() task_outcome = TaskOutcome() # if you need it you can ask trio for the task obj @@ -226,7 +224,7 @@ def add_task_handle_and_crash_handling( try: # yields back when task is terminated, cancelled, returns? - with cs: + with CancelScope() as cs: # the yielded value(s) here are what are returned to the # nursery's `.start_soon()` caller B) -- 2.34.1 From c32520cb110473e2c84dd12946d4bb4b457ad412 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 19 May 2023 14:03:07 -0400 Subject: [PATCH 09/12] Ensure user-allocated cancel scope just works! Turns out the nursery doesn't have to care about allocating a per task `CancelScope` since the user can just do that in the `@task_scope_manager` if desired B) So just mask all the nursery cs allocating with the intention of removal. Also add a test for per-task-cancellation by starting the crash task as a `trio.sleep_forever()` but then cancel it via the user allocated cs and ensure the crash propagates as expected :boom: --- tractor/trionics/_supervisor.py | 134 ++++++++++++++++++++------------ 1 file changed, 83 insertions(+), 51 deletions(-) diff --git a/tractor/trionics/_supervisor.py b/tractor/trionics/_supervisor.py index dcd20502..d23e1df8 100644 --- a/tractor/trionics/_supervisor.py +++ b/tractor/trionics/_supervisor.py @@ -126,18 +126,23 @@ class ScopePerTaskNursery(Struct): # task = new_tasks.pop() n: Nursery = self._n - cs = CancelScope() + + sm = self.scope_manager + # we do default behavior of a scope-per-nursery + # if the user did not provide a task manager. + if sm is None: + return n.start_soon(async_fn, *args, name=None) + + # per_task_cs = CancelScope() new_task: Task | None = None to_return: tuple[Any] | None = None - sm = self.scope_manager - if sm is None: - mngr = nullcontext([cs]) - else: - # NOTE: what do we enforce as a signature for the - # `@task_scope_manager` here? - mngr = sm(nursery=n) - + # NOTE: what do we enforce as a signature for the + # `@task_scope_manager` here? 
+ mngr = sm( + nursery=n, + # scope=per_task_cs, + ) async def _start_wrapped_in_scope( task_status: TaskStatus[ tuple[CancelScope, Task] @@ -148,48 +153,49 @@ class ScopePerTaskNursery(Struct): # TODO: this was working before?! # nonlocal to_return - with cs: + task = trio.lowlevel.current_task() + # self._scopes[per_task_cs] = task - task = trio.lowlevel.current_task() - self._scopes[cs] = task + # NOTE: we actually don't need this since the user can + # just to it themselves inside mngr! + # with per_task_cs: - # execute up to the first yield - try: - to_return: tuple[Any] = next(mngr) - except StopIteration: - raise RuntimeError("task manager didn't yield") from None + # execute up to the first yield + try: + to_return: tuple[Any] = next(mngr) + except StopIteration: + raise RuntimeError("task manager didn't yield") from None - # TODO: how do we support `.start()` style? - # - relay through whatever the - # started task passes back via `.started()` ? - # seems like that won't work with also returning - # a "task handle"? - # - we were previously binding-out this `to_return` to - # the parent's lexical scope, why isn't that working - # now? - task_status.started(to_return) + # TODO: how do we support `.start()` style? + # - relay through whatever the + # started task passes back via `.started()` ? + # seems like that won't work with also returning + # a "task handle"? + # - we were previously binding-out this `to_return` to + # the parent's lexical scope, why isn't that working + # now? + task_status.started(to_return) - # invoke underlying func now that cs is entered. - outcome = await acapture(async_fn, *args) + # invoke underlying func now that cs is entered. + outcome = await acapture(async_fn, *args) - # execute from the 1st yield to return and expect - # generator-mngr `@task_scope_manager` thinger to - # terminate! - try: - mngr.send(outcome) + # execute from the 1st yield to return and expect + # generator-mngr `@task_scope_manager` thinger to + # terminate! + try: + mngr.send(outcome) - # NOTE: this will instead send the underlying - # `.value`? Not sure if that's better or not? - # I would presume it's better to have a handle to - # the `Outcome` entirely? This method sends *into* - # the mngr this `Outcome.value`; seems like kinda - # weird semantics for our purposes? - # outcome.send(mngr) - except StopIteration: - return - else: - raise RuntimeError(f"{mngr} didn't stop!") + # I would presume it's better to have a handle to + # the `Outcome` entirely? This method sends *into* + # the mngr this `Outcome.value`; seems like kinda + # weird semantics for our purposes? + # outcome.send(mngr) + + except StopIteration: + return + else: + raise RuntimeError(f"{mngr} didn't stop!") to_return = await n.start(_start_wrapped_in_scope) assert to_return is not None @@ -200,7 +206,6 @@ class ScopePerTaskNursery(Struct): return to_return - # TODO: you could wrap your output task handle in this? # class TaskHandle(Struct): # task: Task @@ -214,6 +219,11 @@ class ScopePerTaskNursery(Struct): def add_task_handle_and_crash_handling( nursery: Nursery, + # TODO: is this the only way we can have a per-task scope + # allocated or can we allow the user to somehow do it if + # they want below? + # scope: CancelScope, + ) -> Generator[None, list[Any]]: task_outcome = TaskOutcome() @@ -222,8 +232,12 @@ def add_task_handle_and_crash_handling( task: Task = trio.lowlevel.current_task() print(f'Spawning task: {task.name}') + # yields back when task is terminated, cancelled, returns. 
try: - # yields back when task is terminated, cancelled, returns? + # XXX: wait, this isn't doing anything right since we'd have to + # manually activate this scope using something like: + # `task._activate_cancel_status(cs._cancel_status)` ?? + # oh wait, but `.__enter__()` does all that already? with CancelScope() as cs: # the yielded value(s) here are what are returned to the @@ -260,6 +274,19 @@ async def sleep_then_return_val(val: str): return val +async def ensure_cancelled(): + try: + await trio.sleep_forever() + + except trio.Cancelled: + task = trio.lowlevel.current_task() + print(f'heyyo ONLY {task.name} was cancelled as expected B)') + assert 0 + + except BaseException: + raise RuntimeError("woa woa woa this ain't right!") + + if __name__ == '__main__': async def main(): @@ -267,17 +294,22 @@ if __name__ == '__main__': scope_manager=add_task_handle_and_crash_handling, ) as sn: for _ in range(3): - outcome, cs = await sn.start_soon(trio.sleep_forever) + outcome, _ = await sn.start_soon(trio.sleep_forever) # extra task we want to engage in debugger post mortem. - err_outcome, *_ = await sn.start_soon(sleep_then_err) + err_outcome, cs = await sn.start_soon(ensure_cancelled) val: str = 'yoyoyo' - val_outcome, cs = await sn.start_soon(sleep_then_return_val, val) + val_outcome, _ = await sn.start_soon( + sleep_then_return_val, + val, + ) res = await val_outcome.wait_for_result() assert res == val - print(f'GOT EXPECTED TASK VALUE: {res}') + print(f'{res} -> GOT EXPECTED TASK VALUE') - print('WAITING FOR CRASH..') + await trio.sleep(0.6) + print('Cancelling and waiting for CRASH..') + cs.cancel() trio.run(main) -- 2.34.1 From 9bbe7ca945967f36f875c2e9e5869c8cdffd6ca8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 19 May 2023 14:23:22 -0400 Subject: [PATCH 10/12] More refinements and proper typing - drop unneeded (and commented) internal cs allocating bits. - bypass all task manager stuff if no generator is provided by the caller; i.e. just call `.start_soon()` as normal. - fix `Generator` typing. - add some prints around task manager. - wrap in `TaskOutcome.lowlevel_task: Task`. --- tractor/trionics/_supervisor.py | 81 +++++++++++++++++---------------- 1 file changed, 43 insertions(+), 38 deletions(-) diff --git a/tractor/trionics/_supervisor.py b/tractor/trionics/_supervisor.py index d23e1df8..46a1ccdf 100644 --- a/tractor/trionics/_supervisor.py +++ b/tractor/trionics/_supervisor.py @@ -24,7 +24,10 @@ from contextlib import ( contextmanager as cm, nullcontext, ) -from typing import ContextManager +from typing import ( + Generator, + Any, +) from outcome import ( Outcome, @@ -47,6 +50,7 @@ class TaskOutcome(Struct): to the eventual boxed result/value or raised exception. ''' + lowlevel_task: Task _exited: Event = trio.Event() # as per `trio.Runner.task_exited()` _outcome: Outcome | None = None # as per `outcome.Outcome` _result: Any | None = None # the eventual maybe-returned-value @@ -105,7 +109,7 @@ class ScopePerTaskNursery(Struct): tuple[CancelScope, Outcome] ] = {} - scope_manager: ContextManager | None = None + scope_manager: Generator[Any, Outcome, None] | None = None async def start_soon( self, @@ -133,16 +137,13 @@ class ScopePerTaskNursery(Struct): if sm is None: return n.start_soon(async_fn, *args, name=None) - # per_task_cs = CancelScope() new_task: Task | None = None to_return: tuple[Any] | None = None # NOTE: what do we enforce as a signature for the # `@task_scope_manager` here? 
- mngr = sm( - nursery=n, - # scope=per_task_cs, - ) + mngr = sm(nursery=n) + async def _start_wrapped_in_scope( task_status: TaskStatus[ tuple[CancelScope, Task] @@ -153,13 +154,6 @@ class ScopePerTaskNursery(Struct): # TODO: this was working before?! # nonlocal to_return - task = trio.lowlevel.current_task() - # self._scopes[per_task_cs] = task - - # NOTE: we actually don't need this since the user can - # just to it themselves inside mngr! - # with per_task_cs: - # execute up to the first yield try: to_return: tuple[Any] = next(mngr) @@ -206,15 +200,9 @@ class ScopePerTaskNursery(Struct): return to_return -# TODO: you could wrap your output task handle in this? -# class TaskHandle(Struct): -# task: Task -# cs: CancelScope -# outcome: TaskOutcome - - -# TODO: maybe just make this a generator with a single yield that also -# delivers a value (of some std type) from the yield expression? +# TODO: define a decorator to runtime type check that this a generator +# with a single yield that also delivers a value (of some std type) from +# the yield expression? # @trio.task_scope_manager def add_task_handle_and_crash_handling( nursery: Nursery, @@ -224,20 +212,35 @@ def add_task_handle_and_crash_handling( # they want below? # scope: CancelScope, -) -> Generator[None, list[Any]]: +) -> Generator[ + Any, + Outcome, + None, +]: + ''' + A customizable, user defined "task scope manager". - task_outcome = TaskOutcome() + With this specially crafted single-yield generator function you can + add more granular controls around every task spawned by `trio` B) + ''' # if you need it you can ask trio for the task obj task: Task = trio.lowlevel.current_task() print(f'Spawning task: {task.name}') - # yields back when task is terminated, cancelled, returns. + # User defined "task handle" for more granular supervision + # of each spawned task as needed for their particular usage. + task_outcome = TaskOutcome(task) + + # NOTE: if wanted the user could wrap the output task handle however + # they want! + # class TaskHandle(Struct): + # task: Task + # cs: CancelScope + # outcome: TaskOutcome + + # this yields back when the task is terminated, cancelled or returns. try: - # XXX: wait, this isn't doing anything right since we'd have to - # manually activate this scope using something like: - # `task._activate_cancel_status(cs._cancel_status)` ?? - # oh wait, but `.__enter__()` does all that already? with CancelScope() as cs: # the yielded value(s) here are what are returned to the @@ -245,12 +248,16 @@ def add_task_handle_and_crash_handling( lowlevel_outcome: Outcome = yield (task_outcome, cs) task_outcome._set_outcome(lowlevel_outcome) + # Adds "crash handling" from `pdbp` by entering + # a REPL on std errors. except Exception as err: - # Adds "crash handling" from `pdbp` by entering - # a REPL on std errors. + print(f'{task.name} crashed, entering debugger!') pdbp.xpm() raise + finally: + print(f'{task.name} Exitted') + @acm async def open_nursery( @@ -264,11 +271,6 @@ async def open_nursery( ) -async def sleep_then_err(): - await trio.sleep(1) - assert 0 - - async def sleep_then_return_val(val: str): await trio.sleep(0.2) return val @@ -309,7 +311,10 @@ if __name__ == '__main__': print(f'{res} -> GOT EXPECTED TASK VALUE') await trio.sleep(0.6) - print('Cancelling and waiting for CRASH..') + print( + 'Cancelling and waiting on {err_outcome.lowlevel_task} ' + 'to CRASH..' 
+ ) cs.cancel() trio.run(main) -- 2.34.1 From c169417085b0d7623a847c6bfb4a2cbec8a8c3bb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 19 May 2023 14:49:10 -0400 Subject: [PATCH 11/12] Go all in on "task manager" naming --- tractor/trionics/_supervisor.py | 37 +++++++++++++-------------------- 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/tractor/trionics/_supervisor.py b/tractor/trionics/_supervisor.py index 46a1ccdf..2a5f73e1 100644 --- a/tractor/trionics/_supervisor.py +++ b/tractor/trionics/_supervisor.py @@ -22,7 +22,6 @@ from __future__ import annotations from contextlib import ( asynccontextmanager as acm, contextmanager as cm, - nullcontext, ) from typing import ( Generator, @@ -51,7 +50,7 @@ class TaskOutcome(Struct): ''' lowlevel_task: Task - _exited: Event = trio.Event() # as per `trio.Runner.task_exited()` + _exited = trio.Event() # as per `trio.Runner.task_exited()` _outcome: Outcome | None = None # as per `outcome.Outcome` _result: Any | None = None # the eventual maybe-returned-value @@ -63,9 +62,8 @@ class TaskOutcome(Struct): ''' if self._outcome is None: raise RuntimeError( - # f'Task {task.name} is not complete.\n' - f'Outcome is not complete.\n' - 'wait on `await TaskOutcome.wait_for_result()` first!' + f'Task {self.lowlevel_task.name} is not complete.\n' + 'First wait on `await TaskOutcome.wait_for_result()`!' ) return self._result @@ -102,14 +100,14 @@ class TaskOutcome(Struct): return self.result -class ScopePerTaskNursery(Struct): +class TaskManagerNursery(Struct): _n: Nursery _scopes: dict[ Task, tuple[CancelScope, Outcome] ] = {} - scope_manager: Generator[Any, Outcome, None] | None = None + task_manager: Generator[Any, Outcome, None] | None = None async def start_soon( self, @@ -117,7 +115,7 @@ class ScopePerTaskNursery(Struct): *args, name=None, - scope_manager: ContextManager | None = None, + task_manager: Generator[Any, Outcome, None] | None = None ) -> tuple[CancelScope, Task]: @@ -131,7 +129,7 @@ class ScopePerTaskNursery(Struct): n: Nursery = self._n - sm = self.scope_manager + sm = self.task_manager # we do default behavior of a scope-per-nursery # if the user did not provide a task manager. if sm is None: @@ -151,7 +149,8 @@ class ScopePerTaskNursery(Struct): ) -> None: - # TODO: this was working before?! + # TODO: this was working before?! and, do we need something + # like it to implement `.start()`? # nonlocal to_return # execute up to the first yield @@ -203,15 +202,10 @@ class ScopePerTaskNursery(Struct): # TODO: define a decorator to runtime type check that this a generator # with a single yield that also delivers a value (of some std type) from # the yield expression? -# @trio.task_scope_manager +# @trio.task_manager def add_task_handle_and_crash_handling( nursery: Nursery, - # TODO: is this the only way we can have a per-task scope - # allocated or can we allow the user to somehow do it if - # they want below? 
- # scope: CancelScope, - ) -> Generator[ Any, Outcome, @@ -261,14 +255,11 @@ def add_task_handle_and_crash_handling( @acm async def open_nursery( - scope_manager = None, + task_manager = None, **kwargs, ): async with trio.open_nursery(**kwargs) as nurse: - yield ScopePerTaskNursery( - nurse, - scope_manager=scope_manager, - ) + yield TaskManagerNursery(nurse, task_manager=task_manager) async def sleep_then_return_val(val: str): @@ -293,7 +284,7 @@ if __name__ == '__main__': async def main(): async with open_nursery( - scope_manager=add_task_handle_and_crash_handling, + task_manager=add_task_handle_and_crash_handling, ) as sn: for _ in range(3): outcome, _ = await sn.start_soon(trio.sleep_forever) @@ -312,7 +303,7 @@ if __name__ == '__main__': await trio.sleep(0.6) print( - 'Cancelling and waiting on {err_outcome.lowlevel_task} ' + f'Cancelling and waiting on {err_outcome.lowlevel_task} ' 'to CRASH..' ) cs.cancel() -- 2.34.1 From c362603d15811b6bd92acb218d9ecb10a3a0458f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 19 May 2023 15:51:47 -0400 Subject: [PATCH 12/12] Add `debug_mode: bool` control to task mngr Allows dynamically importing `pdbp` when enabled and a way for eventually linking with `tractor`'s own debug mode flag. --- tractor/trionics/_supervisor.py | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/tractor/trionics/_supervisor.py b/tractor/trionics/_supervisor.py index 2a5f73e1..1012571d 100644 --- a/tractor/trionics/_supervisor.py +++ b/tractor/trionics/_supervisor.py @@ -23,6 +23,7 @@ from contextlib import ( asynccontextmanager as acm, contextmanager as cm, ) +from functools import partial from typing import ( Generator, Any, @@ -32,7 +33,6 @@ from outcome import ( Outcome, acapture, ) -import pdbp from msgspec import Struct import trio from trio._core._run import ( @@ -206,6 +206,8 @@ class TaskManagerNursery(Struct): def add_task_handle_and_crash_handling( nursery: Nursery, + debug_mode: bool = False, + ) -> Generator[ Any, Outcome, @@ -246,7 +248,9 @@ def add_task_handle_and_crash_handling( # a REPL on std errors. except Exception as err: print(f'{task.name} crashed, entering debugger!') - pdbp.xpm() + if debug_mode: + import pdbp + pdbp.xpm() raise finally: @@ -255,11 +259,15 @@ def add_task_handle_and_crash_handling( @acm async def open_nursery( - task_manager = None, - **kwargs, + task_manager: Generator[Any, Outcome, None] | None = None, + + **lowlevel_nursery_kwargs, ): - async with trio.open_nursery(**kwargs) as nurse: - yield TaskManagerNursery(nurse, task_manager=task_manager) + async with trio.open_nursery(**lowlevel_nursery_kwargs) as nurse: + yield TaskManagerNursery( + nurse, + task_manager=task_manager, + ) async def sleep_then_return_val(val: str): @@ -284,7 +292,10 @@ if __name__ == '__main__': async def main(): async with open_nursery( - task_manager=add_task_handle_and_crash_handling, + task_manager=partial( + add_task_handle_and_crash_handling, + debug_mode=True, + ), ) as sn: for _ in range(3): outcome, _ = await sn.start_soon(trio.sleep_forever) -- 2.34.1
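
A minimal sketch (not part of any patch above) of what a user-defined single-yield "task manager" generator could look like against the `open_nursery()`/`TaskManagerNursery.start_soon()` API as it stands after PATCH 12/12: a manager that just logs task lifetimes. The `log_lifetimes()` name and its prints are illustrative assumptions, and the import only presumes the module lands at `tractor/trionics/_supervisor.py` as in this series.

import time
from typing import Any, Generator

import trio
from outcome import Outcome

# assumes the module path introduced by this patch series
from tractor.trionics._supervisor import open_nursery


def log_lifetimes(
    nursery: trio.Nursery,  # passed in by `TaskManagerNursery.start_soon()`

) -> Generator[Any, Outcome, None]:
    '''
    Single-yield "task manager": whatever is yielded here is what
    `.start_soon()` hands back to the caller; the spawned task's
    final `Outcome` is sent back in once it terminates.

    '''
    task: trio.lowlevel.Task = trio.lowlevel.current_task()
    start: float = time.monotonic()
    print(f'{task.name} spawned')

    with trio.CancelScope() as cs:
        # the wrapped async fn runs in this same trio task while the
        # generator is suspended here, so `cs` governs its cancellation.
        outcome: Outcome = yield cs

    print(
        f'{task.name} exited after {time.monotonic() - start:.3f}s '
        f'with {outcome!r}'
    )


async def main():
    async with open_nursery(task_manager=log_lifetimes) as tn:
        # returns as soon as the manager yields, handing back `cs`;
        # the nursery still waits on the task at block exit as usual.
        cs: trio.CancelScope = await tn.start_soon(trio.sleep, 0.1)
        assert isinstance(cs, trio.CancelScope)


if __name__ == '__main__':
    trio.run(main)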