diff --git a/tractor/trionics/_supervisor.py b/tractor/trionics/_supervisor.py index faeb761..2a664a9 100644 --- a/tractor/trionics/_supervisor.py +++ b/tractor/trionics/_supervisor.py @@ -39,11 +39,17 @@ from trio._core._run import ( Nursery, ) -class MaybeOutcome(Struct): - _ready: Event = trio.Event() - _outcome: Outcome | None = None - _result: Any | None = None +class TaskOutcome(Struct): + ''' + The outcome of a scheduled ``trio`` task which includes an interface + for synchronizing to the completion of the task's runtime and access + to the eventual boxed result/value or raised exception. + + ''' + _exited: Event = trio.Event() # as per `trio.Runner.task_exited()` + _outcome: Outcome | None = None # as per `outcome.Outcome` + _result: Any | None = None # the eventual maybe-returned-value @property def result(self) -> Any: @@ -55,7 +61,7 @@ class MaybeOutcome(Struct): raise RuntimeError( # f'Task {task.name} is not complete.\n' f'Outcome is not complete.\n' - 'wait on `await MaybeOutcome.unwrap()` first!' + 'wait on `await TaskOutcome.wait_for_result()` first!' ) return self._result @@ -63,19 +69,27 @@ class MaybeOutcome(Struct): self, outcome: Outcome, ): + ''' + Set the ``Outcome`` for this task. + + This method should only ever be called by the task's supervising + nursery implemenation. + + ''' self._outcome = outcome self._result = outcome.unwrap() - self._ready.set() + self._exited.set() - # TODO: maybe a better name like, - # - .wait_and_unwrap() - # - .wait_unwrap() - # - .aunwrap() ? - async def unwrap(self) -> Any: - if self._ready.is_set(): + async def wait_for_result(self) -> Any: + ''' + Unwind the underlying task's ``Outcome`` by async waiting for + the task to first complete and then unwrap it's result-value. + + ''' + if self._exited.is_set(): return self._result - await self._ready.wait() + await self._exited.wait() out = self._outcome if out is None: @@ -84,13 +98,6 @@ class MaybeOutcome(Struct): return self.result -class TaskHandle(Struct): - task: Task - cs: CancelScope - exited: Event | None = None - _outcome: Outcome | None = None - - class ScopePerTaskNursery(Struct): _n: Nursery _scopes: dict[ @@ -122,17 +129,14 @@ class ScopePerTaskNursery(Struct): cs = CancelScope() new_task: Task | None = None to_return: tuple[Any] | None = None - maybe_outcome = MaybeOutcome() sm = self.scope_manager if sm is None: mngr = nullcontext([cs]) else: - mngr = sm( - nursery=n, - scope=cs, - maybe_outcome=maybe_outcome, - ) + # NOTE: what do we enforce as a signature for the + # `@task_scope_manager` here? + mngr = sm(nursery=n, scope=cs) async def _start_wrapped_in_scope( task_status: TaskStatus[ @@ -140,55 +144,81 @@ class ScopePerTaskNursery(Struct): ] = trio.TASK_STATUS_IGNORED, ) -> None: - nonlocal maybe_outcome - nonlocal to_return + + # TODO: this was working before?! + # nonlocal to_return with cs: task = trio.lowlevel.current_task() self._scopes[cs] = task - # TODO: instead we should probably just use - # `Outcome.send(mngr)` here no and inside a custom - # decorator `@trio.cancel_scope_manager` enforce - # that it's a single yield generator? - with mngr as to_return: + # execute up to the first yield + try: + to_return: tuple[Any] = next(mngr) + except StopIteration: + raise RuntimeError("task manager didn't yield") from None - # TODO: relay through whatever the - # started task passes back via `.started()` ? - # seems like that won't work with also returning - # a "task handle"? - task_status.started() + # TODO: how do we support `.start()` style? + # - relay through whatever the + # started task passes back via `.started()` ? + # seems like that won't work with also returning + # a "task handle"? + # - we were previously binding-out this `to_return` to + # the parent's lexical scope, why isn't that working + # now? + task_status.started(to_return) - # invoke underlying func now that cs is entered. - outcome = await acapture(async_fn, *args) + # invoke underlying func now that cs is entered. + outcome = await acapture(async_fn, *args) - # TODO: instead, mngr.send(outcome) so that we don't - # tie this `.start_soon()` impl to the - # `MaybeOutcome` type? Use `Outcome.send(mngr)` - # right? - maybe_outcome._set_outcome(outcome) + # execute from the 1st yield to return and expect + # generator-mngr `@task_scope_manager` thinger to + # terminate! + try: + mngr.send(outcome) - await n.start(_start_wrapped_in_scope) + # NOTE: this will instead send the underlying + # `.value`? Not sure if that's better or not? + # I would presume it's better to have a handle to + # the `Outcome` entirely? This method sends *into* + # the mngr this `Outcome.value`; seems like kinda + # weird semantics for our purposes? + # outcome.send(mngr) + + except StopIteration: + return + else: + raise RuntimeError(f"{mngr} didn't stop!") + + to_return = await n.start(_start_wrapped_in_scope) assert to_return is not None - # TODO: better way to concat the values delivered by the user - # provided `.scope_manager` and the outcome? - return tuple([maybe_outcome] + to_return) + # TODO: use the fancy type-check-time type signature stuff from + # mypy i guess..to like, relay the type of whatever the + # generator yielded through? betcha that'll be un-grokable XD + return to_return + + + +# TODO: you could wrap your output task handle in this? +# class TaskHandle(Struct): +# task: Task +# cs: CancelScope +# outcome: TaskOutcome # TODO: maybe just make this a generator with a single yield that also # delivers a value (of some std type) from the yield expression? -# @trio.cancel_scope_manager -@cm +# @trio.task_scope_manager def add_task_handle_and_crash_handling( nursery: Nursery, scope: CancelScope, - maybe_outcome: MaybeOutcome, ) -> Generator[None, list[Any]]: cs: CancelScope = CancelScope() + task_outcome = TaskOutcome() # if you need it you can ask trio for the task obj task: Task = trio.lowlevel.current_task() @@ -197,12 +227,11 @@ def add_task_handle_and_crash_handling( try: # yields back when task is terminated, cancelled, returns? with cs: - # the yielded values here are what are returned to the - # nursery's `.start_soon()` caller - # TODO: actually make this work so that `MaybeOutcome` isn't - # tied to the impl of `.start_soon()` on our custom nursery! - task_outcome: Outcome = yield [cs] + # the yielded value(s) here are what are returned to the + # nursery's `.start_soon()` caller B) + lowlevel_outcome: Outcome = yield (task_outcome, cs) + task_outcome._set_outcome(lowlevel_outcome) except Exception as err: # Adds "crash handling" from `pdbp` by entering @@ -247,7 +276,7 @@ if __name__ == '__main__': val: str = 'yoyoyo' val_outcome, cs = await sn.start_soon(sleep_then_return_val, val) - res = await val_outcome.unwrap() + res = await val_outcome.wait_for_result() assert res == val print(f'GOT EXPECTED TASK VALUE: {res}')