diff --git a/tractor/hilevel/_service.py b/tractor/hilevel/_service.py index 869d486..c94e421 100644 --- a/tractor/hilevel/_service.py +++ b/tractor/hilevel/_service.py @@ -197,7 +197,7 @@ async def open_service_mngr( yield mngr finally: # TODO: is this more clever/efficient? - # if 'samplerd' in mngr.service_tasks: + # if 'samplerd' in mngr.service_ctxs: # await mngr.cancel_service('samplerd') tn.cancel_scope.cancel() @@ -231,6 +231,108 @@ def get_service_mngr() -> ServiceMngr: return maybe_mngr +async def _open_and_supervise_service_ctx( + serman: ServiceMngr, + name: str, + ctx_fn: Callable, # TODO, type for `@tractor.context` requirement + portal: Portal, + + allow_overruns: bool = False, + task_status: TaskStatus[ + tuple[ + trio.CancelScope, + Context, + trio.Event, + Any, + ] + ] = trio.TASK_STATUS_IGNORED, + **ctx_kwargs, + +) -> Any: + ''' + Open a remote IPC-context defined by `ctx_fn` in the + (service) actor accessed via `portal` and supervise the + (local) parent task to termination at which point the remote + actor runtime is cancelled alongside it. + + The main application is for allocating long-running + "sub-services" in a main daemon and explicitly controlling + their lifetimes from an actor-global singleton. + + ''' + # TODO: use the ctx._scope directly here instead? + # -[ ] actually what semantics do we expect for this + # usage!? + with trio.CancelScope() as cs: + try: + async with portal.open_context( + ctx_fn, + allow_overruns=allow_overruns, + **ctx_kwargs, + + ) as (ctx, started): + + # unblock once the remote context has started + complete = trio.Event() + task_status.started(( + cs, + ctx, + complete, + started, + )) + log.info( + f'`pikerd` service {name} started with value {started}' + ) + # wait on any context's return value + # and any final portal result from the + # sub-actor. 
+ ctx_res: Any = await ctx.wait_for_result() + + # NOTE: blocks indefinitely until cancelled + # either by error from the target context + # function or by being cancelled here by the + # surrounding cancel scope. + return ( + await portal.wait_for_result(), + ctx_res, + ) + + except ContextCancelled as ctxe: + canceller: tuple[str, str] = ctxe.canceller + our_uid: tuple[str, str] = current_actor().uid + if ( + canceller != portal.chan.uid + and + canceller != our_uid + ): + log.cancel( + f'Actor-service `{name}` was remotely cancelled by a peer?\n' + + # TODO: this would be a good spot to use + # a respawn feature Bo + f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n' + + f'cancellee: {portal.chan.uid}\n' + f'canceller: {canceller}\n' + ) + else: + raise + + finally: + # NOTE: the ctx MUST be cancelled first if we + # don't want the above `ctx.wait_for_result()` to + # raise a self-ctxc. WHY, well since from the ctx's + # perspective the cancel request will have + # arrived out-of-band at the `Actor.cancel()` + # level, thus `Context.cancel_called == False`, + # meaning `ctx._is_self_cancelled() == False`. + # with trio.CancelScope(shield=True): + # await ctx.cancel() + await portal.cancel_actor() # terminate (remote) sub-actor + complete.set() # signal caller this task is done + serman.service_ctxs.pop(name) # remove mngr entry + + # TODO: we need remote wrapping and a general soln: # - factor this into a ``tractor.highlevel`` extension # pack for the # library. @@ -252,6 +354,14 @@ class ServiceMngr: debug_mode: bool = False # tractor sub-actor debug mode flag service_tasks: dict[ + str, + tuple[ + trio.CancelScope, + trio.Event, + ] + ] = field(default_factory=dict) + + service_ctxs: dict[ str, tuple[ trio.CancelScope, @@ -319,8 +429,6 @@ class ServiceMngr: # retstart if needed.
self.service_tasks[name] = ( cs, - None, - None, complete, ) return ( @@ -328,13 +436,33 @@ class ServiceMngr: complete, ) + async def cancel_service_task( + self, + name: str, + + ) -> Any: + log.info(f'Cancelling `pikerd` service {name}') + cs, complete = self.service_tasks[name] + + cs.cancel() + await complete.wait() + # TODO, if we use the `TaskMngr` from #346 + # we can also get the return value from the task! + + if name in self.service_tasks: + # TODO: custom err? + # raise ServiceError( + raise RuntimeError( + f'Service task {name!r} not terminated!?\n' + ) + async def start_service_ctx( self, name: str, portal: Portal, # TODO: typevar for the return type of the target and then # use it below for `ctx_res`? - target: Callable, + ctx_fn: Callable, **ctx_kwargs, ) -> tuple[ @@ -342,150 +470,41 @@ class ServiceMngr: Context, Any, ]: - cs, sub_ctx, complete, started = await self.service_n.start( + ''' + Start a remote IPC-context defined by `ctx_fn` in a background + task and immediately return supervision primitives to manage it: + + - a `cs: CancelScope` for the newly allocated bg task + - the `ipc_ctx: Context` to manage the remotely scheduled + `trio.Task`. + - the `started: Any` value returned by the remote endpoint + task's `Context.started(<value>)` call. + + The bg task supervises the ctx such that when it terminates the supporting + actor runtime is also cancelled, see `_open_and_supervise_service_ctx()` + for details. + + ''' + cs, ipc_ctx, complete, started = await self.service_n.start( functools.partial( - self._open_and_supervise_service_ctx, + _open_and_supervise_service_ctx, + serman=self, name=name, - target=target, + ctx_fn=ctx_fn, portal=portal, **ctx_kwargs, - ) ) # store the cancel scope and portal for later cancellation or # retstart if needed. 
- self.service_tasks[name] = (cs, sub_ctx, portal, complete) + self.service_ctxs[name] = (cs, ipc_ctx, portal, complete) return ( cs, - sub_ctx, + ipc_ctx, started, ) - async def _open_and_supervise_service_ctx( - self, - name: str, - target: Callable, # TODO, type for `@tractor.context` requirement - portal: Portal, - - allow_overruns: bool = False, - task_status: TaskStatus[ - tuple[ - trio.CancelScope, - Context, - trio.Event, - Any, - ] - ] = trio.TASK_STATUS_IGNORED, - **ctx_kwargs, - - ) -> Any: - ''' - Open a context in a service sub-actor, add to a stack - that gets unwound at ``pikerd`` teardown. - - This allows for allocating long-running sub-services in our main - daemon and explicitly controlling their lifetimes. - - ''' - # TODO: use the ctx._scope directly here instead? - # -[ ] actually what semantics do we expect for this - # usage!? - with trio.CancelScope() as cs: - try: - async with portal.open_context( - target, - allow_overruns=allow_overruns, - **ctx_kwargs, - - ) as (ctx, started): - - # unblock once the remote context has started - complete = trio.Event() - task_status.started(( - cs, - ctx, - complete, - started, - )) - log.info( - f'`pikerd` service {name} started with value {started}' - ) - # wait on any context's return value - # and any final portal result from the - # sub-actor. - ctx_res: Any = await ctx.wait_for_result() - - # NOTE: blocks indefinitely until cancelled - # either by error from the target context - # function or by being cancelled here by the - # surrounding cancel scope. 
- return ( - await portal.wait_for_result(), - ctx_res, - ) - - except ContextCancelled as ctxe: - canceller: tuple[str, str] = ctxe.canceller - our_uid: tuple[str, str] = current_actor().uid - if ( - canceller != portal.chan.uid - and - canceller != our_uid - ): - log.cancel( - f'Actor-service `{name}` was remotely cancelled by a peer?\n' - - # TODO: this would be a good spot to use - # a respawn feature Bo - f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n' - - f'cancellee: {portal.chan.uid}\n' - f'canceller: {canceller}\n' - ) - else: - raise - - finally: - # NOTE: the ctx MUST be cancelled first if we - # don't want the above `ctx.wait_for_result()` to - # raise a self-ctxc. WHY, well since from the ctx's - # perspective the cancel request will have - # arrived out-out-of-band at the `Actor.cancel()` - # level, thus `Context.cancel_called == False`, - # meaning `ctx._is_self_cancelled() == False`. - # with trio.CancelScope(shield=True): - # await ctx.cancel() - await portal.cancel_actor() # terminate (remote) sub-actor - complete.set() # signal caller this task is done - self.service_tasks.pop(name) # remove mngr entry - - async def cancel_service( - self, - name: str, - - ) -> Any: - ''' - Cancel the service task and actor for the given ``name``. - - ''' - log.info(f'Cancelling `pikerd` service {name}') - cs, sub_ctx, portal, complete = self.service_tasks[name] - - # cs.cancel() - await sub_ctx.cancel() - await complete.wait() - - if name in self.service_tasks: - # TODO: custom err? - # raise ServiceError( - raise RuntimeError( - f'Service actor for {name} not terminated and/or unknown?' - ) - - # assert name not in self.service_tasks, \ - # f'Serice task for {name} not terminated?' - async def start_service( self, daemon_name: str, @@ -493,23 +512,36 @@ class ServiceMngr: # ^TODO, type for `@tractor.context` deco-ed funcs! 
debug_mode: bool = False, - **tractor_actor_kwargs, + **start_actor_kwargs, ) -> Context: ''' - Start a "service" task in a (new) sub-actor and manage its - lifetime indefinitely until termination. + Start a new subactor and schedule a supervising "service task" + in it which explicitly defines the sub's lifetime. - Service actors can be cancelled (and thus shutdown) using - `.cancel_service()`. + "Service daemon subactors" are cancelled (and thus + terminated) using the paired `.cancel_service()`. + + Effectively this API can be used to manage "service daemons" + spawned under a single parent actor with supervision + semantics equivalent to a one-cancels-one style actor-nursery + or "(subactor) task manager" where each subprocess's (and + thus its embedded actor runtime) lifetime is synced to that + of the remotely spawned task defined by `ctx_ep`. + + The functionality can be likened to a "daemonized" version of + `.hilevel.worker.run_in_actor()` but with supervision + controls offered by `tractor.Context` where the main/root + remotely scheduled `trio.Task` invoking `ctx_ep` determines + the underlying subactor's lifetime.
''' - entry: tuple|None = self.service_tasks.get(daemon_name) + entry: tuple|None = self.service_ctxs.get(daemon_name) if entry: (cs, sub_ctx, portal, complete) = entry return sub_ctx - if daemon_name not in self.service_tasks: + if daemon_name not in self.service_ctxs: portal: Portal = await self.actor_n.start_actor( daemon_name, debug_mode=( # maybe set globally during allocate @@ -517,7 +549,7 @@ class ServiceMngr: or self.debug_mode ), - **tractor_actor_kwargs, + **start_actor_kwargs, ) ctx_kwargs: dict[str, Any] = {} if isinstance(ctx_ep, functools.partial): @@ -531,8 +563,34 @@ class ServiceMngr: ) = await self.start_service_ctx( name=daemon_name, portal=portal, - target=ctx_ep, + ctx_fn=ctx_ep, **ctx_kwargs, ) return sub_ctx + + async def cancel_service( + self, + name: str, + + ) -> Any: + ''' + Cancel the service task and actor for the given ``name``. + + ''' + log.info(f'Cancelling `pikerd` service {name}') + cs, sub_ctx, portal, complete = self.service_ctxs[name] + + # cs.cancel() + await sub_ctx.cancel() + await complete.wait() + + if name in self.service_ctxs: + # TODO: custom err? + # raise ServiceError( + raise RuntimeError( + f'Service actor for {name} not terminated and/or unknown?' + ) + + # assert name not in self.service_ctxs, \ + # f'Service task for {name} not terminated?'