More refinements and proper typing
- drop unneeded (and commented) internal cs allocating bits. - bypass all task manager stuff if no generator is provided by the caller; i.e. just call `.start_soon()` as normal. - fix `Generator` typing. - add some prints around task manager. - wrap in `TaskOutcome.lowlevel_task: Task`.
							parent
							
								
									311a1e6d55
								
							
						
					
					
						commit
						ed56eda684
					
				|  | @ -24,7 +24,10 @@ from contextlib import ( | |||
|     contextmanager as cm, | ||||
|     nullcontext, | ||||
| ) | ||||
| from typing import ContextManager | ||||
| from typing import ( | ||||
|     Generator, | ||||
|     Any, | ||||
| ) | ||||
| 
 | ||||
| from outcome import ( | ||||
|     Outcome, | ||||
|  | @ -47,6 +50,7 @@ class TaskOutcome(Struct): | |||
|     to the eventual boxed result/value or raised exception. | ||||
| 
 | ||||
|     ''' | ||||
|     lowlevel_task: Task | ||||
|     _exited: Event = trio.Event()  # as per `trio.Runner.task_exited()` | ||||
|     _outcome: Outcome | None = None  # as per `outcome.Outcome` | ||||
|     _result: Any | None = None  # the eventual maybe-returned-value | ||||
|  | @ -105,7 +109,7 @@ class ScopePerTaskNursery(Struct): | |||
|         tuple[CancelScope, Outcome] | ||||
|     ] = {} | ||||
| 
 | ||||
|     scope_manager: ContextManager | None = None | ||||
|     scope_manager: Generator[Any, Outcome, None] | None = None | ||||
| 
 | ||||
|     async def start_soon( | ||||
|         self, | ||||
|  | @ -133,16 +137,13 @@ class ScopePerTaskNursery(Struct): | |||
|         if sm is None: | ||||
|             return n.start_soon(async_fn, *args, name=None) | ||||
| 
 | ||||
|         # per_task_cs = CancelScope() | ||||
|         new_task: Task | None = None | ||||
|         to_return: tuple[Any] | None = None | ||||
| 
 | ||||
|         # NOTE: what do we enforce as a signature for the | ||||
|         # `@task_scope_manager` here? | ||||
|         mngr = sm( | ||||
|             nursery=n, | ||||
|             # scope=per_task_cs, | ||||
|         ) | ||||
|         mngr = sm(nursery=n) | ||||
| 
 | ||||
|         async def _start_wrapped_in_scope( | ||||
|             task_status: TaskStatus[ | ||||
|                 tuple[CancelScope, Task] | ||||
|  | @ -153,13 +154,6 @@ class ScopePerTaskNursery(Struct): | |||
|             # TODO: this was working before?! | ||||
|             # nonlocal to_return | ||||
| 
 | ||||
|             task = trio.lowlevel.current_task() | ||||
|             # self._scopes[per_task_cs] = task | ||||
| 
 | ||||
|             # NOTE: we actually don't need this since the user can | ||||
|             # just to it themselves inside mngr! | ||||
|             # with per_task_cs: | ||||
| 
 | ||||
|             # execute up to the first yield | ||||
|             try: | ||||
|                 to_return: tuple[Any] = next(mngr) | ||||
|  | @ -206,15 +200,9 @@ class ScopePerTaskNursery(Struct): | |||
|         return to_return | ||||
| 
 | ||||
| 
 | ||||
| # TODO: you could wrap your output task handle in this? | ||||
| # class TaskHandle(Struct): | ||||
| #     task: Task | ||||
| #     cs: CancelScope | ||||
| #     outcome: TaskOutcome | ||||
| 
 | ||||
| 
 | ||||
| # TODO: maybe just make this a generator with a single yield that also | ||||
| # delivers a value (of some std type) from the yield expression? | ||||
| # TODO: define a decorator to runtime type check that this a generator | ||||
| # with a single yield that also delivers a value (of some std type) from | ||||
| # the yield expression? | ||||
| # @trio.task_scope_manager | ||||
| def add_task_handle_and_crash_handling( | ||||
|     nursery: Nursery, | ||||
|  | @ -224,20 +212,35 @@ def add_task_handle_and_crash_handling( | |||
|     # they want below? | ||||
|     # scope: CancelScope, | ||||
| 
 | ||||
| ) -> Generator[None, list[Any]]: | ||||
| ) -> Generator[ | ||||
|     Any, | ||||
|     Outcome, | ||||
|     None, | ||||
| ]: | ||||
|     ''' | ||||
|     A customizable, user defined "task scope manager". | ||||
| 
 | ||||
|     task_outcome = TaskOutcome() | ||||
|     With this specially crafted single-yield generator function you can | ||||
|     add more granular controls around every task spawned by `trio` B) | ||||
| 
 | ||||
|     ''' | ||||
|     # if you need it you can ask trio for the task obj | ||||
|     task: Task = trio.lowlevel.current_task() | ||||
|     print(f'Spawning task: {task.name}') | ||||
| 
 | ||||
|     # yields back when task is terminated, cancelled, returns. | ||||
|     # User defined "task handle" for more granular supervision | ||||
|     # of each spawned task as needed for their particular usage. | ||||
|     task_outcome = TaskOutcome(task) | ||||
| 
 | ||||
|     # NOTE: if wanted the user could wrap the output task handle however | ||||
|     # they want! | ||||
|     # class TaskHandle(Struct): | ||||
|     #     task: Task | ||||
|     #     cs: CancelScope | ||||
|     #     outcome: TaskOutcome | ||||
| 
 | ||||
|     # this yields back when the task is terminated, cancelled or returns. | ||||
|     try: | ||||
|         # XXX: wait, this isn't doing anything right since we'd have to | ||||
|         # manually activate this scope using something like: | ||||
|         # `task._activate_cancel_status(cs._cancel_status)` ?? | ||||
|         # oh wait, but `.__enter__()` does all that already? | ||||
|         with CancelScope() as cs: | ||||
| 
 | ||||
|             # the yielded value(s) here are what are returned to the | ||||
|  | @ -245,12 +248,16 @@ def add_task_handle_and_crash_handling( | |||
|             lowlevel_outcome: Outcome = yield (task_outcome, cs) | ||||
|             task_outcome._set_outcome(lowlevel_outcome) | ||||
| 
 | ||||
|     # Adds "crash handling" from `pdbp` by entering | ||||
|     # a REPL on std errors. | ||||
|     except Exception as err: | ||||
|         # Adds "crash handling" from `pdbp` by entering | ||||
|         # a REPL on std errors. | ||||
|         print(f'{task.name} crashed, entering debugger!') | ||||
|         pdbp.xpm() | ||||
|         raise | ||||
| 
 | ||||
|     finally: | ||||
|         print(f'{task.name} Exitted') | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def open_nursery( | ||||
|  | @ -264,11 +271,6 @@ async def open_nursery( | |||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| async def sleep_then_err(): | ||||
|     await trio.sleep(1) | ||||
|     assert 0 | ||||
| 
 | ||||
| 
 | ||||
| async def sleep_then_return_val(val: str): | ||||
|     await trio.sleep(0.2) | ||||
|     return val | ||||
|  | @ -309,7 +311,10 @@ if __name__ == '__main__': | |||
|             print(f'{res} -> GOT EXPECTED TASK VALUE') | ||||
| 
 | ||||
|             await trio.sleep(0.6) | ||||
|             print('Cancelling and waiting for CRASH..') | ||||
|             print( | ||||
|                 'Cancelling and waiting on {err_outcome.lowlevel_task} ' | ||||
|                 'to CRASH..' | ||||
|             ) | ||||
|             cs.cancel() | ||||
| 
 | ||||
|     trio.run(main) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue