From 05f874001ac919edf6685ad818a6233ea57857f4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 4 Jan 2024 10:06:42 -0500 Subject: [PATCH] Ignore `ContextCancelled`s from non-mngr requests Since service daemon actors may be cancelled remotely by clients (who maybe also requested said daemon-actor's spawn in the first place) we specifically ignore `tractor.ContextCancelled`s from the `ctx.wait()` inside `Services.start_service_task()` to avoid crashing the service mngr, and thus for now `pikerd`, (which **does** happen now due to updated and more explicit remote cancellation semantics implemented in `tractor`) since the `.canceller` field is not going to match the `pikerd` uid in such cases! This explicit check makes sense since the `Services` mngr is built to allow remote requests to "spawn-n-supervise service actors" where the services can remain persistent but also cancelled later as requested. We may want to consider only allowing cancellation by actors who requested spawn in the future tho? Also change to more explicit imports to `tractor` types for annots throughout the sub-pkg. --- piker/service/_ahab.py | 4 ++-- piker/service/_mngr.py | 33 +++++++++++++++++++++++++++++---- piker/service/_registry.py | 10 +++++++++- 3 files changed, 40 insertions(+), 7 deletions(-) diff --git a/piker/service/_ahab.py b/piker/service/_ahab.py index 49d72de6..4cccf855 100644 --- a/piker/service/_ahab.py +++ b/piker/service/_ahab.py @@ -15,8 +15,8 @@ # along with this program. If not, see . ''' -Supervisor for ``docker`` with included async and SC wrapping -to ensure a cancellable container lifetime system. +Supervisor for ``docker`` with included async and SC wrapping to +ensure a cancellable container lifetime system. ''' from __future__ import annotations diff --git a/piker/service/_mngr.py b/piker/service/_mngr.py index 70771593..89e98411 100644 --- a/piker/service/_mngr.py +++ b/piker/service/_mngr.py @@ -27,6 +27,12 @@ from typing import ( import trio from trio_typing import TaskStatus import tractor +from tractor import ( + current_actor, + ContextCancelled, + Context, + Portal, +) from ._util import ( log, # sub-sys logger @@ -38,6 +44,8 @@ from ._util import ( # library. # - wrap a "remote api" wherein you can get a method proxy # to the pikerd actor for starting services remotely! +# - prolly rename this to ActorServicesNursery since it spawns +# new actors and supervises them to completion? class Services: actor_n: tractor._supervise.ActorNursery @@ -47,7 +55,7 @@ class Services: str, tuple[ trio.CancelScope, - tractor.Portal, + Portal, trio.Event, ] ] = {} @@ -57,12 +65,12 @@ class Services: async def start_service_task( self, name: str, - portal: tractor.Portal, + portal: Portal, target: Callable, allow_overruns: bool = False, **ctx_kwargs, - ) -> (trio.CancelScope, tractor.Context): + ) -> (trio.CancelScope, Context): ''' Open a context in a service sub-actor, add to a stack that gets unwound at ``pikerd`` teardown. @@ -101,13 +109,30 @@ class Services: # wait on any context's return value # and any final portal result from the # sub-actor. - ctx_res = await ctx.result() + ctx_res: Any = await ctx.result() # NOTE: blocks indefinitely until cancelled # either by error from the target context # function or by being cancelled here by the # surrounding cancel scope. return (await portal.result(), ctx_res) + except ContextCancelled as ctxe: + canceller: tuple[str, str] = ctxe.canceller + our_uid: tuple[str, str] = current_actor().uid + if ( + canceller != portal.channel.uid + and + canceller != our_uid + ): + log.cancel( + f'Actor-service {name} was remotely cancelled?\n' + f'remote canceller: {canceller}\n' + f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n' + ) + else: + raise + + finally: await portal.cancel_actor() diff --git a/piker/service/_registry.py b/piker/service/_registry.py index 7391dd49..ed4569f7 100644 --- a/piker/service/_registry.py +++ b/piker/service/_registry.py @@ -27,6 +27,7 @@ from typing import ( ) import tractor +from tractor import Portal from ._util import ( log, # sub-sys logger @@ -140,7 +141,11 @@ async def find_service( first_only: bool = True, -) -> tractor.Portal | None: +) -> ( + Portal + | list[Portal] + | None +): reg_addrs: list[tuple[str, int]] async with open_registry( @@ -153,6 +158,9 @@ async def find_service( ), ) as reg_addrs: log.info(f'Scanning for service `{service_name}`') + + maybe_portals: list[Portal] | Portal | None + # attach to existing daemon by name if possible async with tractor.find_actor( service_name,