diff --git a/piker/service/_ahab.py b/piker/service/_ahab.py index 49d72de6..4cccf855 100644 --- a/piker/service/_ahab.py +++ b/piker/service/_ahab.py @@ -15,8 +15,8 @@ # along with this program. If not, see . ''' -Supervisor for ``docker`` with included async and SC wrapping -to ensure a cancellable container lifetime system. +Supervisor for ``docker`` with included async and SC wrapping to +ensure a cancellable container lifetime system. ''' from __future__ import annotations diff --git a/piker/service/_mngr.py b/piker/service/_mngr.py index 70771593..89e98411 100644 --- a/piker/service/_mngr.py +++ b/piker/service/_mngr.py @@ -27,6 +27,12 @@ from typing import ( import trio from trio_typing import TaskStatus import tractor +from tractor import ( + current_actor, + ContextCancelled, + Context, + Portal, +) from ._util import ( log, # sub-sys logger @@ -38,6 +44,8 @@ from ._util import ( # library. # - wrap a "remote api" wherein you can get a method proxy # to the pikerd actor for starting services remotely! +# - prolly rename this to ActorServicesNursery since it spawns +# new actors and supervises them to completion? class Services: actor_n: tractor._supervise.ActorNursery @@ -47,7 +55,7 @@ class Services: str, tuple[ trio.CancelScope, - tractor.Portal, + Portal, trio.Event, ] ] = {} @@ -57,12 +65,12 @@ class Services: async def start_service_task( self, name: str, - portal: tractor.Portal, + portal: Portal, target: Callable, allow_overruns: bool = False, **ctx_kwargs, - ) -> (trio.CancelScope, tractor.Context): + ) -> (trio.CancelScope, Context): ''' Open a context in a service sub-actor, add to a stack that gets unwound at ``pikerd`` teardown. @@ -101,13 +109,30 @@ class Services: # wait on any context's return value # and any final portal result from the # sub-actor. - ctx_res = await ctx.result() + ctx_res: Any = await ctx.result() # NOTE: blocks indefinitely until cancelled # either by error from the target context # function or by being cancelled here by the # surrounding cancel scope. return (await portal.result(), ctx_res) + except ContextCancelled as ctxe: + canceller: tuple[str, str] = ctxe.canceller + our_uid: tuple[str, str] = current_actor().uid + if ( + canceller != portal.channel.uid + and + canceller != our_uid + ): + log.cancel( + f'Actor-service {name} was remotely cancelled?\n' + f'remote canceller: {canceller}\n' + f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n' + ) + else: + raise + + finally: await portal.cancel_actor() diff --git a/piker/service/_registry.py b/piker/service/_registry.py index 7391dd49..ed4569f7 100644 --- a/piker/service/_registry.py +++ b/piker/service/_registry.py @@ -27,6 +27,7 @@ from typing import ( ) import tractor +from tractor import Portal from ._util import ( log, # sub-sys logger @@ -140,7 +141,11 @@ async def find_service( first_only: bool = True, -) -> tractor.Portal | None: +) -> ( + Portal + | list[Portal] + | None +): reg_addrs: list[tuple[str, int]] async with open_registry( @@ -153,6 +158,9 @@ async def find_service( ), ) as reg_addrs: log.info(f'Scanning for service `{service_name}`') + + maybe_portals: list[Portal] | Portal | None + # attach to existing daemon by name if possible async with tractor.find_actor( service_name,