forked from goodboy/tractor
1
0
Fork 0

Go all in on "task manager" naming

oco_supervisor_prototype
Tyler Goodlet 2023-05-19 14:49:10 -04:00
parent 940e65fccf
commit e0c888fd5c
1 changed files with 14 additions and 23 deletions

View File

@ -22,7 +22,6 @@ from __future__ import annotations
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
contextmanager as cm, contextmanager as cm,
nullcontext,
) )
from typing import ( from typing import (
Generator, Generator,
@ -51,7 +50,7 @@ class TaskOutcome(Struct):
''' '''
lowlevel_task: Task lowlevel_task: Task
_exited: Event = trio.Event() # as per `trio.Runner.task_exited()` _exited = trio.Event() # as per `trio.Runner.task_exited()`
_outcome: Outcome | None = None # as per `outcome.Outcome` _outcome: Outcome | None = None # as per `outcome.Outcome`
_result: Any | None = None # the eventual maybe-returned-value _result: Any | None = None # the eventual maybe-returned-value
@ -63,9 +62,8 @@ class TaskOutcome(Struct):
''' '''
if self._outcome is None: if self._outcome is None:
raise RuntimeError( raise RuntimeError(
# f'Task {task.name} is not complete.\n' f'Task {self.lowlevel_task.name} is not complete.\n'
f'Outcome is not complete.\n' 'First wait on `await TaskOutcome.wait_for_result()`!'
'wait on `await TaskOutcome.wait_for_result()` first!'
) )
return self._result return self._result
@ -102,14 +100,14 @@ class TaskOutcome(Struct):
return self.result return self.result
class ScopePerTaskNursery(Struct): class TaskManagerNursery(Struct):
_n: Nursery _n: Nursery
_scopes: dict[ _scopes: dict[
Task, Task,
tuple[CancelScope, Outcome] tuple[CancelScope, Outcome]
] = {} ] = {}
scope_manager: Generator[Any, Outcome, None] | None = None task_manager: Generator[Any, Outcome, None] | None = None
async def start_soon( async def start_soon(
self, self,
@ -117,7 +115,7 @@ class ScopePerTaskNursery(Struct):
*args, *args,
name=None, name=None,
scope_manager: ContextManager | None = None, task_manager: Generator[Any, Outcome, None] | None = None
) -> tuple[CancelScope, Task]: ) -> tuple[CancelScope, Task]:
@ -131,7 +129,7 @@ class ScopePerTaskNursery(Struct):
n: Nursery = self._n n: Nursery = self._n
sm = self.scope_manager sm = self.task_manager
# we do default behavior of a scope-per-nursery # we do default behavior of a scope-per-nursery
# if the user did not provide a task manager. # if the user did not provide a task manager.
if sm is None: if sm is None:
@ -151,7 +149,8 @@ class ScopePerTaskNursery(Struct):
) -> None: ) -> None:
# TODO: this was working before?! # TODO: this was working before?! and, do we need something
# like it to implement `.start()`?
# nonlocal to_return # nonlocal to_return
# execute up to the first yield # execute up to the first yield
@ -203,15 +202,10 @@ class ScopePerTaskNursery(Struct):
# TODO: define a decorator to runtime type check that this a generator # TODO: define a decorator to runtime type check that this a generator
# with a single yield that also delivers a value (of some std type) from # with a single yield that also delivers a value (of some std type) from
# the yield expression? # the yield expression?
# @trio.task_scope_manager # @trio.task_manager
def add_task_handle_and_crash_handling( def add_task_handle_and_crash_handling(
nursery: Nursery, nursery: Nursery,
# TODO: is this the only way we can have a per-task scope
# allocated or can we allow the user to somehow do it if
# they want below?
# scope: CancelScope,
) -> Generator[ ) -> Generator[
Any, Any,
Outcome, Outcome,
@ -261,14 +255,11 @@ def add_task_handle_and_crash_handling(
@acm @acm
async def open_nursery( async def open_nursery(
scope_manager = None, task_manager = None,
**kwargs, **kwargs,
): ):
async with trio.open_nursery(**kwargs) as nurse: async with trio.open_nursery(**kwargs) as nurse:
yield ScopePerTaskNursery( yield TaskManagerNursery(nurse, task_manager=task_manager)
nurse,
scope_manager=scope_manager,
)
async def sleep_then_return_val(val: str): async def sleep_then_return_val(val: str):
@ -293,7 +284,7 @@ if __name__ == '__main__':
async def main(): async def main():
async with open_nursery( async with open_nursery(
scope_manager=add_task_handle_and_crash_handling, task_manager=add_task_handle_and_crash_handling,
) as sn: ) as sn:
for _ in range(3): for _ in range(3):
outcome, _ = await sn.start_soon(trio.sleep_forever) outcome, _ = await sn.start_soon(trio.sleep_forever)
@ -312,7 +303,7 @@ if __name__ == '__main__':
await trio.sleep(0.6) await trio.sleep(0.6)
print( print(
'Cancelling and waiting on {err_outcome.lowlevel_task} ' f'Cancelling and waiting on {err_outcome.lowlevel_task} '
'to CRASH..' 'to CRASH..'
) )
cs.cancel() cs.cancel()