Better separate service tasks vs. ctxs via methods
Namely by splitting the handles for each into 2 separate tables and adding a `.cancel_service_task()`. Also,
- move `_open_and_supervise_service_ctx()` to mod level,
- rename `target` -> `ctx_fn` params throughout,
- fill out method doc strings.
parent 8e162d7a70
commit 5e2b7c557e
				|  | @ -197,7 +197,7 @@ async def open_service_mngr( | |||
|             yield mngr | ||||
|         finally: | ||||
|             # TODO: is this more clever/efficient? | ||||
|             # if 'samplerd' in mngr.service_tasks: | ||||
|             # if 'samplerd' in mngr.service_ctxs: | ||||
|             #     await mngr.cancel_service('samplerd') | ||||
|             tn.cancel_scope.cancel() | ||||
| 
 | ||||
|  | @ -231,6 +231,108 @@ def get_service_mngr() -> ServiceMngr: | |||
|     return maybe_mngr | ||||
| 
 | ||||
| 
 | ||||
| async def _open_and_supervise_service_ctx( | ||||
|     serman: ServiceMngr, | ||||
|     name: str, | ||||
|     ctx_fn: Callable,  # TODO, type for `@tractor.context` requirement | ||||
|     portal: Portal, | ||||
| 
 | ||||
|     allow_overruns: bool = False, | ||||
|     task_status: TaskStatus[ | ||||
|         tuple[ | ||||
|             trio.CancelScope, | ||||
|             Context, | ||||
|             trio.Event, | ||||
|             Any, | ||||
|         ] | ||||
|     ] = trio.TASK_STATUS_IGNORED, | ||||
|     **ctx_kwargs, | ||||
| 
 | ||||
| ) -> Any: | ||||
|     ''' | ||||
|     Open a remote IPC-context defined by `ctx_fn` in the | ||||
|     (service) actor accessed via `portal` and supervise the | ||||
|     (local) parent task to termination, at which point the remote | ||||
|     actor runtime is cancelled alongside it. | ||||
| 
 | ||||
|     The main application is for allocating long-running | ||||
|     "sub-services" in a main daemon and explicitly controlling | ||||
|     their lifetimes from an actor-global singleton. | ||||
| 
 | ||||
|     ''' | ||||
|     # TODO: use the ctx._scope directly here instead? | ||||
|     # -[ ] actually what semantics do we expect for this | ||||
|     #   usage!? | ||||
|     with trio.CancelScope() as cs: | ||||
|         try: | ||||
|             async with portal.open_context( | ||||
|                 ctx_fn, | ||||
|                 allow_overruns=allow_overruns, | ||||
|                 **ctx_kwargs, | ||||
| 
 | ||||
|             ) as (ctx, started): | ||||
| 
 | ||||
|                 # unblock once the remote context has started | ||||
|                 complete = trio.Event() | ||||
|                 task_status.started(( | ||||
|                     cs, | ||||
|                     ctx, | ||||
|                     complete, | ||||
|                     started, | ||||
|                 )) | ||||
|                 log.info( | ||||
|                     f'`pikerd` service {name} started with value {started}' | ||||
|                 ) | ||||
|                 # wait on any context's return value | ||||
|                 # and any final portal result from the | ||||
|                 # sub-actor. | ||||
|                 ctx_res: Any = await ctx.wait_for_result() | ||||
| 
 | ||||
|                 # NOTE: blocks indefinitely until cancelled | ||||
|                 # either by error from the target context | ||||
|                 # function or by being cancelled here by the | ||||
|                 # surrounding cancel scope. | ||||
|                 return ( | ||||
|                     await portal.wait_for_result(), | ||||
|                     ctx_res, | ||||
|                 ) | ||||
| 
 | ||||
|         except ContextCancelled as ctxe: | ||||
|             canceller: tuple[str, str] = ctxe.canceller | ||||
|             our_uid: tuple[str, str] = current_actor().uid | ||||
|             if ( | ||||
|                 canceller != portal.chan.uid | ||||
|                 and | ||||
|                 canceller != our_uid | ||||
|             ): | ||||
|                 log.cancel( | ||||
|                     f'Actor-service `{name}` was remotely cancelled by a peer?\n' | ||||
| 
 | ||||
|                     # TODO: this would be a good spot to use | ||||
|                     # a respawn feature Bo | ||||
|                     f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n' | ||||
| 
 | ||||
|                     f'cancellee: {portal.chan.uid}\n' | ||||
|                     f'canceller: {canceller}\n' | ||||
|                 ) | ||||
|             else: | ||||
|                 raise | ||||
| 
 | ||||
|         finally: | ||||
|             # NOTE: the ctx MUST be cancelled first if we | ||||
|             # don't want the above `ctx.wait_for_result()` to | ||||
|             # raise a self-ctxc. WHY, well since from the ctx's | ||||
|             # perspective the cancel request will have | ||||
|             # arrived out-of-band at the `Actor.cancel()` | ||||
|             # level, thus `Context.cancel_called == False`, | ||||
|             # meaning `ctx._is_self_cancelled() == False`. | ||||
|             # with trio.CancelScope(shield=True): | ||||
|             # await ctx.cancel() | ||||
|             await portal.cancel_actor()  # terminate (remote) sub-actor | ||||
|             complete.set()  # signal caller this task is done | ||||
|             serman.service_ctxs.pop(name)  # remove mngr entry | ||||
| 
 | ||||
| 
 | ||||
| # TODO: we need remote wrapping and a general soln: | ||||
| # - factor this into a ``tractor.highlevel`` extension pack for the | ||||
| #   library. | ||||
|  | @ -252,6 +354,14 @@ class ServiceMngr: | |||
|     debug_mode: bool = False # tractor sub-actor debug mode flag | ||||
| 
 | ||||
|     service_tasks: dict[ | ||||
|         str, | ||||
|         tuple[ | ||||
|             trio.CancelScope, | ||||
|             trio.Event, | ||||
|         ] | ||||
|     ] = field(default_factory=dict) | ||||
| 
 | ||||
|     service_ctxs: dict[ | ||||
|         str, | ||||
|         tuple[ | ||||
|             trio.CancelScope, | ||||
|  | @ -319,8 +429,6 @@ class ServiceMngr: | |||
|         # restart if needed. | ||||
|         self.service_tasks[name] = ( | ||||
|             cs, | ||||
|             None, | ||||
|             None, | ||||
|             complete, | ||||
|         ) | ||||
|         return ( | ||||
|  | @ -328,13 +436,33 @@ class ServiceMngr: | |||
|             complete, | ||||
|         ) | ||||
| 
 | ||||
|     async def cancel_service_task( | ||||
|         self, | ||||
|         name: str, | ||||
| 
 | ||||
|     ) -> Any: | ||||
|         log.info(f'Cancelling `pikerd` service {name}') | ||||
|         cs, complete = self.service_tasks[name] | ||||
| 
 | ||||
|         cs.cancel() | ||||
|         await complete.wait() | ||||
|         # TODO, if we use the `TaskMngr` from #346 | ||||
|         # we can also get the return value from the task! | ||||
| 
 | ||||
|         if name in self.service_tasks: | ||||
|             # TODO: custom err? | ||||
|             # raise ServiceError( | ||||
|             raise RuntimeError( | ||||
|                 f'Service task {name!r} not terminated!?\n' | ||||
|             ) | ||||
| 
 | ||||
|     async def start_service_ctx( | ||||
|         self, | ||||
|         name: str, | ||||
|         portal: Portal, | ||||
|         # TODO: typevar for the return type of the target and then | ||||
|         # use it below for `ctx_res`? | ||||
|         target: Callable, | ||||
|         ctx_fn: Callable, | ||||
|         **ctx_kwargs, | ||||
| 
 | ||||
|     ) -> tuple[ | ||||
|  | @ -342,150 +470,41 @@ class ServiceMngr: | |||
|         Context, | ||||
|         Any, | ||||
|     ]: | ||||
|         cs, sub_ctx, complete, started = await self.service_n.start( | ||||
|         ''' | ||||
|         Start a remote IPC-context defined by `ctx_fn` in a background | ||||
|         task and immediately return supervision primitives to manage it: | ||||
| 
 | ||||
|         - a `cs: CancelScope` for the newly allocated bg task | ||||
|         - the `ipc_ctx: Context` to manage the remotely scheduled | ||||
|           `trio.Task`. | ||||
|         - the `started: Any` value returned by the remote endpoint | ||||
|           task's `Context.started(<value>)` call. | ||||
| 
 | ||||
|         The bg task supervises the ctx such that, when it terminates, the | ||||
|         supporting actor runtime is also cancelled; see | ||||
|         `_open_and_supervise_service_ctx()` for details. | ||||
| 
 | ||||
|         ''' | ||||
|         cs, ipc_ctx, complete, started = await self.service_n.start( | ||||
|             functools.partial( | ||||
|                 self._open_and_supervise_service_ctx, | ||||
|                 _open_and_supervise_service_ctx, | ||||
|                 serman=self, | ||||
|                 name=name, | ||||
|                 target=target, | ||||
|                 ctx_fn=ctx_fn, | ||||
|                 portal=portal, | ||||
|                 **ctx_kwargs, | ||||
| 
 | ||||
|             ) | ||||
|         ) | ||||
| 
 | ||||
|         # store the cancel scope and portal for later cancellation or | ||||
|         # restart if needed. | ||||
|         self.service_tasks[name] = (cs, sub_ctx, portal, complete) | ||||
|         self.service_ctxs[name] = (cs, ipc_ctx, portal, complete) | ||||
|         return ( | ||||
|             cs, | ||||
|             sub_ctx, | ||||
|             ipc_ctx, | ||||
|             started, | ||||
|         ) | ||||
| 
 | ||||
|     async def _open_and_supervise_service_ctx( | ||||
|         self, | ||||
|         name: str, | ||||
|         target: Callable,  # TODO, type for `@tractor.context` requirement | ||||
|         portal: Portal, | ||||
| 
 | ||||
|         allow_overruns: bool = False, | ||||
|         task_status: TaskStatus[ | ||||
|             tuple[ | ||||
|                 trio.CancelScope, | ||||
|                 Context, | ||||
|                 trio.Event, | ||||
|                 Any, | ||||
|             ] | ||||
|         ] = trio.TASK_STATUS_IGNORED, | ||||
|         **ctx_kwargs, | ||||
| 
 | ||||
|     ) -> Any: | ||||
|         ''' | ||||
|         Open a context in a service sub-actor, add to a stack | ||||
|         that gets unwound at ``pikerd`` teardown. | ||||
| 
 | ||||
|         This allows for allocating long-running sub-services in our main | ||||
|         daemon and explicitly controlling their lifetimes. | ||||
| 
 | ||||
|         ''' | ||||
|         # TODO: use the ctx._scope directly here instead? | ||||
|         # -[ ] actually what semantics do we expect for this | ||||
|         #   usage!? | ||||
|         with trio.CancelScope() as cs: | ||||
|             try: | ||||
|                 async with portal.open_context( | ||||
|                     target, | ||||
|                     allow_overruns=allow_overruns, | ||||
|                     **ctx_kwargs, | ||||
| 
 | ||||
|                 ) as (ctx, started): | ||||
| 
 | ||||
|                     # unblock once the remote context has started | ||||
|                     complete = trio.Event() | ||||
|                     task_status.started(( | ||||
|                         cs, | ||||
|                         ctx, | ||||
|                         complete, | ||||
|                         started, | ||||
|                     )) | ||||
|                     log.info( | ||||
|                         f'`pikerd` service {name} started with value {started}' | ||||
|                     ) | ||||
|                     # wait on any context's return value | ||||
|                     # and any final portal result from the | ||||
|                     # sub-actor. | ||||
|                     ctx_res: Any = await ctx.wait_for_result() | ||||
| 
 | ||||
|                     # NOTE: blocks indefinitely until cancelled | ||||
|                     # either by error from the target context | ||||
|                     # function or by being cancelled here by the | ||||
|                     # surrounding cancel scope. | ||||
|                     return ( | ||||
|                         await portal.wait_for_result(), | ||||
|                         ctx_res, | ||||
|                     ) | ||||
| 
 | ||||
|             except ContextCancelled as ctxe: | ||||
|                 canceller: tuple[str, str] = ctxe.canceller | ||||
|                 our_uid: tuple[str, str] = current_actor().uid | ||||
|                 if ( | ||||
|                     canceller != portal.chan.uid | ||||
|                     and | ||||
|                     canceller != our_uid | ||||
|                 ): | ||||
|                     log.cancel( | ||||
|                         f'Actor-service `{name}` was remotely cancelled by a peer?\n' | ||||
| 
 | ||||
|                         # TODO: this would be a good spot to use | ||||
|                         # a respawn feature Bo | ||||
|                         f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n' | ||||
| 
 | ||||
|                         f'cancellee: {portal.chan.uid}\n' | ||||
|                         f'canceller: {canceller}\n' | ||||
|                     ) | ||||
|                 else: | ||||
|                     raise | ||||
| 
 | ||||
|             finally: | ||||
|                 # NOTE: the ctx MUST be cancelled first if we | ||||
|                 # don't want the above `ctx.wait_for_result()` to | ||||
|                 # raise a self-ctxc. WHY, well since from the ctx's | ||||
|                 # perspective the cancel request will have | ||||
|                 # arrived out-of-band at the `Actor.cancel()` | ||||
|                 # level, thus `Context.cancel_called == False`, | ||||
|                 # meaning `ctx._is_self_cancelled() == False`. | ||||
|                 # with trio.CancelScope(shield=True): | ||||
|                 # await ctx.cancel() | ||||
|                 await portal.cancel_actor()  # terminate (remote) sub-actor | ||||
|                 complete.set()  # signal caller this task is done | ||||
|                 self.service_tasks.pop(name)  # remove mngr entry | ||||
| 
 | ||||
|     async def cancel_service( | ||||
|         self, | ||||
|         name: str, | ||||
| 
 | ||||
|     ) -> Any: | ||||
|         ''' | ||||
|         Cancel the service task and actor for the given ``name``. | ||||
| 
 | ||||
|         ''' | ||||
|         log.info(f'Cancelling `pikerd` service {name}') | ||||
|         cs, sub_ctx, portal, complete = self.service_tasks[name] | ||||
| 
 | ||||
|         # cs.cancel() | ||||
|         await sub_ctx.cancel() | ||||
|         await complete.wait() | ||||
| 
 | ||||
|         if name in self.service_tasks: | ||||
|             # TODO: custom err? | ||||
|             # raise ServiceError( | ||||
|             raise RuntimeError( | ||||
|                 f'Service actor for {name} not terminated and/or unknown?' | ||||
|             ) | ||||
| 
 | ||||
|         # assert name not in self.service_tasks, \ | ||||
|         #     f'Service task for {name} not terminated?' | ||||
| 
 | ||||
|     async def start_service( | ||||
|         self, | ||||
|         daemon_name: str, | ||||
|  | @ -493,23 +512,36 @@ class ServiceMngr: | |||
|         # ^TODO, type for `@tractor.context` deco-ed funcs! | ||||
| 
 | ||||
|         debug_mode: bool = False, | ||||
|         **tractor_actor_kwargs, | ||||
|         **start_actor_kwargs, | ||||
| 
 | ||||
|     ) -> Context: | ||||
|         ''' | ||||
|         Start a "service" task in a (new) sub-actor and manage its | ||||
|         lifetime indefinitely until termination. | ||||
|         Start new subactor and schedule a supervising "service task" | ||||
|         in it which explicitly defines the sub's lifetime. | ||||
| 
 | ||||
|         Service actors can be cancelled (and thus shutdown) using | ||||
|         `.cancel_service()`. | ||||
|         "Service daemon subactors" are cancelled (and thus | ||||
|         terminated) using the paired `.cancel_service()`. | ||||
| 
 | ||||
|         Effectively this API can be used to manage "service daemons" | ||||
|         spawned under a single parent actor with supervision | ||||
|         semantics equivalent to a one-cancels-one style actor-nursery | ||||
|         or "(subactor) task manager" where each subprocess's (and | ||||
|         thus its embedded actor runtime) lifetime is synced to that | ||||
|         of the remotely spawned task defined by `ctx_ep`. | ||||
| 
 | ||||
|         The functionality can be likened to a "daemonized" version of | ||||
|         `.hilevel.worker.run_in_actor()` but with supervision | ||||
|         controls offered by `tractor.Context` where the main/root | ||||
|         remotely scheduled `trio.Task` invoking `ctx_ep` determines | ||||
|         the underlying subactor's lifetime. | ||||
| 
 | ||||
|         ''' | ||||
|         entry: tuple|None = self.service_tasks.get(daemon_name) | ||||
|         entry: tuple|None = self.service_ctxs.get(daemon_name) | ||||
|         if entry: | ||||
|             (cs, sub_ctx, portal, complete) = entry | ||||
|             return sub_ctx | ||||
| 
 | ||||
|         if daemon_name not in self.service_tasks: | ||||
|         if daemon_name not in self.service_ctxs: | ||||
|             portal: Portal = await self.actor_n.start_actor( | ||||
|                 daemon_name, | ||||
|                 debug_mode=(  # maybe set globally during allocate | ||||
|  | @ -517,7 +549,7 @@ class ServiceMngr: | |||
|                     or | ||||
|                     self.debug_mode | ||||
|                 ), | ||||
|                 **tractor_actor_kwargs, | ||||
|                 **start_actor_kwargs, | ||||
|             ) | ||||
|             ctx_kwargs: dict[str, Any] = {} | ||||
|             if isinstance(ctx_ep, functools.partial): | ||||
|  | @ -531,8 +563,34 @@ class ServiceMngr: | |||
|             ) = await self.start_service_ctx( | ||||
|                 name=daemon_name, | ||||
|                 portal=portal, | ||||
|                 target=ctx_ep, | ||||
|                 ctx_fn=ctx_ep, | ||||
|                 **ctx_kwargs, | ||||
|             ) | ||||
| 
 | ||||
|             return sub_ctx | ||||
| 
 | ||||
|     async def cancel_service( | ||||
|         self, | ||||
|         name: str, | ||||
| 
 | ||||
|     ) -> Any: | ||||
|         ''' | ||||
|         Cancel the service task and actor for the given ``name``. | ||||
| 
 | ||||
|         ''' | ||||
|         log.info(f'Cancelling `pikerd` service {name}') | ||||
|         cs, sub_ctx, portal, complete = self.service_ctxs[name] | ||||
| 
 | ||||
|         # cs.cancel() | ||||
|         await sub_ctx.cancel() | ||||
|         await complete.wait() | ||||
| 
 | ||||
|         if name in self.service_ctxs: | ||||
|             # TODO: custom err? | ||||
|             # raise ServiceError( | ||||
|             raise RuntimeError( | ||||
|                 f'Service actor for {name} not terminated and/or unknown?' | ||||
|             ) | ||||
| 
 | ||||
|         # assert name not in self.service_ctxs, \ | ||||
|         #     f'Service task for {name} not terminated?' | ||||
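By contrast with the ctx-table flow above, `.cancel_service_task()` only tears down a (local) background task registered in `.service_tasks`; the method that normally fills that table is outside this diff's context, so the `_poll_quotes` task and the by-hand registration below are illustrative assumptions:

import trio


async def _poll_quotes(
    mngr: ServiceMngr,
    name: str,
    task_status=trio.TASK_STATUS_IGNORED,
) -> None:
    # hypothetical in-process service task (no subactor/portal involved)
    with trio.CancelScope() as cs:
        complete = trio.Event()
        task_status.started((cs, complete))
        try:
            await trio.sleep_forever()
        finally:
            # mirror the ctx-path teardown: signal completion and drop
            # the table entry so the post-cancel check in
            # `.cancel_service_task()` passes.
            complete.set()
            mngr.service_tasks.pop(name)


async def demo(mngr: ServiceMngr) -> None:
    name: str = 'quote-poller'
    cs, complete = await mngr.service_n.start(_poll_quotes, mngr, name)

    # normally done by the task-starting method (not shown here):
    # a 2-tuple handle vs. the 4-tuple stored in `.service_ctxs`.
    mngr.service_tasks[name] = (cs, complete)

    # cancels the scope, waits on `complete`, then verifies the
    # entry was removed.
    await mngr.cancel_service_task(name)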