Mv over `ServiceMngr` from `piker` with mods
Namely distinguishing service "IPC contexts" (opened in a
subactor via a `Portal`) from just local `trio.Task`s started
and managed under the `.service_n` (more or less wrapping in the
interface of a "task-manager" style nursery - aka a one-cancels-one
supervision start).
API changes from original (`piker`) impl,
- mk `.start_service_task()` do ONLY that, start a task with a wrapping
  cancel-scope and completion event.
  |_ ideally this gets factored-out/re-implemented using the
    task-manager/OCO-style-nursery from GH #363.
- change what was the impl of `.start_service_task()` to `.start_service_ctx()`
  since it more explicitly defines the functionality of entering
  `Portal.open_context()` with a wrapping cs and completion event inside
  a bg task (which syncs the ctx's lifetime with termination of the
  remote actor runtime).
- factor out what was a `.start_service_ctx()` closure to a new
  `_open_and_supervise_service_ctx()` mod-func holding the meat of
  the supervision logic.
`ServiceMngr` API brief,
- use `open_service_mngr()` and `get_service_mngr()` to acquire the
  actor-global singleton.
- `ServiceMngr.start_service()` and `.cancel_service()` which allow for
  straight forward mgmt of "service subactor daemons".
			
			
				hilevel_serman
			
			
		
							parent
							
								
									885137ac19
								
							
						
					
					
						commit
						2afb624c48
					
				|  | @ -0,0 +1,26 @@ | ||||||
|  | # tractor: structured concurrent "actors". | ||||||
|  | # Copyright 2024-eternity Tyler Goodlet. | ||||||
|  | 
 | ||||||
|  | # This program is free software: you can redistribute it and/or modify | ||||||
|  | # it under the terms of the GNU Affero General Public License as published by | ||||||
|  | # the Free Software Foundation, either version 3 of the License, or | ||||||
|  | # (at your option) any later version. | ||||||
|  | 
 | ||||||
|  | # This program is distributed in the hope that it will be useful, | ||||||
|  | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||||
|  | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||||
|  | # GNU Affero General Public License for more details. | ||||||
|  | 
 | ||||||
|  | # You should have received a copy of the GNU Affero General Public License | ||||||
|  | # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||||
|  | 
 | ||||||
|  | ''' | ||||||
|  | High level design patterns, APIs and runtime extensions built on top | ||||||
|  | of the `tractor` runtime core. | ||||||
|  | 
 | ||||||
|  | ''' | ||||||
|  | from ._service import ( | ||||||
|  |     open_service_mngr as open_service_mngr, | ||||||
|  |     get_service_mngr as get_service_mngr, | ||||||
|  |     ServiceMngr as ServiceMngr, | ||||||
|  | ) | ||||||
|  | @ -21,18 +21,26 @@ and API. | ||||||
| ''' | ''' | ||||||
| from __future__ import annotations | from __future__ import annotations | ||||||
| from contextlib import ( | from contextlib import ( | ||||||
|     # asynccontextmanager as acm, |     asynccontextmanager as acm, | ||||||
|     contextmanager as cm, |     # contextmanager as cm, | ||||||
| ) | ) | ||||||
| from collections import defaultdict | from collections import defaultdict | ||||||
|  | from dataclasses import ( | ||||||
|  |     dataclass, | ||||||
|  |     field, | ||||||
|  | ) | ||||||
|  | import functools | ||||||
|  | import inspect | ||||||
| from typing import ( | from typing import ( | ||||||
|     Callable, |     Callable, | ||||||
|     Any, |     Any, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | import tractor | ||||||
| import trio | import trio | ||||||
| from trio import TaskStatus | from trio import TaskStatus | ||||||
| from tractor import ( | from tractor import ( | ||||||
|  |     log, | ||||||
|     ActorNursery, |     ActorNursery, | ||||||
|     current_actor, |     current_actor, | ||||||
|     ContextCancelled, |     ContextCancelled, | ||||||
|  | @ -40,9 +48,7 @@ from tractor import ( | ||||||
|     Portal, |     Portal, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| from ._util import ( | log = log.get_logger('tractor') | ||||||
|     log,  # sub-sys logger |  | ||||||
| ) |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| # TODO: implement a `@singleton` deco-API for wrapping the below | # TODO: implement a `@singleton` deco-API for wrapping the below | ||||||
|  | @ -93,11 +99,30 @@ from ._util import ( | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | # TODO: implement a singleton deco-API for wrapping the below | ||||||
|  | # factory's impl for general actor-singleton use? | ||||||
|  | # | ||||||
|  | # @singleton | ||||||
|  | # async def open_service_mngr( | ||||||
|  | #     **init_kwargs, | ||||||
|  | # ) -> ServiceMngr: | ||||||
|  | #     ''' | ||||||
|  | #     Note this function body is invoke IFF no existing singleton instance already | ||||||
|  | #     exists in this proc's memory. | ||||||
|  | 
 | ||||||
|  | #     ''' | ||||||
|  | #     # setup | ||||||
|  | #     yield ServiceMngr(**init_kwargs) | ||||||
|  | #     # teardown | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| # TODO: singleton factory API instead of a class API | # TODO: singleton factory API instead of a class API | ||||||
| @cm | @acm | ||||||
| def open_service_mngr( | async def open_service_mngr( | ||||||
|     *, |     *, | ||||||
|     _singleton: list[ServiceMngr|None] = [None], |     debug_mode: bool = False, | ||||||
|  | 
 | ||||||
|     # NOTE; since default values for keyword-args are effectively |     # NOTE; since default values for keyword-args are effectively | ||||||
|     # module-vars/globals as per the note from, |     # module-vars/globals as per the note from, | ||||||
|     # https://docs.python.org/3/tutorial/controlflow.html#default-argument-values |     # https://docs.python.org/3/tutorial/controlflow.html#default-argument-values | ||||||
|  | @ -106,27 +131,408 @@ def open_service_mngr( | ||||||
|     #   a difference when the default is a mutable object such as |     #   a difference when the default is a mutable object such as | ||||||
|     #   a list, dictionary, or instances of most classes" |     #   a list, dictionary, or instances of most classes" | ||||||
|     # |     # | ||||||
|  |     _singleton: list[ServiceMngr|None] = [None], | ||||||
|     **init_kwargs, |     **init_kwargs, | ||||||
| 
 | 
 | ||||||
| ) -> ServiceMngr: | ) -> ServiceMngr: | ||||||
|     ''' |     ''' | ||||||
|     Open a multi-subactor-as-service-daemon tree supervisor. |     Open an actor-global "service-manager" for supervising a tree | ||||||
|  |     of subactors and/or actor-global tasks. | ||||||
| 
 | 
 | ||||||
|     The delivered `ServiceMngr` is a singleton instance for each |     The delivered `ServiceMngr` is singleton instance for each | ||||||
|     actor-process and is allocated on first open and never |     actor-process, that is, allocated on first open and never | ||||||
|     de-allocated unless explicitly deleted by al call to |     de-allocated unless explicitly deleted by al call to | ||||||
|     `del_service_mngr()`. |     `del_service_mngr()`. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|  |     # TODO: factor this an allocation into | ||||||
|  |     # a `._mngr.open_service_mngr()` and put in the | ||||||
|  |     # once-n-only-once setup/`.__aenter__()` part! | ||||||
|  |     # -[ ] how to make this only happen on the `mngr == None` case? | ||||||
|  |     #  |_ use `.trionics.maybe_open_context()` (for generic | ||||||
|  |     #     async-with-style-only-once of the factory impl, though | ||||||
|  |     #     what do we do for the allocation case? | ||||||
|  |     #    / `.maybe_open_nursery()` (since for this specific case | ||||||
|  |     #    it's simpler?) to activate | ||||||
|  |     async with ( | ||||||
|  |         tractor.open_nursery() as an, | ||||||
|  |         trio.open_nursery() as tn, | ||||||
|  |     ): | ||||||
|  |         # impl specific obvi.. | ||||||
|  |         init_kwargs.update({ | ||||||
|  |             'actor_n': an, | ||||||
|  |             'service_n': tn, | ||||||
|  |         }) | ||||||
|  | 
 | ||||||
|         mngr: ServiceMngr|None |         mngr: ServiceMngr|None | ||||||
|         if (mngr := _singleton[0]) is None: |         if (mngr := _singleton[0]) is None: | ||||||
|  | 
 | ||||||
|             log.info('Allocating a new service mngr!') |             log.info('Allocating a new service mngr!') | ||||||
|             mngr = _singleton[0] = ServiceMngr(**init_kwargs) |             mngr = _singleton[0] = ServiceMngr(**init_kwargs) | ||||||
|  | 
 | ||||||
|  |             # TODO: put into `.__aenter__()` section of | ||||||
|  |             # eventual `@singleton_acm` API wrapper. | ||||||
|  |             # | ||||||
|  |             # assign globally for future daemon/task creation | ||||||
|  |             mngr.actor_n = an | ||||||
|  |             mngr.service_n = tn | ||||||
|  | 
 | ||||||
|         else: |         else: | ||||||
|  |             assert ( | ||||||
|  |                 mngr.actor_n | ||||||
|  |                 and | ||||||
|  |                 mngr.service_tn | ||||||
|  |             ) | ||||||
|             log.info( |             log.info( | ||||||
|                 'Using extant service mngr!\n\n' |                 'Using extant service mngr!\n\n' | ||||||
|                 f'{mngr!r}\n'  # it has a nice `.__repr__()` of services state |                 f'{mngr!r}\n'  # it has a nice `.__repr__()` of services state | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|     with mngr: |         try: | ||||||
|  |             # NOTE: this is a singleton factory impl specific detail | ||||||
|  |             # which should be supported in the condensed | ||||||
|  |             # `@singleton_acm` API? | ||||||
|  |             mngr.debug_mode = debug_mode | ||||||
|  | 
 | ||||||
|             yield mngr |             yield mngr | ||||||
|  |         finally: | ||||||
|  |             # TODO: is this more clever/efficient? | ||||||
|  |             # if 'samplerd' in mngr.service_tasks: | ||||||
|  |             #     await mngr.cancel_service('samplerd') | ||||||
|  |             tn.cancel_scope.cancel() | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def get_service_mngr() -> ServiceMngr: | ||||||
|  |     ''' | ||||||
|  |     Try to get the singleton service-mngr for this actor presuming it | ||||||
|  |     has already been allocated using, | ||||||
|  | 
 | ||||||
|  |     .. code:: python | ||||||
|  | 
 | ||||||
|  |         async with open_<@singleton_acm(func)>() as mngr` | ||||||
|  |             ... this block kept open ... | ||||||
|  | 
 | ||||||
|  |     If not yet allocated raise a `ServiceError`. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     # https://stackoverflow.com/a/12627202 | ||||||
|  |     # https://docs.python.org/3/library/inspect.html#inspect.Signature | ||||||
|  |     maybe_mngr: ServiceMngr|None = inspect.signature( | ||||||
|  |         open_service_mngr | ||||||
|  |     ).parameters['_singleton'].default[0] | ||||||
|  | 
 | ||||||
|  |     if maybe_mngr is None: | ||||||
|  |         raise RuntimeError( | ||||||
|  |             'Someone must allocate a `ServiceMngr` using\n\n' | ||||||
|  |             '`async with open_service_mngr()` beforehand!!\n' | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |     return maybe_mngr | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # TODO: we need remote wrapping and a general soln: | ||||||
|  | # - factor this into a ``tractor.highlevel`` extension # pack for the | ||||||
|  | #   library. | ||||||
|  | # - wrap a "remote api" wherein you can get a method proxy | ||||||
|  | #   to the pikerd actor for starting services remotely! | ||||||
|  | # - prolly rename this to ActorServicesNursery since it spawns | ||||||
|  | #   new actors and supervises them to completion? | ||||||
|  | @dataclass | ||||||
|  | class ServiceMngr: | ||||||
|  |     ''' | ||||||
|  |     A multi-subactor-as-service manager. | ||||||
|  | 
 | ||||||
|  |     Spawn, supervise and monitor service/daemon subactors in a SC | ||||||
|  |     process tree. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     actor_n: ActorNursery | ||||||
|  |     service_n: trio.Nursery | ||||||
|  |     debug_mode: bool = False # tractor sub-actor debug mode flag | ||||||
|  | 
 | ||||||
|  |     service_tasks: dict[ | ||||||
|  |         str, | ||||||
|  |         tuple[ | ||||||
|  |             trio.CancelScope, | ||||||
|  |             Context, | ||||||
|  |             Portal, | ||||||
|  |             trio.Event, | ||||||
|  |         ] | ||||||
|  |     ] = field(default_factory=dict) | ||||||
|  | 
 | ||||||
|  |     # internal per-service task mutexs | ||||||
|  |     _locks = defaultdict(trio.Lock) | ||||||
|  | 
 | ||||||
|  |     # TODO, unify this interface with our `TaskManager` PR! | ||||||
|  |     # | ||||||
|  |     # | ||||||
|  |     async def start_service_task( | ||||||
|  |         self, | ||||||
|  |         name: str, | ||||||
|  |         # TODO: typevar for the return type of the target and then | ||||||
|  |         # use it below for `ctx_res`? | ||||||
|  |         fn: Callable, | ||||||
|  | 
 | ||||||
|  |         allow_overruns: bool = False, | ||||||
|  |         **ctx_kwargs, | ||||||
|  | 
 | ||||||
|  |     ) -> tuple[ | ||||||
|  |         trio.CancelScope, | ||||||
|  |         Any, | ||||||
|  |         trio.Event, | ||||||
|  |     ]: | ||||||
|  |         async def _task_manager_start( | ||||||
|  |             task_status: TaskStatus[ | ||||||
|  |                 tuple[ | ||||||
|  |                     trio.CancelScope, | ||||||
|  |                     trio.Event, | ||||||
|  |                 ] | ||||||
|  |             ] = trio.TASK_STATUS_IGNORED, | ||||||
|  |         ) -> Any: | ||||||
|  | 
 | ||||||
|  |             task_cs = trio.CancelScope() | ||||||
|  |             task_complete = trio.Event() | ||||||
|  | 
 | ||||||
|  |             with task_cs as cs: | ||||||
|  |                 task_status.started(( | ||||||
|  |                     cs, | ||||||
|  |                     task_complete, | ||||||
|  |                 )) | ||||||
|  |                 try: | ||||||
|  |                     await fn() | ||||||
|  |                 except trio.Cancelled as taskc: | ||||||
|  |                     log.cancel( | ||||||
|  |                         f'Service task for `{name}` was cancelled!\n' | ||||||
|  |                         # TODO: this would be a good spot to use | ||||||
|  |                         # a respawn feature Bo | ||||||
|  |                     ) | ||||||
|  |                     raise taskc | ||||||
|  |                 finally: | ||||||
|  |                     task_complete.set() | ||||||
|  |         ( | ||||||
|  |             cs, | ||||||
|  |             complete, | ||||||
|  |         ) = await self.service_n.start(_task_manager_start) | ||||||
|  | 
 | ||||||
|  |         # store the cancel scope and portal for later cancellation or | ||||||
|  |         # retstart if needed. | ||||||
|  |         self.service_tasks[name] = ( | ||||||
|  |             cs, | ||||||
|  |             None, | ||||||
|  |             None, | ||||||
|  |             complete, | ||||||
|  |         ) | ||||||
|  |         return ( | ||||||
|  |             cs, | ||||||
|  |             complete, | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |     async def start_service_ctx( | ||||||
|  |         self, | ||||||
|  |         name: str, | ||||||
|  |         portal: Portal, | ||||||
|  |         # TODO: typevar for the return type of the target and then | ||||||
|  |         # use it below for `ctx_res`? | ||||||
|  |         target: Callable, | ||||||
|  |         **ctx_kwargs, | ||||||
|  | 
 | ||||||
|  |     ) -> tuple[ | ||||||
|  |         trio.CancelScope, | ||||||
|  |         Context, | ||||||
|  |         Any, | ||||||
|  |     ]: | ||||||
|  |         cs, sub_ctx, complete, started = await self.service_n.start( | ||||||
|  |             functools.partial( | ||||||
|  |                 self._open_and_supervise_service_ctx, | ||||||
|  |                 name=name, | ||||||
|  |                 target=target, | ||||||
|  |                 portal=portal, | ||||||
|  |                 **ctx_kwargs, | ||||||
|  | 
 | ||||||
|  |             ) | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |         # store the cancel scope and portal for later cancellation or | ||||||
|  |         # retstart if needed. | ||||||
|  |         self.service_tasks[name] = (cs, sub_ctx, portal, complete) | ||||||
|  |         return ( | ||||||
|  |             cs, | ||||||
|  |             sub_ctx, | ||||||
|  |             started, | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |     async def _open_and_supervise_service_ctx( | ||||||
|  |         self, | ||||||
|  |         name: str, | ||||||
|  |         target: Callable,  # TODO, type for `@tractor.context` requirement | ||||||
|  |         portal: Portal, | ||||||
|  | 
 | ||||||
|  |         allow_overruns: bool = False, | ||||||
|  |         task_status: TaskStatus[ | ||||||
|  |             tuple[ | ||||||
|  |                 trio.CancelScope, | ||||||
|  |                 Context, | ||||||
|  |                 trio.Event, | ||||||
|  |                 Any, | ||||||
|  |             ] | ||||||
|  |         ] = trio.TASK_STATUS_IGNORED, | ||||||
|  |         **ctx_kwargs, | ||||||
|  | 
 | ||||||
|  |     ) -> Any: | ||||||
|  |         ''' | ||||||
|  |         Open a context in a service sub-actor, add to a stack | ||||||
|  |         that gets unwound at ``pikerd`` teardown. | ||||||
|  | 
 | ||||||
|  |         This allows for allocating long-running sub-services in our main | ||||||
|  |         daemon and explicitly controlling their lifetimes. | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|  |         # TODO: use the ctx._scope directly here instead? | ||||||
|  |         # -[ ] actually what semantics do we expect for this | ||||||
|  |         #   usage!? | ||||||
|  |         with trio.CancelScope() as cs: | ||||||
|  |             try: | ||||||
|  |                 async with portal.open_context( | ||||||
|  |                     target, | ||||||
|  |                     allow_overruns=allow_overruns, | ||||||
|  |                     **ctx_kwargs, | ||||||
|  | 
 | ||||||
|  |                 ) as (ctx, started): | ||||||
|  | 
 | ||||||
|  |                     # unblock once the remote context has started | ||||||
|  |                     complete = trio.Event() | ||||||
|  |                     task_status.started(( | ||||||
|  |                         cs, | ||||||
|  |                         ctx, | ||||||
|  |                         complete, | ||||||
|  |                         started, | ||||||
|  |                     )) | ||||||
|  |                     log.info( | ||||||
|  |                         f'`pikerd` service {name} started with value {started}' | ||||||
|  |                     ) | ||||||
|  |                     # wait on any context's return value | ||||||
|  |                     # and any final portal result from the | ||||||
|  |                     # sub-actor. | ||||||
|  |                     ctx_res: Any = await ctx.wait_for_result() | ||||||
|  | 
 | ||||||
|  |                     # NOTE: blocks indefinitely until cancelled | ||||||
|  |                     # either by error from the target context | ||||||
|  |                     # function or by being cancelled here by the | ||||||
|  |                     # surrounding cancel scope. | ||||||
|  |                     return ( | ||||||
|  |                         await portal.wait_for_result(), | ||||||
|  |                         ctx_res, | ||||||
|  |                     ) | ||||||
|  | 
 | ||||||
|  |             except ContextCancelled as ctxe: | ||||||
|  |                 canceller: tuple[str, str] = ctxe.canceller | ||||||
|  |                 our_uid: tuple[str, str] = current_actor().uid | ||||||
|  |                 if ( | ||||||
|  |                     canceller != portal.chan.uid | ||||||
|  |                     and | ||||||
|  |                     canceller != our_uid | ||||||
|  |                 ): | ||||||
|  |                     log.cancel( | ||||||
|  |                         f'Actor-service `{name}` was remotely cancelled by a peer?\n' | ||||||
|  | 
 | ||||||
|  |                         # TODO: this would be a good spot to use | ||||||
|  |                         # a respawn feature Bo | ||||||
|  |                         f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n' | ||||||
|  | 
 | ||||||
|  |                         f'cancellee: {portal.chan.uid}\n' | ||||||
|  |                         f'canceller: {canceller}\n' | ||||||
|  |                     ) | ||||||
|  |                 else: | ||||||
|  |                     raise | ||||||
|  | 
 | ||||||
|  |             finally: | ||||||
|  |                 # NOTE: the ctx MUST be cancelled first if we | ||||||
|  |                 # don't want the above `ctx.wait_for_result()` to | ||||||
|  |                 # raise a self-ctxc. WHY, well since from the ctx's | ||||||
|  |                 # perspective the cancel request will have | ||||||
|  |                 # arrived out-out-of-band at the `Actor.cancel()` | ||||||
|  |                 # level, thus `Context.cancel_called == False`, | ||||||
|  |                 # meaning `ctx._is_self_cancelled() == False`. | ||||||
|  |                 # with trio.CancelScope(shield=True): | ||||||
|  |                 # await ctx.cancel() | ||||||
|  |                 await portal.cancel_actor()  # terminate (remote) sub-actor | ||||||
|  |                 complete.set()  # signal caller this task is done | ||||||
|  |                 self.service_tasks.pop(name)  # remove mngr entry | ||||||
|  | 
 | ||||||
|  |     async def cancel_service( | ||||||
|  |         self, | ||||||
|  |         name: str, | ||||||
|  | 
 | ||||||
|  |     ) -> Any: | ||||||
|  |         ''' | ||||||
|  |         Cancel the service task and actor for the given ``name``. | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|  |         log.info(f'Cancelling `pikerd` service {name}') | ||||||
|  |         cs, sub_ctx, portal, complete = self.service_tasks[name] | ||||||
|  | 
 | ||||||
|  |         # cs.cancel() | ||||||
|  |         await sub_ctx.cancel() | ||||||
|  |         await complete.wait() | ||||||
|  | 
 | ||||||
|  |         if name in self.service_tasks: | ||||||
|  |             # TODO: custom err? | ||||||
|  |             # raise ServiceError( | ||||||
|  |             raise RuntimeError( | ||||||
|  |                 f'Service actor for {name} not terminated and/or unknown?' | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |         # assert name not in self.service_tasks, \ | ||||||
|  |         #     f'Serice task for {name} not terminated?' | ||||||
|  | 
 | ||||||
|  |     async def start_service( | ||||||
|  |         self, | ||||||
|  |         daemon_name: str, | ||||||
|  |         ctx_ep: Callable,  # kwargs must `partial`-ed in! | ||||||
|  |         # ^TODO, type for `@tractor.context` deco-ed funcs! | ||||||
|  | 
 | ||||||
|  |         debug_mode: bool = False, | ||||||
|  |         **tractor_actor_kwargs, | ||||||
|  | 
 | ||||||
|  |     ) -> Context: | ||||||
|  |         ''' | ||||||
|  |         Start a "service" task in a (new) sub-actor and manage its | ||||||
|  |         lifetime indefinitely until termination. | ||||||
|  | 
 | ||||||
|  |         Service actors can be cancelled (and thus shutdown) using | ||||||
|  |         `.cancel_service()`. | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|  |         entry: tuple|None = self.service_tasks.get(daemon_name) | ||||||
|  |         if entry: | ||||||
|  |             (cs, sub_ctx, portal, complete) = entry | ||||||
|  |             return sub_ctx | ||||||
|  | 
 | ||||||
|  |         if daemon_name not in self.service_tasks: | ||||||
|  |             portal: Portal = await self.actor_n.start_actor( | ||||||
|  |                 daemon_name, | ||||||
|  |                 debug_mode=(  # maybe set globally during allocate | ||||||
|  |                     debug_mode | ||||||
|  |                     or | ||||||
|  |                     self.debug_mode | ||||||
|  |                 ), | ||||||
|  |                 **tractor_actor_kwargs, | ||||||
|  |             ) | ||||||
|  |             ctx_kwargs: dict[str, Any] = {} | ||||||
|  |             if isinstance(ctx_ep, functools.partial): | ||||||
|  |                 ctx_kwargs: dict[str, Any] = ctx_ep.keywords | ||||||
|  |                 ctx_ep: Callable = ctx_ep.func | ||||||
|  | 
 | ||||||
|  |             ( | ||||||
|  |                 cs, | ||||||
|  |                 sub_ctx, | ||||||
|  |                 started, | ||||||
|  |             ) = await self.start_service_ctx( | ||||||
|  |                 name=daemon_name, | ||||||
|  |                 portal=portal, | ||||||
|  |                 target=ctx_ep, | ||||||
|  |                 **ctx_kwargs, | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |             return sub_ctx | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue