Compare commits
	
		
			14 Commits 
		
	
	
		
			653f23a04c
			...
			23809b8468
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 23809b8468 | |
|  | 60427329ee | |
|  | f946041d44 | |
|  | a4339d6ac6 | |
|  | 9123fbdbfa | |
|  | e7b3254b7b | |
|  | e468f62c26 | |
|  | 6c65729c20 | |
|  | 94fbbe0b05 | |
|  | d5b54f3f5e | |
|  | fd314deecb | |
|  | dd011c0b2f | |
|  | 087aaa1c36 | |
|  | 66b7410eab | 
|  | @ -0,0 +1,26 @@ | ||||||
|  | # tractor: structured concurrent "actors". | ||||||
|  | # Copyright 2024-eternity Tyler Goodlet. | ||||||
|  | 
 | ||||||
|  | # This program is free software: you can redistribute it and/or modify | ||||||
|  | # it under the terms of the GNU Affero General Public License as published by | ||||||
|  | # the Free Software Foundation, either version 3 of the License, or | ||||||
|  | # (at your option) any later version. | ||||||
|  | 
 | ||||||
|  | # This program is distributed in the hope that it will be useful, | ||||||
|  | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||||
|  | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||||
|  | # GNU Affero General Public License for more details. | ||||||
|  | 
 | ||||||
|  | # You should have received a copy of the GNU Affero General Public License | ||||||
|  | # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||||
|  | 
 | ||||||
|  | ''' | ||||||
|  | High level design patterns, APIs and runtime extensions built on top | ||||||
|  | of the `tractor` runtime core. | ||||||
|  | 
 | ||||||
|  | ''' | ||||||
|  | from ._service import ( | ||||||
|  |     open_service_mngr as open_service_mngr, | ||||||
|  |     get_service_mngr as get_service_mngr, | ||||||
|  |     ServiceMngr as ServiceMngr, | ||||||
|  | ) | ||||||
|  | @ -0,0 +1,592 @@ | ||||||
|  | # tractor: structured concurrent "actors". | ||||||
|  | # Copyright 2024-eternity Tyler Goodlet. | ||||||
|  | 
 | ||||||
|  | # This program is free software: you can redistribute it and/or modify | ||||||
|  | # it under the terms of the GNU Affero General Public License as published by | ||||||
|  | # the Free Software Foundation, either version 3 of the License, or | ||||||
|  | # (at your option) any later version. | ||||||
|  | 
 | ||||||
|  | # This program is distributed in the hope that it will be useful, | ||||||
|  | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||||
|  | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||||
|  | # GNU Affero General Public License for more details. | ||||||
|  | 
 | ||||||
|  | # You should have received a copy of the GNU Affero General Public License | ||||||
|  | # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||||
|  | 
 | ||||||
|  | ''' | ||||||
|  | Daemon subactor as service(s) management and supervision primitives | ||||||
|  | and API. | ||||||
|  | 
 | ||||||
|  | ''' | ||||||
|  | from __future__ import annotations | ||||||
|  | from contextlib import ( | ||||||
|  |     asynccontextmanager as acm, | ||||||
|  |     # contextmanager as cm, | ||||||
|  | ) | ||||||
|  | from collections import defaultdict | ||||||
|  | from dataclasses import ( | ||||||
|  |     dataclass, | ||||||
|  |     field, | ||||||
|  | ) | ||||||
|  | import functools | ||||||
|  | import inspect | ||||||
|  | from typing import ( | ||||||
|  |     Callable, | ||||||
|  |     Any, | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | import tractor | ||||||
|  | import trio | ||||||
|  | from trio import TaskStatus | ||||||
|  | from tractor import ( | ||||||
|  |     log, | ||||||
|  |     ActorNursery, | ||||||
|  |     current_actor, | ||||||
|  |     ContextCancelled, | ||||||
|  |     Context, | ||||||
|  |     Portal, | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | log = log.get_logger('tractor') | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # TODO: implement a `@singleton` deco-API for wrapping the below | ||||||
|  | # factory's impl for general actor-singleton use? | ||||||
|  | # | ||||||
|  | # -[ ] go through the options peeps on SO did? | ||||||
|  | #  * https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python | ||||||
|  | #  * including @mikenerone's answer | ||||||
|  | #   |_https://stackoverflow.com/questions/6760685/what-is-the-best-way-of-implementing-singleton-in-python/39186313#39186313 | ||||||
|  | # | ||||||
|  | # -[ ] put it in `tractor.lowlevel._globals` ? | ||||||
|  | #  * fits with our oustanding actor-local/global feat req? | ||||||
|  | #   |_ https://github.com/goodboy/tractor/issues/55 | ||||||
|  | #  * how can it relate to the `Actor.lifetime_stack` that was | ||||||
|  | #    silently patched in? | ||||||
|  | #   |_ we could implicitly call both of these in the same | ||||||
|  | #     spot in the runtime using the lifetime stack? | ||||||
|  | #    - `open_singleton_cm().__exit__()` | ||||||
|  | #    -`del_singleton()` | ||||||
|  | #   |_ gives SC fixtue semantics to sync code oriented around | ||||||
|  | #     sub-process lifetime? | ||||||
|  | #  * what about with `trio.RunVar`? | ||||||
|  | #   |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.RunVar | ||||||
|  | #    - which we'll need for no-GIL cpython (right?) presuming | ||||||
|  | #      multiple `trio.run()` calls in process? | ||||||
|  | # | ||||||
|  | # | ||||||
|  | # @singleton | ||||||
|  | # async def open_service_mngr( | ||||||
|  | #     **init_kwargs, | ||||||
|  | # ) -> ServiceMngr: | ||||||
|  | #     ''' | ||||||
|  | #     Note this function body is invoke IFF no existing singleton instance already | ||||||
|  | #     exists in this proc's memory. | ||||||
|  | 
 | ||||||
|  | #     ''' | ||||||
|  | #     # setup | ||||||
|  | #     yield ServiceMngr(**init_kwargs) | ||||||
|  | #     # teardown | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # a deletion API for explicit instance de-allocation? | ||||||
|  | # @open_service_mngr.deleter | ||||||
|  | # def del_service_mngr() -> None: | ||||||
|  | #     mngr = open_service_mngr._singleton[0] | ||||||
|  | #     open_service_mngr._singleton[0] = None | ||||||
|  | #     del mngr | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # TODO: implement a singleton deco-API for wrapping the below | ||||||
|  | # factory's impl for general actor-singleton use? | ||||||
|  | # | ||||||
|  | # @singleton | ||||||
|  | # async def open_service_mngr( | ||||||
|  | #     **init_kwargs, | ||||||
|  | # ) -> ServiceMngr: | ||||||
|  | #     ''' | ||||||
|  | #     Note this function body is invoke IFF no existing singleton instance already | ||||||
|  | #     exists in this proc's memory. | ||||||
|  | 
 | ||||||
|  | #     ''' | ||||||
|  | #     # setup | ||||||
|  | #     yield ServiceMngr(**init_kwargs) | ||||||
|  | #     # teardown | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # TODO: singleton factory API instead of a class API | ||||||
|  | @acm | ||||||
|  | async def open_service_mngr( | ||||||
|  |     *, | ||||||
|  |     debug_mode: bool = False, | ||||||
|  | 
 | ||||||
|  |     # NOTE; since default values for keyword-args are effectively | ||||||
|  |     # module-vars/globals as per the note from, | ||||||
|  |     # https://docs.python.org/3/tutorial/controlflow.html#default-argument-values | ||||||
|  |     # | ||||||
|  |     # > "The default value is evaluated only once. This makes | ||||||
|  |     #   a difference when the default is a mutable object such as | ||||||
|  |     #   a list, dictionary, or instances of most classes" | ||||||
|  |     # | ||||||
|  |     _singleton: list[ServiceMngr|None] = [None], | ||||||
|  |     **init_kwargs, | ||||||
|  | 
 | ||||||
|  | ) -> ServiceMngr: | ||||||
|  |     ''' | ||||||
|  |     Open an actor-global "service-manager" for supervising a tree | ||||||
|  |     of subactors and/or actor-global tasks. | ||||||
|  | 
 | ||||||
|  |     The delivered `ServiceMngr` is singleton instance for each | ||||||
|  |     actor-process, that is, allocated on first open and never | ||||||
|  |     de-allocated unless explicitly deleted by al call to | ||||||
|  |     `del_service_mngr()`. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     # TODO: factor this an allocation into | ||||||
|  |     # a `._mngr.open_service_mngr()` and put in the | ||||||
|  |     # once-n-only-once setup/`.__aenter__()` part! | ||||||
|  |     # -[ ] how to make this only happen on the `mngr == None` case? | ||||||
|  |     #  |_ use `.trionics.maybe_open_context()` (for generic | ||||||
|  |     #     async-with-style-only-once of the factory impl, though | ||||||
|  |     #     what do we do for the allocation case? | ||||||
|  |     #    / `.maybe_open_nursery()` (since for this specific case | ||||||
|  |     #    it's simpler?) to activate | ||||||
|  |     async with ( | ||||||
|  |         tractor.open_nursery() as an, | ||||||
|  |         trio.open_nursery() as tn, | ||||||
|  |     ): | ||||||
|  |         # impl specific obvi.. | ||||||
|  |         init_kwargs.update({ | ||||||
|  |             'an': an, | ||||||
|  |             'tn': tn, | ||||||
|  |         }) | ||||||
|  | 
 | ||||||
|  |         mngr: ServiceMngr|None | ||||||
|  |         if (mngr := _singleton[0]) is None: | ||||||
|  | 
 | ||||||
|  |             log.info('Allocating a new service mngr!') | ||||||
|  |             mngr = _singleton[0] = ServiceMngr(**init_kwargs) | ||||||
|  | 
 | ||||||
|  |             # TODO: put into `.__aenter__()` section of | ||||||
|  |             # eventual `@singleton_acm` API wrapper. | ||||||
|  |             # | ||||||
|  |             # assign globally for future daemon/task creation | ||||||
|  |             mngr.an = an | ||||||
|  |             mngr.tn = tn | ||||||
|  | 
 | ||||||
|  |         else: | ||||||
|  |             assert (mngr.an and mngr.tn) | ||||||
|  |             log.info( | ||||||
|  |                 'Using extant service mngr!\n\n' | ||||||
|  |                 f'{mngr!r}\n'  # it has a nice `.__repr__()` of services state | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |         try: | ||||||
|  |             # NOTE: this is a singleton factory impl specific detail | ||||||
|  |             # which should be supported in the condensed | ||||||
|  |             # `@singleton_acm` API? | ||||||
|  |             mngr.debug_mode = debug_mode | ||||||
|  | 
 | ||||||
|  |             yield mngr | ||||||
|  |         finally: | ||||||
|  |             # TODO: is this more clever/efficient? | ||||||
|  |             # if 'samplerd' in mngr.service_ctxs: | ||||||
|  |             #     await mngr.cancel_service('samplerd') | ||||||
|  |             tn.cancel_scope.cancel() | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def get_service_mngr() -> ServiceMngr: | ||||||
|  |     ''' | ||||||
|  |     Try to get the singleton service-mngr for this actor presuming it | ||||||
|  |     has already been allocated using, | ||||||
|  | 
 | ||||||
|  |     .. code:: python | ||||||
|  | 
 | ||||||
|  |         async with open_<@singleton_acm(func)>() as mngr` | ||||||
|  |             ... this block kept open ... | ||||||
|  | 
 | ||||||
|  |     If not yet allocated raise a `ServiceError`. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     # https://stackoverflow.com/a/12627202 | ||||||
|  |     # https://docs.python.org/3/library/inspect.html#inspect.Signature | ||||||
|  |     maybe_mngr: ServiceMngr|None = inspect.signature( | ||||||
|  |         open_service_mngr | ||||||
|  |     ).parameters['_singleton'].default[0] | ||||||
|  | 
 | ||||||
|  |     if maybe_mngr is None: | ||||||
|  |         raise RuntimeError( | ||||||
|  |             'Someone must allocate a `ServiceMngr` using\n\n' | ||||||
|  |             '`async with open_service_mngr()` beforehand!!\n' | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |     return maybe_mngr | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | async def _open_and_supervise_service_ctx( | ||||||
|  |     serman: ServiceMngr, | ||||||
|  |     name: str, | ||||||
|  |     ctx_fn: Callable,  # TODO, type for `@tractor.context` requirement | ||||||
|  |     portal: Portal, | ||||||
|  | 
 | ||||||
|  |     allow_overruns: bool = False, | ||||||
|  |     task_status: TaskStatus[ | ||||||
|  |         tuple[ | ||||||
|  |             trio.CancelScope, | ||||||
|  |             Context, | ||||||
|  |             trio.Event, | ||||||
|  |             Any, | ||||||
|  |         ] | ||||||
|  |     ] = trio.TASK_STATUS_IGNORED, | ||||||
|  |     **ctx_kwargs, | ||||||
|  | 
 | ||||||
|  | ) -> Any: | ||||||
|  |     ''' | ||||||
|  |     Open a remote IPC-context defined by `ctx_fn` in the | ||||||
|  |     (service) actor accessed via `portal` and supervise the | ||||||
|  |     (local) parent task to termination at which point the remote | ||||||
|  |     actor runtime is cancelled alongside it. | ||||||
|  | 
 | ||||||
|  |     The main application is for allocating long-running | ||||||
|  |     "sub-services" in a main daemon and explicitly controlling | ||||||
|  |     their lifetimes from an actor-global singleton. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     # TODO: use the ctx._scope directly here instead? | ||||||
|  |     # -[ ] actually what semantics do we expect for this | ||||||
|  |     #   usage!? | ||||||
|  |     with trio.CancelScope() as cs: | ||||||
|  |         try: | ||||||
|  |             async with portal.open_context( | ||||||
|  |                 ctx_fn, | ||||||
|  |                 allow_overruns=allow_overruns, | ||||||
|  |                 **ctx_kwargs, | ||||||
|  | 
 | ||||||
|  |             ) as (ctx, started): | ||||||
|  | 
 | ||||||
|  |                 # unblock once the remote context has started | ||||||
|  |                 complete = trio.Event() | ||||||
|  |                 task_status.started(( | ||||||
|  |                     cs, | ||||||
|  |                     ctx, | ||||||
|  |                     complete, | ||||||
|  |                     started, | ||||||
|  |                 )) | ||||||
|  |                 log.info( | ||||||
|  |                     f'`pikerd` service {name} started with value {started}' | ||||||
|  |                 ) | ||||||
|  |                 # wait on any context's return value | ||||||
|  |                 # and any final portal result from the | ||||||
|  |                 # sub-actor. | ||||||
|  |                 ctx_res: Any = await ctx.wait_for_result() | ||||||
|  | 
 | ||||||
|  |                 # NOTE: blocks indefinitely until cancelled | ||||||
|  |                 # either by error from the target context | ||||||
|  |                 # function or by being cancelled here by the | ||||||
|  |                 # surrounding cancel scope. | ||||||
|  |                 return ( | ||||||
|  |                     await portal.wait_for_result(), | ||||||
|  |                     ctx_res, | ||||||
|  |                 ) | ||||||
|  | 
 | ||||||
|  |         except ContextCancelled as ctxe: | ||||||
|  |             canceller: tuple[str, str] = ctxe.canceller | ||||||
|  |             our_uid: tuple[str, str] = current_actor().uid | ||||||
|  |             if ( | ||||||
|  |                 canceller != portal.chan.uid | ||||||
|  |                 and | ||||||
|  |                 canceller != our_uid | ||||||
|  |             ): | ||||||
|  |                 log.cancel( | ||||||
|  |                     f'Actor-service `{name}` was remotely cancelled by a peer?\n' | ||||||
|  | 
 | ||||||
|  |                     # TODO: this would be a good spot to use | ||||||
|  |                     # a respawn feature Bo | ||||||
|  |                     f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n' | ||||||
|  | 
 | ||||||
|  |                     f'cancellee: {portal.chan.uid}\n' | ||||||
|  |                     f'canceller: {canceller}\n' | ||||||
|  |                 ) | ||||||
|  |             else: | ||||||
|  |                 raise | ||||||
|  | 
 | ||||||
|  |         finally: | ||||||
|  |             # NOTE: the ctx MUST be cancelled first if we | ||||||
|  |             # don't want the above `ctx.wait_for_result()` to | ||||||
|  |             # raise a self-ctxc. WHY, well since from the ctx's | ||||||
|  |             # perspective the cancel request will have | ||||||
|  |             # arrived out-out-of-band at the `Actor.cancel()` | ||||||
|  |             # level, thus `Context.cancel_called == False`, | ||||||
|  |             # meaning `ctx._is_self_cancelled() == False`. | ||||||
|  |             # with trio.CancelScope(shield=True): | ||||||
|  |             # await ctx.cancel() | ||||||
|  |             await portal.cancel_actor()  # terminate (remote) sub-actor | ||||||
|  |             complete.set()  # signal caller this task is done | ||||||
|  |             serman.service_ctxs.pop(name)  # remove mngr entry | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # TODO: we need remote wrapping and a general soln: | ||||||
|  | # - factor this into a ``tractor.highlevel`` extension # pack for the | ||||||
|  | #   library. | ||||||
|  | # - wrap a "remote api" wherein you can get a method proxy | ||||||
|  | #   to the pikerd actor for starting services remotely! | ||||||
|  | # - prolly rename this to ActorServicesNursery since it spawns | ||||||
|  | #   new actors and supervises them to completion? | ||||||
|  | @dataclass | ||||||
|  | class ServiceMngr: | ||||||
|  |     ''' | ||||||
|  |     A multi-subactor-as-service manager. | ||||||
|  | 
 | ||||||
|  |     Spawn, supervise and monitor service/daemon subactors in a SC | ||||||
|  |     process tree. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     an: ActorNursery | ||||||
|  |     tn: trio.Nursery | ||||||
|  |     debug_mode: bool = False # tractor sub-actor debug mode flag | ||||||
|  | 
 | ||||||
|  |     service_tasks: dict[ | ||||||
|  |         str, | ||||||
|  |         tuple[ | ||||||
|  |             trio.CancelScope, | ||||||
|  |             trio.Event, | ||||||
|  |         ] | ||||||
|  |     ] = field(default_factory=dict) | ||||||
|  | 
 | ||||||
|  |     service_ctxs: dict[ | ||||||
|  |         str, | ||||||
|  |         tuple[ | ||||||
|  |             trio.CancelScope, | ||||||
|  |             Context, | ||||||
|  |             Portal, | ||||||
|  |             trio.Event, | ||||||
|  |         ] | ||||||
|  |     ] = field(default_factory=dict) | ||||||
|  | 
 | ||||||
|  |     # internal per-service task mutexs | ||||||
|  |     _locks = defaultdict(trio.Lock) | ||||||
|  | 
 | ||||||
|  |     # TODO, unify this interface with our `TaskManager` PR! | ||||||
|  |     # | ||||||
|  |     # | ||||||
|  |     async def start_service_task( | ||||||
|  |         self, | ||||||
|  |         name: str, | ||||||
|  |         # TODO: typevar for the return type of the target and then | ||||||
|  |         # use it below for `ctx_res`? | ||||||
|  |         fn: Callable, | ||||||
|  | 
 | ||||||
|  |         allow_overruns: bool = False, | ||||||
|  |         **ctx_kwargs, | ||||||
|  | 
 | ||||||
|  |     ) -> tuple[ | ||||||
|  |         trio.CancelScope, | ||||||
|  |         Any, | ||||||
|  |         trio.Event, | ||||||
|  |     ]: | ||||||
|  |         async def _task_manager_start( | ||||||
|  |             task_status: TaskStatus[ | ||||||
|  |                 tuple[ | ||||||
|  |                     trio.CancelScope, | ||||||
|  |                     trio.Event, | ||||||
|  |                 ] | ||||||
|  |             ] = trio.TASK_STATUS_IGNORED, | ||||||
|  |         ) -> Any: | ||||||
|  | 
 | ||||||
|  |             task_cs = trio.CancelScope() | ||||||
|  |             task_complete = trio.Event() | ||||||
|  | 
 | ||||||
|  |             with task_cs as cs: | ||||||
|  |                 task_status.started(( | ||||||
|  |                     cs, | ||||||
|  |                     task_complete, | ||||||
|  |                 )) | ||||||
|  |                 try: | ||||||
|  |                     await fn() | ||||||
|  |                 except trio.Cancelled as taskc: | ||||||
|  |                     log.cancel( | ||||||
|  |                         f'Service task for `{name}` was cancelled!\n' | ||||||
|  |                         # TODO: this would be a good spot to use | ||||||
|  |                         # a respawn feature Bo | ||||||
|  |                     ) | ||||||
|  |                     raise taskc | ||||||
|  |                 finally: | ||||||
|  |                     task_complete.set() | ||||||
|  |         ( | ||||||
|  |             cs, | ||||||
|  |             complete, | ||||||
|  |         ) = await self.tn.start(_task_manager_start) | ||||||
|  | 
 | ||||||
|  |         # store the cancel scope and portal for later cancellation or | ||||||
|  |         # retstart if needed. | ||||||
|  |         self.service_tasks[name] = ( | ||||||
|  |             cs, | ||||||
|  |             complete, | ||||||
|  |         ) | ||||||
|  |         return ( | ||||||
|  |             cs, | ||||||
|  |             complete, | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |     async def cancel_service_task( | ||||||
|  |         self, | ||||||
|  |         name: str, | ||||||
|  | 
 | ||||||
|  |     ) -> Any: | ||||||
|  |         log.info(f'Cancelling `pikerd` service {name}') | ||||||
|  |         cs, complete = self.service_tasks[name] | ||||||
|  | 
 | ||||||
|  |         cs.cancel() | ||||||
|  |         await complete.wait() | ||||||
|  |         # TODO, if we use the `TaskMngr` from #346 | ||||||
|  |         # we can also get the return value from the task! | ||||||
|  | 
 | ||||||
|  |         if name in self.service_tasks: | ||||||
|  |             # TODO: custom err? | ||||||
|  |             # raise ServiceError( | ||||||
|  |             raise RuntimeError( | ||||||
|  |                 f'Service task {name!r} not terminated!?\n' | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |     async def start_service_ctx( | ||||||
|  |         self, | ||||||
|  |         name: str, | ||||||
|  |         portal: Portal, | ||||||
|  |         # TODO: typevar for the return type of the target and then | ||||||
|  |         # use it below for `ctx_res`? | ||||||
|  |         ctx_fn: Callable, | ||||||
|  |         **ctx_kwargs, | ||||||
|  | 
 | ||||||
|  |     ) -> tuple[ | ||||||
|  |         trio.CancelScope, | ||||||
|  |         Context, | ||||||
|  |         Any, | ||||||
|  |     ]: | ||||||
|  |         ''' | ||||||
|  |         Start a remote IPC-context defined by `ctx_fn` in a background | ||||||
|  |         task and immediately return supervision primitives to manage it: | ||||||
|  | 
 | ||||||
|  |         - a `cs: CancelScope` for the newly allocated bg task | ||||||
|  |         - the `ipc_ctx: Context` to manage the remotely scheduled | ||||||
|  |           `trio.Task`. | ||||||
|  |         - the `started: Any` value returned by the remote endpoint | ||||||
|  |           task's `Context.started(<value>)` call. | ||||||
|  | 
 | ||||||
|  |         The bg task supervises the ctx such that when it terminates the supporting | ||||||
|  |         actor runtime is also cancelled, see `_open_and_supervise_service_ctx()` | ||||||
|  |         for details. | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|  |         cs, ipc_ctx, complete, started = await self.tn.start( | ||||||
|  |             functools.partial( | ||||||
|  |                 _open_and_supervise_service_ctx, | ||||||
|  |                 serman=self, | ||||||
|  |                 name=name, | ||||||
|  |                 ctx_fn=ctx_fn, | ||||||
|  |                 portal=portal, | ||||||
|  |                 **ctx_kwargs, | ||||||
|  |             ) | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |         # store the cancel scope and portal for later cancellation or | ||||||
|  |         # retstart if needed. | ||||||
|  |         self.service_ctxs[name] = (cs, ipc_ctx, portal, complete) | ||||||
|  |         return ( | ||||||
|  |             cs, | ||||||
|  |             ipc_ctx, | ||||||
|  |             started, | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |     async def start_service( | ||||||
|  |         self, | ||||||
|  |         daemon_name: str, | ||||||
|  |         ctx_ep: Callable,  # kwargs must `partial`-ed in! | ||||||
|  |         # ^TODO, type for `@tractor.context` deco-ed funcs! | ||||||
|  | 
 | ||||||
|  |         debug_mode: bool = False, | ||||||
|  |         **start_actor_kwargs, | ||||||
|  | 
 | ||||||
|  |     ) -> Context: | ||||||
|  |         ''' | ||||||
|  |         Start new subactor and schedule a supervising "service task" | ||||||
|  |         in it which explicitly defines the sub's lifetime. | ||||||
|  | 
 | ||||||
|  |         "Service daemon subactors" are cancelled (and thus | ||||||
|  |         terminated) using the paired `.cancel_service()`. | ||||||
|  | 
 | ||||||
|  |         Effectively this API can be used to manage "service daemons" | ||||||
|  |         spawned under a single parent actor with supervision | ||||||
|  |         semantics equivalent to a one-cancels-one style actor-nursery | ||||||
|  |         or "(subactor) task manager" where each subprocess's (and | ||||||
|  |         thus its embedded actor runtime) lifetime is synced to that | ||||||
|  |         of the remotely spawned task defined by `ctx_ep`. | ||||||
|  | 
 | ||||||
|  |         The funcionality can be likened to a "daemonized" version of | ||||||
|  |         `.hilevel.worker.run_in_actor()` but with supervision | ||||||
|  |         controls offered by `tractor.Context` where the main/root | ||||||
|  |         remotely scheduled `trio.Task` invoking `ctx_ep` determines | ||||||
|  |         the underlying subactor's lifetime. | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|  |         entry: tuple|None = self.service_ctxs.get(daemon_name) | ||||||
|  |         if entry: | ||||||
|  |             (cs, sub_ctx, portal, complete) = entry | ||||||
|  |             return sub_ctx | ||||||
|  | 
 | ||||||
|  |         if daemon_name not in self.service_ctxs: | ||||||
|  |             portal: Portal = await self.an.start_actor( | ||||||
|  |                 daemon_name, | ||||||
|  |                 debug_mode=(  # maybe set globally during allocate | ||||||
|  |                     debug_mode | ||||||
|  |                     or | ||||||
|  |                     self.debug_mode | ||||||
|  |                 ), | ||||||
|  |                 **start_actor_kwargs, | ||||||
|  |             ) | ||||||
|  |             ctx_kwargs: dict[str, Any] = {} | ||||||
|  |             if isinstance(ctx_ep, functools.partial): | ||||||
|  |                 ctx_kwargs: dict[str, Any] = ctx_ep.keywords | ||||||
|  |                 ctx_ep: Callable = ctx_ep.func | ||||||
|  | 
 | ||||||
|  |             ( | ||||||
|  |                 cs, | ||||||
|  |                 sub_ctx, | ||||||
|  |                 started, | ||||||
|  |             ) = await self.start_service_ctx( | ||||||
|  |                 name=daemon_name, | ||||||
|  |                 portal=portal, | ||||||
|  |                 ctx_fn=ctx_ep, | ||||||
|  |                 **ctx_kwargs, | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |             return sub_ctx | ||||||
|  | 
 | ||||||
|  |     async def cancel_service( | ||||||
|  |         self, | ||||||
|  |         name: str, | ||||||
|  | 
 | ||||||
|  |     ) -> Any: | ||||||
|  |         ''' | ||||||
|  |         Cancel the service task and actor for the given ``name``. | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|  |         log.info(f'Cancelling `pikerd` service {name}') | ||||||
|  |         cs, sub_ctx, portal, complete = self.service_ctxs[name] | ||||||
|  | 
 | ||||||
|  |         # cs.cancel() | ||||||
|  |         await sub_ctx.cancel() | ||||||
|  |         await complete.wait() | ||||||
|  | 
 | ||||||
|  |         if name in self.service_ctxs: | ||||||
|  |             # TODO: custom err? | ||||||
|  |             # raise ServiceError( | ||||||
|  |             raise RuntimeError( | ||||||
|  |                 f'Service actor for {name} not terminated and/or unknown?' | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |         # assert name not in self.service_ctxs, \ | ||||||
|  |         #     f'Serice task for {name} not terminated?' | ||||||
|  | @ -21,7 +21,6 @@ Sugary patterns for trio + tractor designs. | ||||||
| from ._mngrs import ( | from ._mngrs import ( | ||||||
|     gather_contexts as gather_contexts, |     gather_contexts as gather_contexts, | ||||||
|     maybe_open_context as maybe_open_context, |     maybe_open_context as maybe_open_context, | ||||||
|     maybe_open_nursery as maybe_open_nursery, |  | ||||||
| ) | ) | ||||||
| from ._broadcast import ( | from ._broadcast import ( | ||||||
|     AsyncReceiver as AsyncReceiver, |     AsyncReceiver as AsyncReceiver, | ||||||
|  | @ -37,3 +36,6 @@ from ._beg import ( | ||||||
| from ._taskc import ( | from ._taskc import ( | ||||||
|     maybe_raise_from_masking_exc as maybe_raise_from_masking_exc, |     maybe_raise_from_masking_exc as maybe_raise_from_masking_exc, | ||||||
| ) | ) | ||||||
|  | from ._tn import ( | ||||||
|  |     maybe_open_nursery as maybe_open_nursery, | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | @ -22,7 +22,7 @@ https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html | ||||||
| from __future__ import annotations | from __future__ import annotations | ||||||
| from abc import abstractmethod | from abc import abstractmethod | ||||||
| from collections import deque | from collections import deque | ||||||
| from contextlib import asynccontextmanager | from contextlib import asynccontextmanager as acm | ||||||
| from functools import partial | from functools import partial | ||||||
| from operator import ne | from operator import ne | ||||||
| from typing import ( | from typing import ( | ||||||
|  | @ -398,7 +398,7 @@ class BroadcastReceiver(ReceiveChannel): | ||||||
| 
 | 
 | ||||||
|             return await self._receive_from_underlying(key, state) |             return await self._receive_from_underlying(key, state) | ||||||
| 
 | 
 | ||||||
|     @asynccontextmanager |     @acm | ||||||
|     async def subscribe( |     async def subscribe( | ||||||
|         self, |         self, | ||||||
|         raise_on_lag: bool = True, |         raise_on_lag: bool = True, | ||||||
|  |  | ||||||
|  | @ -23,7 +23,6 @@ from contextlib import ( | ||||||
|     asynccontextmanager as acm, |     asynccontextmanager as acm, | ||||||
| ) | ) | ||||||
| import inspect | import inspect | ||||||
| from types import ModuleType |  | ||||||
| from typing import ( | from typing import ( | ||||||
|     Any, |     Any, | ||||||
|     AsyncContextManager, |     AsyncContextManager, | ||||||
|  | @ -34,21 +33,18 @@ from typing import ( | ||||||
|     Optional, |     Optional, | ||||||
|     Sequence, |     Sequence, | ||||||
|     TypeVar, |     TypeVar, | ||||||
|     TYPE_CHECKING, |  | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| import trio | import trio | ||||||
| from tractor._state import current_actor | from tractor._state import current_actor | ||||||
| from tractor.log import get_logger | from tractor.log import get_logger | ||||||
|  | from ._tn import maybe_open_nursery | ||||||
| # from ._beg import collapse_eg | # from ._beg import collapse_eg | ||||||
| # from ._taskc import ( | # from ._taskc import ( | ||||||
| #     maybe_raise_from_masking_exc, | #     maybe_raise_from_masking_exc, | ||||||
| # ) | # ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| if TYPE_CHECKING: |  | ||||||
|     from tractor import ActorNursery |  | ||||||
| 
 |  | ||||||
| 
 | 
 | ||||||
| log = get_logger(__name__) | log = get_logger(__name__) | ||||||
| 
 | 
 | ||||||
|  | @ -56,30 +52,6 @@ log = get_logger(__name__) | ||||||
| T = TypeVar("T") | T = TypeVar("T") | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @acm |  | ||||||
| async def maybe_open_nursery( |  | ||||||
|     nursery: trio.Nursery|ActorNursery|None = None, |  | ||||||
|     shield: bool = False, |  | ||||||
|     lib: ModuleType = trio, |  | ||||||
| 
 |  | ||||||
|     **kwargs,  # proxy thru |  | ||||||
| 
 |  | ||||||
| ) -> AsyncGenerator[trio.Nursery, Any]: |  | ||||||
|     ''' |  | ||||||
|     Create a new nursery if None provided. |  | ||||||
| 
 |  | ||||||
|     Blocks on exit as expected if no input nursery is provided. |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     if nursery is not None: |  | ||||||
|         yield nursery |  | ||||||
|     else: |  | ||||||
|         async with lib.open_nursery(**kwargs) as nursery: |  | ||||||
|             if lib == trio: |  | ||||||
|                 nursery.cancel_scope.shield = shield |  | ||||||
|             yield nursery |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| async def _enter_and_wait( | async def _enter_and_wait( | ||||||
|     mngr: AsyncContextManager[T], |     mngr: AsyncContextManager[T], | ||||||
|     unwrapped: dict[int, T], |     unwrapped: dict[int, T], | ||||||
|  |  | ||||||
|  | @ -0,0 +1,341 @@ | ||||||
|  | # tractor: structured concurrent "actors". | ||||||
|  | # Copyright 2018-eternity Tyler Goodlet. | ||||||
|  | 
 | ||||||
|  | # This program is free software: you can redistribute it and/or modify | ||||||
|  | # it under the terms of the GNU Affero General Public License as published by | ||||||
|  | # the Free Software Foundation, either version 3 of the License, or | ||||||
|  | # (at your option) any later version. | ||||||
|  | 
 | ||||||
|  | # This program is distributed in the hope that it will be useful, | ||||||
|  | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||||
|  | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||||
|  | # GNU Affero General Public License for more details. | ||||||
|  | 
 | ||||||
|  | # You should have received a copy of the GNU Affero General Public License | ||||||
|  | # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||||
|  | 
 | ||||||
|  | ''' | ||||||
|  | Erlang-style (ish) "one-cancels-one" nursery, what we just call | ||||||
|  | a "task manager". | ||||||
|  | 
 | ||||||
|  | ''' | ||||||
|  | from __future__ import annotations | ||||||
|  | from contextlib import ( | ||||||
|  |     asynccontextmanager as acm, | ||||||
|  |     # contextmanager as cm, | ||||||
|  | ) | ||||||
|  | from functools import partial | ||||||
|  | from typing import ( | ||||||
|  |     Generator, | ||||||
|  |     Any, | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | from outcome import ( | ||||||
|  |     Outcome, | ||||||
|  |     acapture, | ||||||
|  | ) | ||||||
|  | from msgspec import Struct | ||||||
|  | import trio | ||||||
|  | from trio import ( | ||||||
|  |     TaskStatus, | ||||||
|  |     CancelScope, | ||||||
|  |     Nursery, | ||||||
|  | ) | ||||||
|  | from trio.lowlevel import ( | ||||||
|  |     Task, | ||||||
|  | ) | ||||||
|  | from tractor.log import get_logger | ||||||
|  | 
 | ||||||
|  | log = get_logger(__name__) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | class TaskOutcome(Struct): | ||||||
|  |     ''' | ||||||
|  |     The outcome of a scheduled ``trio`` task which includes an interface | ||||||
|  |     for synchronizing to the completion of the task's runtime and access | ||||||
|  |     to the eventual boxed result/value or raised exception. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     lowlevel_task: Task | ||||||
|  |     _exited = trio.Event()  # as per `trio.Runner.task_exited()` | ||||||
|  |     _outcome: Outcome | None = None  # as per `outcome.Outcome` | ||||||
|  |     _result: Any | None = None  # the eventual maybe-returned-value | ||||||
|  | 
 | ||||||
|  |     @property | ||||||
|  |     def result(self) -> Any: | ||||||
|  |         ''' | ||||||
|  |         Either Any or None depending on whether the Outcome has compeleted. | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|  |         if self._outcome is None: | ||||||
|  |             raise RuntimeError( | ||||||
|  |                 f'Task {self.lowlevel_task.name} is not complete.\n' | ||||||
|  |                 'First wait on `await TaskOutcome.wait_for_result()`!' | ||||||
|  |             ) | ||||||
|  |         return self._result | ||||||
|  | 
 | ||||||
|  |     def _set_outcome( | ||||||
|  |         self, | ||||||
|  |         outcome: Outcome, | ||||||
|  |     ): | ||||||
|  |         ''' | ||||||
|  |         Set the ``Outcome`` for this task. | ||||||
|  | 
 | ||||||
|  |         This method should only ever be called by the task's supervising | ||||||
|  |         nursery implemenation. | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|  |         self._outcome = outcome | ||||||
|  |         self._result = outcome.unwrap() | ||||||
|  |         self._exited.set() | ||||||
|  | 
 | ||||||
|  |     async def wait_for_result(self) -> Any: | ||||||
|  |         ''' | ||||||
|  |         Unwind the underlying task's ``Outcome`` by async waiting for | ||||||
|  |         the task to first complete and then unwrap it's result-value. | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|  |         if self._exited.is_set(): | ||||||
|  |             return self._result | ||||||
|  | 
 | ||||||
|  |         await self._exited.wait() | ||||||
|  | 
 | ||||||
|  |         out = self._outcome | ||||||
|  |         if out is None: | ||||||
|  |             raise ValueError(f'{out} is not an outcome!?') | ||||||
|  | 
 | ||||||
|  |         return self.result | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | class TaskManagerNursery(Struct): | ||||||
|  |     _tn: Nursery | ||||||
|  |     _scopes: dict[ | ||||||
|  |         Task, | ||||||
|  |         tuple[CancelScope, Outcome] | ||||||
|  |     ] = {} | ||||||
|  | 
 | ||||||
|  |     task_manager: Generator[Any, Outcome, None] | None = None | ||||||
|  | 
 | ||||||
|  |     async def start_soon( | ||||||
|  |         self, | ||||||
|  |         async_fn, | ||||||
|  |         *args, | ||||||
|  | 
 | ||||||
|  |         name=None, | ||||||
|  |         task_manager: Generator[Any, Outcome, None] | None = None | ||||||
|  | 
 | ||||||
|  |     ) -> tuple[CancelScope, Task]: | ||||||
|  | 
 | ||||||
|  |         # NOTE: internals of a nursery don't let you know what | ||||||
|  |         # the most recently spawned task is by order.. so we'd | ||||||
|  |         # have to either change that or do set ops. | ||||||
|  |         # pre_start_tasks: set[Task] = n._children.copy() | ||||||
|  |         # new_tasks = n._children - pre_start_Tasks | ||||||
|  |         # assert len(new_tasks) == 1 | ||||||
|  |         # task = new_tasks.pop() | ||||||
|  | 
 | ||||||
|  |         tn: Nursery = self._tn | ||||||
|  | 
 | ||||||
|  |         sm = self.task_manager | ||||||
|  |         # we do default behavior of a scope-per-nursery | ||||||
|  |         # if the user did not provide a task manager. | ||||||
|  |         if sm is None: | ||||||
|  |             return tn.start_soon(async_fn, *args, name=None) | ||||||
|  | 
 | ||||||
|  |         # new_task: Task|None = None | ||||||
|  |         to_return: tuple[Any] | None = None | ||||||
|  | 
 | ||||||
|  |         # NOTE: what do we enforce as a signature for the | ||||||
|  |         # `@task_scope_manager` here? | ||||||
|  |         mngr = sm(nursery=tn) | ||||||
|  | 
 | ||||||
|  |         async def _start_wrapped_in_scope( | ||||||
|  |             task_status: TaskStatus[ | ||||||
|  |                 tuple[CancelScope, Task] | ||||||
|  |             ] = trio.TASK_STATUS_IGNORED, | ||||||
|  | 
 | ||||||
|  |         ) -> None: | ||||||
|  | 
 | ||||||
|  |             # TODO: this was working before?! and, do we need something | ||||||
|  |             # like it to implement `.start()`? | ||||||
|  |             # nonlocal to_return | ||||||
|  | 
 | ||||||
|  |             # execute up to the first yield | ||||||
|  |             try: | ||||||
|  |                 to_return: tuple[Any] = next(mngr) | ||||||
|  |             except StopIteration: | ||||||
|  |                 raise RuntimeError("task manager didn't yield") from None | ||||||
|  | 
 | ||||||
|  |             # TODO: how do we support `.start()` style? | ||||||
|  |             # - relay through whatever the | ||||||
|  |             #   started task passes back via `.started()` ? | ||||||
|  |             #   seems like that won't work with also returning | ||||||
|  |             #   a "task handle"? | ||||||
|  |             # - we were previously binding-out this `to_return` to | ||||||
|  |             #   the parent's lexical scope, why isn't that working | ||||||
|  |             #   now? | ||||||
|  |             task_status.started(to_return) | ||||||
|  | 
 | ||||||
|  |             # invoke underlying func now that cs is entered. | ||||||
|  |             outcome = await acapture(async_fn, *args) | ||||||
|  | 
 | ||||||
|  |             # execute from the 1st yield to return and expect | ||||||
|  |             # generator-mngr `@task_scope_manager` thinger to | ||||||
|  |             # terminate! | ||||||
|  |             try: | ||||||
|  |                 mngr.send(outcome) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  |                 # I would presume it's better to have a handle to | ||||||
|  |                 # the `Outcome` entirely? This method sends *into* | ||||||
|  |                 # the mngr this `Outcome.value`; seems like kinda | ||||||
|  |                 # weird semantics for our purposes? | ||||||
|  |                 # outcome.send(mngr) | ||||||
|  | 
 | ||||||
|  |             except StopIteration: | ||||||
|  |                 return | ||||||
|  |             else: | ||||||
|  |                 raise RuntimeError(f"{mngr} didn't stop!") | ||||||
|  | 
 | ||||||
|  |         to_return = await tn.start(_start_wrapped_in_scope) | ||||||
|  |         assert to_return is not None | ||||||
|  | 
 | ||||||
|  |         # TODO: use the fancy type-check-time type signature stuff from | ||||||
|  |         # mypy i guess..to like, relay the type of whatever the | ||||||
|  |         # generator yielded through? betcha that'll be un-grokable XD | ||||||
|  |         return to_return | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # TODO: define a decorator to runtime type check that this a generator | ||||||
|  | # with a single yield that also delivers a value (of some std type) from | ||||||
|  | # the yield expression? | ||||||
|  | # @trio.task_manager | ||||||
|  | def add_task_handle_and_crash_handling( | ||||||
|  |     nursery: Nursery, | ||||||
|  | 
 | ||||||
|  |     debug_mode: bool = False, | ||||||
|  | 
 | ||||||
|  | ) -> Generator[ | ||||||
|  |     Any, | ||||||
|  |     Outcome, | ||||||
|  |     None, | ||||||
|  | ]: | ||||||
|  |     ''' | ||||||
|  |     A customizable, user defined "task scope manager". | ||||||
|  | 
 | ||||||
|  |     With this specially crafted single-yield generator function you can | ||||||
|  |     add more granular controls around every task spawned by `trio` B) | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     # if you need it you can ask trio for the task obj | ||||||
|  |     task: Task = trio.lowlevel.current_task() | ||||||
|  |     log.info(f'Spawning task: {task.name}') | ||||||
|  | 
 | ||||||
|  |     # User defined "task handle" for more granular supervision | ||||||
|  |     # of each spawned task as needed for their particular usage. | ||||||
|  |     task_outcome = TaskOutcome(task) | ||||||
|  | 
 | ||||||
|  |     # NOTE: if wanted the user could wrap the output task handle however | ||||||
|  |     # they want! | ||||||
|  |     # class TaskHandle(Struct): | ||||||
|  |     #     task: Task | ||||||
|  |     #     cs: CancelScope | ||||||
|  |     #     outcome: TaskOutcome | ||||||
|  | 
 | ||||||
|  |     # this yields back when the task is terminated, cancelled or returns. | ||||||
|  |     try: | ||||||
|  |         with CancelScope() as cs: | ||||||
|  | 
 | ||||||
|  |             # the yielded value(s) here are what are returned to the | ||||||
|  |             # nursery's `.start_soon()` caller B) | ||||||
|  |             lowlevel_outcome: Outcome = yield (task_outcome, cs) | ||||||
|  |             task_outcome._set_outcome(lowlevel_outcome) | ||||||
|  | 
 | ||||||
|  |     # Adds "crash handling" from `pdbp` by entering | ||||||
|  |     # a REPL on std errors. | ||||||
|  |     except Exception as err: | ||||||
|  |         if debug_mode: | ||||||
|  |             log.exception( | ||||||
|  |                 f'{task.name} crashed, entering debugger!' | ||||||
|  |             ) | ||||||
|  |             import pdbp | ||||||
|  |             pdbp.xpm() | ||||||
|  | 
 | ||||||
|  |         raise err | ||||||
|  | 
 | ||||||
|  |     finally: | ||||||
|  |         log.info( | ||||||
|  |             f'Task exitted\n' | ||||||
|  |             f')>\n' | ||||||
|  |             f' |_{task}\n' | ||||||
|  |             # ^^TODO? use sclang formatter? | ||||||
|  |             # -[ ] .devx.pformat.nest_from_op()` yo! | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @acm | ||||||
|  | async def open_taskman( | ||||||
|  |     task_manager: Generator[Any, Outcome, None] | None = None, | ||||||
|  | 
 | ||||||
|  |     **lowlevel_nursery_kwargs, | ||||||
|  | ): | ||||||
|  |     async with trio.open_nursery(**lowlevel_nursery_kwargs) as nurse: | ||||||
|  |         yield TaskManagerNursery( | ||||||
|  |             nurse, | ||||||
|  |             task_manager=task_manager, | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | async def sleep_then_return_val(val: str): | ||||||
|  |     await trio.sleep(0.2) | ||||||
|  |     return val | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | async def ensure_cancelled(): | ||||||
|  |     try: | ||||||
|  |         await trio.sleep_forever() | ||||||
|  | 
 | ||||||
|  |     except trio.Cancelled: | ||||||
|  |         task = trio.lowlevel.current_task() | ||||||
|  |         log.cancel(f'heyyo ONLY {task.name} was cancelled as expected B)') | ||||||
|  |         assert 0 | ||||||
|  | 
 | ||||||
|  |     except BaseException: | ||||||
|  |         raise RuntimeError("woa woa woa this ain't right!") | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | if __name__ == '__main__': | ||||||
|  | 
 | ||||||
|  |     from tractor.log import get_console_log | ||||||
|  |     get_console_log(level='info') | ||||||
|  | 
 | ||||||
|  |     async def main(): | ||||||
|  |         async with open_taskman( | ||||||
|  |             task_manager=partial( | ||||||
|  |                 add_task_handle_and_crash_handling, | ||||||
|  |                 debug_mode=True, | ||||||
|  |             ), | ||||||
|  |         ) as tm: | ||||||
|  |             for _ in range(3): | ||||||
|  |                 outcome, _ = await tm.start_soon(trio.sleep_forever) | ||||||
|  | 
 | ||||||
|  |             # extra task we want to engage in debugger post mortem. | ||||||
|  |             err_outcome, cs = await tm.start_soon(ensure_cancelled) | ||||||
|  | 
 | ||||||
|  |             val: str = 'yoyoyo' | ||||||
|  |             val_outcome, _ = await tm.start_soon( | ||||||
|  |                 sleep_then_return_val, | ||||||
|  |                 val, | ||||||
|  |             ) | ||||||
|  |             res = await val_outcome.wait_for_result() | ||||||
|  |             assert res == val | ||||||
|  |             log.info(f'{res} -> GOT EXPECTED TASK VALUE') | ||||||
|  | 
 | ||||||
|  |             await trio.sleep(0.6) | ||||||
|  |             log.cancel( | ||||||
|  |                 f'Cancelling and waiting on {err_outcome.lowlevel_task} ' | ||||||
|  |                 'to CRASH..' | ||||||
|  |             ) | ||||||
|  |             cs.cancel() | ||||||
|  | 
 | ||||||
|  |     trio.run(main) | ||||||
|  | @ -0,0 +1,94 @@ | ||||||
|  | # tractor: structured concurrent "actors". | ||||||
|  | # Copyright 2018-eternity Tyler Goodlet. | ||||||
|  | 
 | ||||||
|  | # This program is free software: you can redistribute it and/or modify | ||||||
|  | # it under the terms of the GNU Affero General Public License as published by | ||||||
|  | # the Free Software Foundation, either version 3 of the License, or | ||||||
|  | # (at your option) any later version. | ||||||
|  | 
 | ||||||
|  | # This program is distributed in the hope that it will be useful, | ||||||
|  | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||||
|  | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||||
|  | # GNU Affero General Public License for more details. | ||||||
|  | 
 | ||||||
|  | # You should have received a copy of the GNU Affero General Public License | ||||||
|  | # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||||
|  | 
 | ||||||
|  | ''' | ||||||
|  | `trio.Nursery` wrappers which we short-hand refer to as | ||||||
|  | `tn`: "task nursery". | ||||||
|  | 
 | ||||||
|  | (whereas we refer to `tractor.ActorNursery` as the short-hand `an`) | ||||||
|  | 
 | ||||||
|  | ''' | ||||||
|  | from __future__ import annotations | ||||||
|  | from contextlib import ( | ||||||
|  |     asynccontextmanager as acm, | ||||||
|  | ) | ||||||
|  | from types import ModuleType | ||||||
|  | from typing import ( | ||||||
|  |     Any, | ||||||
|  |     AsyncGenerator, | ||||||
|  |     TYPE_CHECKING, | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | import trio | ||||||
|  | from tractor.log import get_logger | ||||||
|  | 
 | ||||||
|  | # from ._beg import ( | ||||||
|  | #     collapse_eg, | ||||||
|  | # ) | ||||||
|  | 
 | ||||||
|  | if TYPE_CHECKING: | ||||||
|  |     from tractor import ActorNursery | ||||||
|  | 
 | ||||||
|  | log = get_logger(__name__) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # ??TODO? is this even a good idea?? | ||||||
|  | # it's an extra LoC to stack `collapse_eg()` vs. | ||||||
|  | # a new/foreign/bad-std-named very thing wrapper..? | ||||||
|  | # -[ ] is there a better/simpler name? | ||||||
|  | # @acm | ||||||
|  | # async def open_loose_tn() -> trio.Nursery: | ||||||
|  | #     ''' | ||||||
|  | #     Implements the equivalent of the old style loose eg raising | ||||||
|  | #     task-nursery from `trio<=0.25.0` , | ||||||
|  | 
 | ||||||
|  | #     .. code-block:: python | ||||||
|  | 
 | ||||||
|  | #         async with trio.open_nursery( | ||||||
|  | #             strict_exception_groups=False, | ||||||
|  | #         ) as tn: | ||||||
|  | #             ... | ||||||
|  | 
 | ||||||
|  | #     ''' | ||||||
|  | #     async with ( | ||||||
|  | #         collapse_eg(), | ||||||
|  | #         trio.open_nursery() as tn, | ||||||
|  | #     ): | ||||||
|  | #         yield tn | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @acm | ||||||
|  | async def maybe_open_nursery( | ||||||
|  |     nursery: trio.Nursery|ActorNursery|None = None, | ||||||
|  |     shield: bool = False, | ||||||
|  |     lib: ModuleType = trio, | ||||||
|  |     loose: bool = False, | ||||||
|  | 
 | ||||||
|  |     **kwargs,  # proxy thru | ||||||
|  | 
 | ||||||
|  | ) -> AsyncGenerator[trio.Nursery, Any]: | ||||||
|  |     ''' | ||||||
|  |     Create a new nursery if None provided. | ||||||
|  | 
 | ||||||
|  |     Blocks on exit as expected if no input nursery is provided. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     if nursery is not None: | ||||||
|  |         yield nursery | ||||||
|  |     else: | ||||||
|  |         async with lib.open_nursery(**kwargs) as tn: | ||||||
|  |             tn.cancel_scope.shield = shield | ||||||
|  |             yield tn | ||||||
		Loading…
	
		Reference in New Issue