Better separate service tasks vs. ctxs via methods
Namely by splitting the handles for each into 2 separate tables and adding a `.cancel_service_task()` method. Also,
- move `_open_and_supervise_service_ctx()` to module level.
- rename the `target` -> `ctx_fn` param throughout.
- fill out method doc strings.
parent 087aaa1c36
commit dd011c0b2f
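
For orientation, a minimal usage sketch of the resulting API split (not part of this change; the endpoint name, import path and call parameters below are assumptions):

import tractor
import trio

# NOTE: import path is an assumption, adjust to wherever this module lives.
from tractor.hilevel import open_service_mngr


@tractor.context
async def my_service_ep(ctx: tractor.Context) -> None:
    # a long-running "service" endpoint scheduled in the sub-actor
    await ctx.started('ready')
    await trio.sleep_forever()


async def main():
    # actual mngr params elided/assumed-default here
    async with open_service_mngr() as mngr:

        # an IPC-ctx backed "service daemon", tracked in `.service_ctxs`
        sub_ctx = await mngr.start_service(
            daemon_name='my_service',
            ctx_ep=my_service_ep,
        )

        # cancels the remote task AND tears down its sub-actor
        await mngr.cancel_service('my_service')

        # whereas purely local bg "service tasks" (tracked in
        # `.service_tasks`) are cancelled via the new method:
        # await mngr.cancel_service_task('some_local_task')


trio.run(main)

That is, ctx-backed service daemons live in `.service_ctxs` and are torn down via `.cancel_service()`, while plain in-actor bg tasks live in `.service_tasks` and get the new `.cancel_service_task()`.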
					
@@ -197,7 +197,7 @@ async def open_service_mngr(
             yield mngr
         finally:
             # TODO: is this more clever/efficient?
-            # if 'samplerd' in mngr.service_tasks:
+            # if 'samplerd' in mngr.service_ctxs:
             #     await mngr.cancel_service('samplerd')
             tn.cancel_scope.cancel()
 
@@ -231,6 +231,108 @@ def get_service_mngr() -> ServiceMngr:
     return maybe_mngr
 
 
+async def _open_and_supervise_service_ctx(
+    serman: ServiceMngr,
+    name: str,
+    ctx_fn: Callable,  # TODO, type for `@tractor.context` requirement
+    portal: Portal,
+
+    allow_overruns: bool = False,
+    task_status: TaskStatus[
+        tuple[
+            trio.CancelScope,
+            Context,
+            trio.Event,
+            Any,
+        ]
+    ] = trio.TASK_STATUS_IGNORED,
+    **ctx_kwargs,
+
+) -> Any:
+    '''
+    Open a remote IPC-context defined by `ctx_fn` in the
+    (service) actor accessed via `portal` and supervise the
+    (local) parent task to termination at which point the remote
+    actor runtime is cancelled alongside it.
+
+    The main application is for allocating long-running
+    "sub-services" in a main daemon and explicitly controlling
+    their lifetimes from an actor-global singleton.
+
+    '''
+    # TODO: use the ctx._scope directly here instead?
+    # -[ ] actually what semantics do we expect for this
+    #   usage!?
+    with trio.CancelScope() as cs:
+        try:
+            async with portal.open_context(
+                ctx_fn,
+                allow_overruns=allow_overruns,
+                **ctx_kwargs,
+
+            ) as (ctx, started):
+
+                # unblock once the remote context has started
+                complete = trio.Event()
+                task_status.started((
+                    cs,
+                    ctx,
+                    complete,
+                    started,
+                ))
+                log.info(
+                    f'`pikerd` service {name} started with value {started}'
+                )
+                # wait on any context's return value
+                # and any final portal result from the
+                # sub-actor.
+                ctx_res: Any = await ctx.wait_for_result()
+
+                # NOTE: blocks indefinitely until cancelled
+                # either by error from the target context
+                # function or by being cancelled here by the
+                # surrounding cancel scope.
+                return (
+                    await portal.wait_for_result(),
+                    ctx_res,
+                )
+
+        except ContextCancelled as ctxe:
+            canceller: tuple[str, str] = ctxe.canceller
+            our_uid: tuple[str, str] = current_actor().uid
+            if (
+                canceller != portal.chan.uid
+                and
+                canceller != our_uid
+            ):
+                log.cancel(
+                    f'Actor-service `{name}` was remotely cancelled by a peer?\n'
+
+                    # TODO: this would be a good spot to use
+                    # a respawn feature Bo
+                    f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
+
+                    f'cancellee: {portal.chan.uid}\n'
+                    f'canceller: {canceller}\n'
+                )
+            else:
+                raise
+
+        finally:
+            # NOTE: the ctx MUST be cancelled first if we
+            # don't want the above `ctx.wait_for_result()` to
+            # raise a self-ctxc. WHY, well since from the ctx's
+            # perspective the cancel request will have
+            # arrived out-out-of-band at the `Actor.cancel()`
+            # level, thus `Context.cancel_called == False`,
+            # meaning `ctx._is_self_cancelled() == False`.
+            # with trio.CancelScope(shield=True):
+            # await ctx.cancel()
+            await portal.cancel_actor()  # terminate (remote) sub-actor
+            complete.set()  # signal caller this task is done
+            serman.service_ctxs.pop(name)  # remove mngr entry
+
+
 # TODO: we need remote wrapping and a general soln:
 # - factor this into a ``tractor.highlevel`` extension # pack for the
 #   library.
@@ -252,6 +354,14 @@ class ServiceMngr:
     debug_mode: bool = False # tractor sub-actor debug mode flag
 
     service_tasks: dict[
+        str,
+        tuple[
+            trio.CancelScope,
+            trio.Event,
+        ]
+    ] = field(default_factory=dict)
+
+    service_ctxs: dict[
         str,
         tuple[
             trio.CancelScope,
@@ -319,8 +429,6 @@ class ServiceMngr:
         # retstart if needed.
         self.service_tasks[name] = (
             cs,
-            None,
-            None,
             complete,
         )
         return (
@@ -328,13 +436,33 @@ class ServiceMngr:
             complete,
         )
 
+    async def cancel_service_task(
+        self,
+        name: str,
+
+    ) -> Any:
+        log.info(f'Cancelling `pikerd` service {name}')
+        cs, complete = self.service_tasks[name]
+
+        cs.cancel()
+        await complete.wait()
+        # TODO, if we use the `TaskMngr` from #346
+        # we can also get the return value from the task!
+
+        if name in self.service_tasks:
+            # TODO: custom err?
+            # raise ServiceError(
+            raise RuntimeError(
+                f'Service task {name!r} not terminated!?\n'
+            )
+
     async def start_service_ctx(
         self,
         name: str,
         portal: Portal,
         # TODO: typevar for the return type of the target and then
         # use it below for `ctx_res`?
-        target: Callable,
+        ctx_fn: Callable,
         **ctx_kwargs,
 
     ) -> tuple[
@@ -342,150 +470,41 @@
         Context,
         Any,
     ]:
-        cs, sub_ctx, complete, started = await self.service_n.start(
+        '''
+        Start a remote IPC-context defined by `ctx_fn` in a background
+        task and immediately return supervision primitives to manage it:
+
+        - a `cs: CancelScope` for the newly allocated bg task
+        - the `ipc_ctx: Context` to manage the remotely scheduled
+          `trio.Task`.
+        - the `started: Any` value returned by the remote endpoint
+          task's `Context.started(<value>)` call.
+
+        The bg task supervises the ctx such that when it terminates the supporting
+        actor runtime is also cancelled, see `_open_and_supervise_service_ctx()`
+        for details.
+
+        '''
+        cs, ipc_ctx, complete, started = await self.service_n.start(
             functools.partial(
-                self._open_and_supervise_service_ctx,
+                _open_and_supervise_service_ctx,
+                serman=self,
                 name=name,
-                target=target,
+                ctx_fn=ctx_fn,
                 portal=portal,
                 **ctx_kwargs,
-
             )
         )
 
         # store the cancel scope and portal for later cancellation or
         # retstart if needed.
-        self.service_tasks[name] = (cs, sub_ctx, portal, complete)
+        self.service_ctxs[name] = (cs, ipc_ctx, portal, complete)
         return (
             cs,
-            sub_ctx,
+            ipc_ctx,
             started,
         )
 
-    async def _open_and_supervise_service_ctx(
-        self,
-        name: str,
-        target: Callable,  # TODO, type for `@tractor.context` requirement
-        portal: Portal,
-
-        allow_overruns: bool = False,
-        task_status: TaskStatus[
-            tuple[
-                trio.CancelScope,
-                Context,
-                trio.Event,
-                Any,
-            ]
-        ] = trio.TASK_STATUS_IGNORED,
-        **ctx_kwargs,
-
-    ) -> Any:
-        '''
-        Open a context in a service sub-actor, add to a stack
-        that gets unwound at ``pikerd`` teardown.
-
-        This allows for allocating long-running sub-services in our main
-        daemon and explicitly controlling their lifetimes.
-
-        '''
-        # TODO: use the ctx._scope directly here instead?
-        # -[ ] actually what semantics do we expect for this
-        #   usage!?
-        with trio.CancelScope() as cs:
-            try:
-                async with portal.open_context(
-                    target,
-                    allow_overruns=allow_overruns,
-                    **ctx_kwargs,
-
-                ) as (ctx, started):
-
-                    # unblock once the remote context has started
-                    complete = trio.Event()
-                    task_status.started((
-                        cs,
-                        ctx,
-                        complete,
-                        started,
-                    ))
-                    log.info(
-                        f'`pikerd` service {name} started with value {started}'
-                    )
-                    # wait on any context's return value
-                    # and any final portal result from the
-                    # sub-actor.
-                    ctx_res: Any = await ctx.wait_for_result()
-
-                    # NOTE: blocks indefinitely until cancelled
-                    # either by error from the target context
-                    # function or by being cancelled here by the
-                    # surrounding cancel scope.
-                    return (
-                        await portal.wait_for_result(),
-                        ctx_res,
-                    )
-
-            except ContextCancelled as ctxe:
-                canceller: tuple[str, str] = ctxe.canceller
-                our_uid: tuple[str, str] = current_actor().uid
-                if (
-                    canceller != portal.chan.uid
-                    and
-                    canceller != our_uid
-                ):
-                    log.cancel(
-                        f'Actor-service `{name}` was remotely cancelled by a peer?\n'
-
-                        # TODO: this would be a good spot to use
-                        # a respawn feature Bo
-                        f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
-
-                        f'cancellee: {portal.chan.uid}\n'
-                        f'canceller: {canceller}\n'
-                    )
-                else:
-                    raise
-
-            finally:
-                # NOTE: the ctx MUST be cancelled first if we
-                # don't want the above `ctx.wait_for_result()` to
-                # raise a self-ctxc. WHY, well since from the ctx's
-                # perspective the cancel request will have
-                # arrived out-out-of-band at the `Actor.cancel()`
-                # level, thus `Context.cancel_called == False`,
-                # meaning `ctx._is_self_cancelled() == False`.
-                # with trio.CancelScope(shield=True):
-                # await ctx.cancel()
-                await portal.cancel_actor()  # terminate (remote) sub-actor
-                complete.set()  # signal caller this task is done
-                self.service_tasks.pop(name)  # remove mngr entry
-
-    async def cancel_service(
-        self,
-        name: str,
-
-    ) -> Any:
-        '''
-        Cancel the service task and actor for the given ``name``.
-
-        '''
-        log.info(f'Cancelling `pikerd` service {name}')
-        cs, sub_ctx, portal, complete = self.service_tasks[name]
-
-        # cs.cancel()
-        await sub_ctx.cancel()
-        await complete.wait()
-
-        if name in self.service_tasks:
-            # TODO: custom err?
-            # raise ServiceError(
-            raise RuntimeError(
-                f'Service actor for {name} not terminated and/or unknown?'
-            )
-
-        # assert name not in self.service_tasks, \
-        #     f'Serice task for {name} not terminated?'
-
     async def start_service(
         self,
         daemon_name: str,
@@ -493,23 +512,36 @@ class ServiceMngr:
         # ^TODO, type for `@tractor.context` deco-ed funcs!
 
         debug_mode: bool = False,
-        **tractor_actor_kwargs,
+        **start_actor_kwargs,
 
     ) -> Context:
         '''
-        Start a "service" task in a (new) sub-actor and manage its
-        lifetime indefinitely until termination.
+        Start new subactor and schedule a supervising "service task"
+        in it which explicitly defines the sub's lifetime.
 
-        Service actors can be cancelled (and thus shutdown) using
-        `.cancel_service()`.
+        "Service daemon subactors" are cancelled (and thus
+        terminated) using the paired `.cancel_service()`.
+
+        Effectively this API can be used to manage "service daemons"
+        spawned under a single parent actor with supervision
+        semantics equivalent to a one-cancels-one style actor-nursery
+        or "(subactor) task manager" where each subprocess's (and
+        thus its embedded actor runtime) lifetime is synced to that
+        of the remotely spawned task defined by `ctx_ep`.
+
+        The funcionality can be likened to a "daemonized" version of
+        `.hilevel.worker.run_in_actor()` but with supervision
+        controls offered by `tractor.Context` where the main/root
+        remotely scheduled `trio.Task` invoking `ctx_ep` determines
+        the underlying subactor's lifetime.
 
         '''
-        entry: tuple|None = self.service_tasks.get(daemon_name)
+        entry: tuple|None = self.service_ctxs.get(daemon_name)
         if entry:
             (cs, sub_ctx, portal, complete) = entry
             return sub_ctx
 
-        if daemon_name not in self.service_tasks:
+        if daemon_name not in self.service_ctxs:
             portal: Portal = await self.actor_n.start_actor(
                 daemon_name,
                 debug_mode=(  # maybe set globally during allocate
@@ -517,7 +549,7 @@ class ServiceMngr:
                     or
                     self.debug_mode
                 ),
-                **tractor_actor_kwargs,
+                **start_actor_kwargs,
             )
             ctx_kwargs: dict[str, Any] = {}
             if isinstance(ctx_ep, functools.partial):
@@ -531,8 +563,34 @@
             ) = await self.start_service_ctx(
                 name=daemon_name,
                 portal=portal,
-                target=ctx_ep,
+                ctx_fn=ctx_ep,
                 **ctx_kwargs,
             )
 
             return sub_ctx
+
+    async def cancel_service(
+        self,
+        name: str,
+
+    ) -> Any:
+        '''
+        Cancel the service task and actor for the given ``name``.
+
+        '''
+        log.info(f'Cancelling `pikerd` service {name}')
+        cs, sub_ctx, portal, complete = self.service_ctxs[name]
+
+        # cs.cancel()
+        await sub_ctx.cancel()
+        await complete.wait()
+
+        if name in self.service_ctxs:
+            # TODO: custom err?
+            # raise ServiceError(
+            raise RuntimeError(
+                f'Service actor for {name} not terminated and/or unknown?'
+            )
+
+        # assert name not in self.service_ctxs, \
+        #     f'Serice task for {name} not terminated?'
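
As a quick reference, the entry shapes of the two split registries as inferred from the assignments in this diff (a summary sketch, not code from the change itself):

import trio
from typing import TypeAlias
from tractor import Context, Portal

# local bg service tasks, cancelled via `.cancel_service_task(name)`
ServiceTaskEntry: TypeAlias = tuple[trio.CancelScope, trio.Event]

# remote IPC-ctx backed service daemons, cancelled via `.cancel_service(name)`
ServiceCtxEntry: TypeAlias = tuple[trio.CancelScope, Context, Portal, trio.Event]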