Initial prototype for a one-cancels-one style supervisor, nursery thing..
							parent
							
								
									fd314deecb
								
							
						
					
					
						commit
						d5b54f3f5e
					
				|  | @ -0,0 +1,256 @@ | ||||||
|  | # tractor: structured concurrent "actors". | ||||||
|  | # Copyright 2018-eternity Tyler Goodlet. | ||||||
|  | 
 | ||||||
|  | # This program is free software: you can redistribute it and/or modify | ||||||
|  | # it under the terms of the GNU Affero General Public License as published by | ||||||
|  | # the Free Software Foundation, either version 3 of the License, or | ||||||
|  | # (at your option) any later version. | ||||||
|  | 
 | ||||||
|  | # This program is distributed in the hope that it will be useful, | ||||||
|  | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||||
|  | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||||
|  | # GNU Affero General Public License for more details. | ||||||
|  | 
 | ||||||
|  | # You should have received a copy of the GNU Affero General Public License | ||||||
|  | # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||||
|  | 
 | ||||||
|  | ''' | ||||||
|  | Erlang-style (ish) "one-cancels-one" nursery. | ||||||
|  | 
 | ||||||
|  | ''' | ||||||
|  | from __future__ import annotations | ||||||
|  | from contextlib import ( | ||||||
|  |     asynccontextmanager as acm, | ||||||
|  |     contextmanager as cm, | ||||||
|  |     nullcontext, | ||||||
|  | ) | ||||||
|  | from typing import ContextManager | ||||||
|  | 
 | ||||||
|  | from outcome import ( | ||||||
|  |     Outcome, | ||||||
|  |     acapture, | ||||||
|  | ) | ||||||
|  | import pdbp | ||||||
|  | from msgspec import Struct | ||||||
|  | import trio | ||||||
|  | from trio._core._run import ( | ||||||
|  |     Task, | ||||||
|  |     CancelScope, | ||||||
|  |     Nursery, | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | class MaybeOutcome(Struct): | ||||||
|  | 
 | ||||||
|  |     _ready: Event = trio.Event() | ||||||
|  |     _outcome: Outcome | None = None | ||||||
|  |     _result: Any | None = None | ||||||
|  | 
 | ||||||
|  |     @property | ||||||
|  |     def result(self) -> Any: | ||||||
|  |         ''' | ||||||
|  |         Either Any or None depending on whether the Outcome has compeleted. | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|  |         if self._outcome is None: | ||||||
|  |             raise RuntimeError( | ||||||
|  |                 # f'Task {task.name} is not complete.\n' | ||||||
|  |                 f'Outcome is not complete.\n' | ||||||
|  |                 'wait on `await MaybeOutcome.unwrap()` first!' | ||||||
|  |             ) | ||||||
|  |         return self._result | ||||||
|  | 
 | ||||||
|  |     def _set_outcome( | ||||||
|  |         self, | ||||||
|  |         outcome: Outcome, | ||||||
|  |     ): | ||||||
|  |         self._outcome = outcome | ||||||
|  |         self._result = outcome.unwrap() | ||||||
|  |         self._ready.set() | ||||||
|  | 
 | ||||||
|  |     # TODO: maybe a better name like, | ||||||
|  |     # - .wait_and_unwrap() | ||||||
|  |     # - .wait_unwrap() | ||||||
|  |     # - .aunwrap() ? | ||||||
|  |     async def unwrap(self) -> Any: | ||||||
|  |         if self._ready.is_set(): | ||||||
|  |             return self._result | ||||||
|  | 
 | ||||||
|  |         await self._ready.wait() | ||||||
|  | 
 | ||||||
|  |         out = self._outcome | ||||||
|  |         if out is None: | ||||||
|  |             raise ValueError(f'{out} is not an outcome!?') | ||||||
|  | 
 | ||||||
|  |         return self.result | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | class TaskHandle(Struct): | ||||||
|  |     task: Task | ||||||
|  |     cs: CancelScope | ||||||
|  |     exited: Event | None = None | ||||||
|  |     _outcome: Outcome | None = None | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | class ScopePerTaskNursery(Struct): | ||||||
|  |     _n: Nursery | ||||||
|  |     _scopes: dict[ | ||||||
|  |         Task, | ||||||
|  |         tuple[CancelScope, Outcome] | ||||||
|  |     ] = {} | ||||||
|  | 
 | ||||||
|  |     scope_manager: ContextManager | None = None | ||||||
|  | 
 | ||||||
|  |     async def start_soon( | ||||||
|  |         self, | ||||||
|  |         async_fn, | ||||||
|  |         *args, | ||||||
|  | 
 | ||||||
|  |         name=None, | ||||||
|  |         scope_manager: ContextManager | None = None, | ||||||
|  | 
 | ||||||
|  |     ) -> tuple[CancelScope, Task]: | ||||||
|  | 
 | ||||||
|  |         # NOTE: internals of a nursery don't let you know what | ||||||
|  |         # the most recently spawned task is by order.. so we'd | ||||||
|  |         # have to either change that or do set ops. | ||||||
|  |         # pre_start_tasks: set[Task] = n._children.copy() | ||||||
|  |         # new_tasks = n._children - pre_start_Tasks | ||||||
|  |         # assert len(new_tasks) == 1 | ||||||
|  |         # task = new_tasks.pop() | ||||||
|  | 
 | ||||||
|  |         n: Nursery = self._n | ||||||
|  |         cs = CancelScope() | ||||||
|  |         new_task: Task | None = None | ||||||
|  |         to_return: tuple[Any] | None = None | ||||||
|  |         maybe_outcome = MaybeOutcome() | ||||||
|  | 
 | ||||||
|  |         sm = self.scope_manager | ||||||
|  |         if sm is None: | ||||||
|  |             mngr = nullcontext([cs]) | ||||||
|  |         else: | ||||||
|  |             mngr = sm( | ||||||
|  |                 nursery=n, | ||||||
|  |                 scope=cs, | ||||||
|  |                 maybe_outcome=maybe_outcome, | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |         async def _start_wrapped_in_scope( | ||||||
|  |             task_status: TaskStatus[ | ||||||
|  |                 tuple[CancelScope, Task] | ||||||
|  |             ] = trio.TASK_STATUS_IGNORED, | ||||||
|  | 
 | ||||||
|  |         ) -> None: | ||||||
|  |             nonlocal maybe_outcome | ||||||
|  |             nonlocal to_return | ||||||
|  | 
 | ||||||
|  |             with cs: | ||||||
|  | 
 | ||||||
|  |                 task = trio.lowlevel.current_task() | ||||||
|  |                 self._scopes[cs] = task | ||||||
|  | 
 | ||||||
|  |                 # TODO: instead we should probably just use | ||||||
|  |                 # `Outcome.send(mngr)` here no and inside a custom | ||||||
|  |                 # decorator `@trio.cancel_scope_manager` enforce | ||||||
|  |                 # that it's a single yield generator? | ||||||
|  |                 with mngr as to_return: | ||||||
|  | 
 | ||||||
|  |                     # TODO: relay through whatever the | ||||||
|  |                     # started task passes back via `.started()` ? | ||||||
|  |                     # seems like that won't work with also returning | ||||||
|  |                     # a "task handle"? | ||||||
|  |                     task_status.started() | ||||||
|  | 
 | ||||||
|  |                     # invoke underlying func now that cs is entered. | ||||||
|  |                     outcome = await acapture(async_fn, *args) | ||||||
|  | 
 | ||||||
|  |                     # TODO: instead, mngr.send(outcome) so that we don't | ||||||
|  |                     # tie this `.start_soon()` impl to the | ||||||
|  |                     # `MaybeOutcome` type? Use `Outcome.send(mngr)` | ||||||
|  |                     # right? | ||||||
|  |                     maybe_outcome._set_outcome(outcome) | ||||||
|  | 
 | ||||||
|  |         await n.start(_start_wrapped_in_scope) | ||||||
|  |         assert to_return is not None | ||||||
|  | 
 | ||||||
|  |         # TODO: better way to concat the values delivered by the user | ||||||
|  |         # provided `.scope_manager` and the outcome? | ||||||
|  |         return tuple([maybe_outcome] + to_return) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | # TODO: maybe just make this a generator with a single yield that also | ||||||
|  | # delivers a value (of some std type) from the yield expression? | ||||||
|  | # @trio.cancel_scope_manager | ||||||
|  | @cm | ||||||
|  | def add_task_handle_and_crash_handling( | ||||||
|  |     nursery: Nursery, | ||||||
|  |     scope: CancelScope, | ||||||
|  |     maybe_outcome: MaybeOutcome, | ||||||
|  | 
 | ||||||
|  | ) -> Generator[None, list[Any]]: | ||||||
|  | 
 | ||||||
|  |     cs: CancelScope = CancelScope() | ||||||
|  | 
 | ||||||
|  |     # if you need it you can ask trio for the task obj | ||||||
|  |     task: Task = trio.lowlevel.current_task() | ||||||
|  |     print(f'Spawning task: {task.name}') | ||||||
|  | 
 | ||||||
|  |     try: | ||||||
|  |         # yields back when task is terminated, cancelled, returns? | ||||||
|  |         with cs: | ||||||
|  |             # the yielded values here are what are returned to the | ||||||
|  |             # nursery's `.start_soon()` caller | ||||||
|  | 
 | ||||||
|  |             # TODO: actually make this work so that `MaybeOutcome` isn't | ||||||
|  |             # tied to the impl of `.start_soon()` on our custom nursery! | ||||||
|  |             task_outcome: Outcome = yield [cs] | ||||||
|  | 
 | ||||||
|  |     except Exception as err: | ||||||
|  |         # Adds "crash handling" from `pdbp` by entering | ||||||
|  |         # a REPL on std errors. | ||||||
|  |         pdbp.xpm() | ||||||
|  |         raise | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @acm | ||||||
|  | async def open_nursery( | ||||||
|  |     scope_manager = None, | ||||||
|  |     **kwargs, | ||||||
|  | ): | ||||||
|  |     async with trio.open_nursery(**kwargs) as nurse: | ||||||
|  |         yield ScopePerTaskNursery( | ||||||
|  |             nurse, | ||||||
|  |             scope_manager=scope_manager, | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | async def sleep_then_err(): | ||||||
|  |     await trio.sleep(1) | ||||||
|  |     assert 0 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | async def sleep_then_return_val(val: str): | ||||||
|  |     await trio.sleep(0.2) | ||||||
|  |     return val | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | if __name__ == '__main__': | ||||||
|  | 
 | ||||||
|  |     async def main(): | ||||||
|  |         async with open_nursery( | ||||||
|  |             scope_manager=add_task_handle_and_crash_handling, | ||||||
|  |         ) as sn: | ||||||
|  |             for _ in range(3): | ||||||
|  |                 outcome, cs = await sn.start_soon(trio.sleep_forever) | ||||||
|  | 
 | ||||||
|  |             # extra task we want to engage in debugger post mortem. | ||||||
|  |             err_outcome, *_ = await sn.start_soon(sleep_then_err) | ||||||
|  | 
 | ||||||
|  |             val: str = 'yoyoyo' | ||||||
|  |             val_outcome, cs = await sn.start_soon(sleep_then_return_val, val) | ||||||
|  |             res = await val_outcome.unwrap() | ||||||
|  |             assert res == val | ||||||
|  |             print(f'GOT EXPECTED TASK VALUE: {res}') | ||||||
|  | 
 | ||||||
|  |             print('WAITING FOR CRASH..') | ||||||
|  | 
 | ||||||
|  |     trio.run(main) | ||||||
		Loading…
	
		Reference in New Issue