Do renaming, implement lowlevel `Outcome` sending

As was listed in the many todos, this changes the `.start_soon()` impl
to instead (manually) `.send()` into the user defined
`@task_scope_manager` an `Outcome` from the spawned task. In this case
the task manager wraps that in a user defined (and renamed)
`TaskOutcome` and delivers that + a containing `trio.CancelScope` to the
`.start_soon()` caller. Here the user defined `TaskOutcome` defines
a `.wait_for_result()` method that can be used to await the task's exit
and handle it's underlying returned value or raised error; the
implementation could be different and subject to the user's own whims.

Note that by default, if this was added to `trio`'s core, the
`@task_scope_manager` would simply be implemented as either a `None`
yielding single-yield-generator but more likely just entirely ignored
by the runtime (as in no manual task outcome collecting, generator
calling and sending is done at all) by default if the user does not provide
the `task_scope_manager` to the nursery at open time.
oco_supervisor_prototype
Tyler Goodlet 2023-05-19 13:13:21 -04:00
parent b4858710a9
commit 65c5d7da4e
1 changed files with 87 additions and 58 deletions

View File

@ -39,11 +39,17 @@ from trio._core._run import (
Nursery, Nursery,
) )
class MaybeOutcome(Struct):
_ready: Event = trio.Event() class TaskOutcome(Struct):
_outcome: Outcome | None = None '''
_result: Any | None = None The outcome of a scheduled ``trio`` task which includes an interface
for synchronizing to the completion of the task's runtime and access
to the eventual boxed result/value or raised exception.
'''
_exited: Event = trio.Event() # as per `trio.Runner.task_exited()`
_outcome: Outcome | None = None # as per `outcome.Outcome`
_result: Any | None = None # the eventual maybe-returned-value
@property @property
def result(self) -> Any: def result(self) -> Any:
@ -55,7 +61,7 @@ class MaybeOutcome(Struct):
raise RuntimeError( raise RuntimeError(
# f'Task {task.name} is not complete.\n' # f'Task {task.name} is not complete.\n'
f'Outcome is not complete.\n' f'Outcome is not complete.\n'
'wait on `await MaybeOutcome.unwrap()` first!' 'wait on `await TaskOutcome.wait_for_result()` first!'
) )
return self._result return self._result
@ -63,19 +69,27 @@ class MaybeOutcome(Struct):
self, self,
outcome: Outcome, outcome: Outcome,
): ):
'''
Set the ``Outcome`` for this task.
This method should only ever be called by the task's supervising
nursery implemenation.
'''
self._outcome = outcome self._outcome = outcome
self._result = outcome.unwrap() self._result = outcome.unwrap()
self._ready.set() self._exited.set()
# TODO: maybe a better name like, async def wait_for_result(self) -> Any:
# - .wait_and_unwrap() '''
# - .wait_unwrap() Unwind the underlying task's ``Outcome`` by async waiting for
# - .aunwrap() ? the task to first complete and then unwrap it's result-value.
async def unwrap(self) -> Any:
if self._ready.is_set(): '''
if self._exited.is_set():
return self._result return self._result
await self._ready.wait() await self._exited.wait()
out = self._outcome out = self._outcome
if out is None: if out is None:
@ -84,13 +98,6 @@ class MaybeOutcome(Struct):
return self.result return self.result
class TaskHandle(Struct):
task: Task
cs: CancelScope
exited: Event | None = None
_outcome: Outcome | None = None
class ScopePerTaskNursery(Struct): class ScopePerTaskNursery(Struct):
_n: Nursery _n: Nursery
_scopes: dict[ _scopes: dict[
@ -122,17 +129,14 @@ class ScopePerTaskNursery(Struct):
cs = CancelScope() cs = CancelScope()
new_task: Task | None = None new_task: Task | None = None
to_return: tuple[Any] | None = None to_return: tuple[Any] | None = None
maybe_outcome = MaybeOutcome()
sm = self.scope_manager sm = self.scope_manager
if sm is None: if sm is None:
mngr = nullcontext([cs]) mngr = nullcontext([cs])
else: else:
mngr = sm( # NOTE: what do we enforce as a signature for the
nursery=n, # `@task_scope_manager` here?
scope=cs, mngr = sm(nursery=n, scope=cs)
maybe_outcome=maybe_outcome,
)
async def _start_wrapped_in_scope( async def _start_wrapped_in_scope(
task_status: TaskStatus[ task_status: TaskStatus[
@ -140,55 +144,81 @@ class ScopePerTaskNursery(Struct):
] = trio.TASK_STATUS_IGNORED, ] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
nonlocal maybe_outcome
nonlocal to_return # TODO: this was working before?!
# nonlocal to_return
with cs: with cs:
task = trio.lowlevel.current_task() task = trio.lowlevel.current_task()
self._scopes[cs] = task self._scopes[cs] = task
# TODO: instead we should probably just use # execute up to the first yield
# `Outcome.send(mngr)` here no and inside a custom try:
# decorator `@trio.cancel_scope_manager` enforce to_return: tuple[Any] = next(mngr)
# that it's a single yield generator? except StopIteration:
with mngr as to_return: raise RuntimeError("task manager didn't yield") from None
# TODO: relay through whatever the # TODO: how do we support `.start()` style?
# started task passes back via `.started()` ? # - relay through whatever the
# seems like that won't work with also returning # started task passes back via `.started()` ?
# a "task handle"? # seems like that won't work with also returning
task_status.started() # a "task handle"?
# - we were previously binding-out this `to_return` to
# the parent's lexical scope, why isn't that working
# now?
task_status.started(to_return)
# invoke underlying func now that cs is entered. # invoke underlying func now that cs is entered.
outcome = await acapture(async_fn, *args) outcome = await acapture(async_fn, *args)
# TODO: instead, mngr.send(outcome) so that we don't # execute from the 1st yield to return and expect
# tie this `.start_soon()` impl to the # generator-mngr `@task_scope_manager` thinger to
# `MaybeOutcome` type? Use `Outcome.send(mngr)` # terminate!
# right? try:
maybe_outcome._set_outcome(outcome) mngr.send(outcome)
await n.start(_start_wrapped_in_scope) # NOTE: this will instead send the underlying
# `.value`? Not sure if that's better or not?
# I would presume it's better to have a handle to
# the `Outcome` entirely? This method sends *into*
# the mngr this `Outcome.value`; seems like kinda
# weird semantics for our purposes?
# outcome.send(mngr)
except StopIteration:
return
else:
raise RuntimeError(f"{mngr} didn't stop!")
to_return = await n.start(_start_wrapped_in_scope)
assert to_return is not None assert to_return is not None
# TODO: better way to concat the values delivered by the user # TODO: use the fancy type-check-time type signature stuff from
# provided `.scope_manager` and the outcome? # mypy i guess..to like, relay the type of whatever the
return tuple([maybe_outcome] + to_return) # generator yielded through? betcha that'll be un-grokable XD
return to_return
# TODO: you could wrap your output task handle in this?
# class TaskHandle(Struct):
# task: Task
# cs: CancelScope
# outcome: TaskOutcome
# TODO: maybe just make this a generator with a single yield that also # TODO: maybe just make this a generator with a single yield that also
# delivers a value (of some std type) from the yield expression? # delivers a value (of some std type) from the yield expression?
# @trio.cancel_scope_manager # @trio.task_scope_manager
@cm
def add_task_handle_and_crash_handling( def add_task_handle_and_crash_handling(
nursery: Nursery, nursery: Nursery,
scope: CancelScope, scope: CancelScope,
maybe_outcome: MaybeOutcome,
) -> Generator[None, list[Any]]: ) -> Generator[None, list[Any]]:
cs: CancelScope = CancelScope() cs: CancelScope = CancelScope()
task_outcome = TaskOutcome()
# if you need it you can ask trio for the task obj # if you need it you can ask trio for the task obj
task: Task = trio.lowlevel.current_task() task: Task = trio.lowlevel.current_task()
@ -197,12 +227,11 @@ def add_task_handle_and_crash_handling(
try: try:
# yields back when task is terminated, cancelled, returns? # yields back when task is terminated, cancelled, returns?
with cs: with cs:
# the yielded values here are what are returned to the
# nursery's `.start_soon()` caller
# TODO: actually make this work so that `MaybeOutcome` isn't # the yielded value(s) here are what are returned to the
# tied to the impl of `.start_soon()` on our custom nursery! # nursery's `.start_soon()` caller B)
task_outcome: Outcome = yield [cs] lowlevel_outcome: Outcome = yield (task_outcome, cs)
task_outcome._set_outcome(lowlevel_outcome)
except Exception as err: except Exception as err:
# Adds "crash handling" from `pdbp` by entering # Adds "crash handling" from `pdbp` by entering
@ -247,7 +276,7 @@ if __name__ == '__main__':
val: str = 'yoyoyo' val: str = 'yoyoyo'
val_outcome, cs = await sn.start_soon(sleep_then_return_val, val) val_outcome, cs = await sn.start_soon(sleep_then_return_val, val)
res = await val_outcome.unwrap() res = await val_outcome.wait_for_result()
assert res == val assert res == val
print(f'GOT EXPECTED TASK VALUE: {res}') print(f'GOT EXPECTED TASK VALUE: {res}')