From 940e65fccf83dd4fb814c54519c4ad0708d90cda Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 19 May 2023 14:23:22 -0400 Subject: [PATCH] More refinements and proper typing - drop unneeded (and commented) internal cs allocating bits. - bypass all task manager stuff if no generator is provided by the caller; i.e. just call `.start_soon()` as normal. - fix `Generator` typing. - add some prints around task manager. - wrap in `TaskOutcome.lowlevel_task: Task`. --- tractor/trionics/_supervisor.py | 81 +++++++++++++++++---------------- 1 file changed, 43 insertions(+), 38 deletions(-) diff --git a/tractor/trionics/_supervisor.py b/tractor/trionics/_supervisor.py index d23e1df..46a1ccd 100644 --- a/tractor/trionics/_supervisor.py +++ b/tractor/trionics/_supervisor.py @@ -24,7 +24,10 @@ from contextlib import ( contextmanager as cm, nullcontext, ) -from typing import ContextManager +from typing import ( + Generator, + Any, +) from outcome import ( Outcome, @@ -47,6 +50,7 @@ class TaskOutcome(Struct): to the eventual boxed result/value or raised exception. ''' + lowlevel_task: Task _exited: Event = trio.Event() # as per `trio.Runner.task_exited()` _outcome: Outcome | None = None # as per `outcome.Outcome` _result: Any | None = None # the eventual maybe-returned-value @@ -105,7 +109,7 @@ class ScopePerTaskNursery(Struct): tuple[CancelScope, Outcome] ] = {} - scope_manager: ContextManager | None = None + scope_manager: Generator[Any, Outcome, None] | None = None async def start_soon( self, @@ -133,16 +137,13 @@ class ScopePerTaskNursery(Struct): if sm is None: return n.start_soon(async_fn, *args, name=None) - # per_task_cs = CancelScope() new_task: Task | None = None to_return: tuple[Any] | None = None # NOTE: what do we enforce as a signature for the # `@task_scope_manager` here? - mngr = sm( - nursery=n, - # scope=per_task_cs, - ) + mngr = sm(nursery=n) + async def _start_wrapped_in_scope( task_status: TaskStatus[ tuple[CancelScope, Task] @@ -153,13 +154,6 @@ class ScopePerTaskNursery(Struct): # TODO: this was working before?! # nonlocal to_return - task = trio.lowlevel.current_task() - # self._scopes[per_task_cs] = task - - # NOTE: we actually don't need this since the user can - # just to it themselves inside mngr! - # with per_task_cs: - # execute up to the first yield try: to_return: tuple[Any] = next(mngr) @@ -206,15 +200,9 @@ class ScopePerTaskNursery(Struct): return to_return -# TODO: you could wrap your output task handle in this? -# class TaskHandle(Struct): -# task: Task -# cs: CancelScope -# outcome: TaskOutcome - - -# TODO: maybe just make this a generator with a single yield that also -# delivers a value (of some std type) from the yield expression? +# TODO: define a decorator to runtime type check that this a generator +# with a single yield that also delivers a value (of some std type) from +# the yield expression? # @trio.task_scope_manager def add_task_handle_and_crash_handling( nursery: Nursery, @@ -224,20 +212,35 @@ def add_task_handle_and_crash_handling( # they want below? # scope: CancelScope, -) -> Generator[None, list[Any]]: +) -> Generator[ + Any, + Outcome, + None, +]: + ''' + A customizable, user defined "task scope manager". - task_outcome = TaskOutcome() + With this specially crafted single-yield generator function you can + add more granular controls around every task spawned by `trio` B) + ''' # if you need it you can ask trio for the task obj task: Task = trio.lowlevel.current_task() print(f'Spawning task: {task.name}') - # yields back when task is terminated, cancelled, returns. + # User defined "task handle" for more granular supervision + # of each spawned task as needed for their particular usage. + task_outcome = TaskOutcome(task) + + # NOTE: if wanted the user could wrap the output task handle however + # they want! + # class TaskHandle(Struct): + # task: Task + # cs: CancelScope + # outcome: TaskOutcome + + # this yields back when the task is terminated, cancelled or returns. try: - # XXX: wait, this isn't doing anything right since we'd have to - # manually activate this scope using something like: - # `task._activate_cancel_status(cs._cancel_status)` ?? - # oh wait, but `.__enter__()` does all that already? with CancelScope() as cs: # the yielded value(s) here are what are returned to the @@ -245,12 +248,16 @@ def add_task_handle_and_crash_handling( lowlevel_outcome: Outcome = yield (task_outcome, cs) task_outcome._set_outcome(lowlevel_outcome) + # Adds "crash handling" from `pdbp` by entering + # a REPL on std errors. except Exception as err: - # Adds "crash handling" from `pdbp` by entering - # a REPL on std errors. + print(f'{task.name} crashed, entering debugger!') pdbp.xpm() raise + finally: + print(f'{task.name} Exitted') + @acm async def open_nursery( @@ -264,11 +271,6 @@ async def open_nursery( ) -async def sleep_then_err(): - await trio.sleep(1) - assert 0 - - async def sleep_then_return_val(val: str): await trio.sleep(0.2) return val @@ -309,7 +311,10 @@ if __name__ == '__main__': print(f'{res} -> GOT EXPECTED TASK VALUE') await trio.sleep(0.6) - print('Cancelling and waiting for CRASH..') + print( + 'Cancelling and waiting on {err_outcome.lowlevel_task} ' + 'to CRASH..' + ) cs.cancel() trio.run(main)