forked from goodboy/tractor
1
0
Fork 0

More refinements and proper typing

- drop unneeded (and commented) internal cs allocating bits.
- bypass all task manager stuff if no generator is provided by the
  caller; i.e. just call `.start_soon()` as normal.
- fix `Generator` typing.
- add some prints around task manager.
- wrap in `TaskOutcome.lowlevel_task: Task`.
oco_supervisor_prototype
Tyler Goodlet 2023-05-19 14:23:22 -04:00
parent 56882b680c
commit 940e65fccf
1 changed files with 43 additions and 38 deletions

View File

@ -24,7 +24,10 @@ from contextlib import (
contextmanager as cm, contextmanager as cm,
nullcontext, nullcontext,
) )
from typing import ContextManager from typing import (
Generator,
Any,
)
from outcome import ( from outcome import (
Outcome, Outcome,
@ -47,6 +50,7 @@ class TaskOutcome(Struct):
to the eventual boxed result/value or raised exception. to the eventual boxed result/value or raised exception.
''' '''
lowlevel_task: Task
_exited: Event = trio.Event() # as per `trio.Runner.task_exited()` _exited: Event = trio.Event() # as per `trio.Runner.task_exited()`
_outcome: Outcome | None = None # as per `outcome.Outcome` _outcome: Outcome | None = None # as per `outcome.Outcome`
_result: Any | None = None # the eventual maybe-returned-value _result: Any | None = None # the eventual maybe-returned-value
@ -105,7 +109,7 @@ class ScopePerTaskNursery(Struct):
tuple[CancelScope, Outcome] tuple[CancelScope, Outcome]
] = {} ] = {}
scope_manager: ContextManager | None = None scope_manager: Generator[Any, Outcome, None] | None = None
async def start_soon( async def start_soon(
self, self,
@ -133,16 +137,13 @@ class ScopePerTaskNursery(Struct):
if sm is None: if sm is None:
return n.start_soon(async_fn, *args, name=None) return n.start_soon(async_fn, *args, name=None)
# per_task_cs = CancelScope()
new_task: Task | None = None new_task: Task | None = None
to_return: tuple[Any] | None = None to_return: tuple[Any] | None = None
# NOTE: what do we enforce as a signature for the # NOTE: what do we enforce as a signature for the
# `@task_scope_manager` here? # `@task_scope_manager` here?
mngr = sm( mngr = sm(nursery=n)
nursery=n,
# scope=per_task_cs,
)
async def _start_wrapped_in_scope( async def _start_wrapped_in_scope(
task_status: TaskStatus[ task_status: TaskStatus[
tuple[CancelScope, Task] tuple[CancelScope, Task]
@ -153,13 +154,6 @@ class ScopePerTaskNursery(Struct):
# TODO: this was working before?! # TODO: this was working before?!
# nonlocal to_return # nonlocal to_return
task = trio.lowlevel.current_task()
# self._scopes[per_task_cs] = task
# NOTE: we actually don't need this since the user can
# just to it themselves inside mngr!
# with per_task_cs:
# execute up to the first yield # execute up to the first yield
try: try:
to_return: tuple[Any] = next(mngr) to_return: tuple[Any] = next(mngr)
@ -206,15 +200,9 @@ class ScopePerTaskNursery(Struct):
return to_return return to_return
# TODO: you could wrap your output task handle in this? # TODO: define a decorator to runtime type check that this a generator
# class TaskHandle(Struct): # with a single yield that also delivers a value (of some std type) from
# task: Task # the yield expression?
# cs: CancelScope
# outcome: TaskOutcome
# TODO: maybe just make this a generator with a single yield that also
# delivers a value (of some std type) from the yield expression?
# @trio.task_scope_manager # @trio.task_scope_manager
def add_task_handle_and_crash_handling( def add_task_handle_and_crash_handling(
nursery: Nursery, nursery: Nursery,
@ -224,20 +212,35 @@ def add_task_handle_and_crash_handling(
# they want below? # they want below?
# scope: CancelScope, # scope: CancelScope,
) -> Generator[None, list[Any]]: ) -> Generator[
Any,
Outcome,
None,
]:
'''
A customizable, user defined "task scope manager".
task_outcome = TaskOutcome() With this specially crafted single-yield generator function you can
add more granular controls around every task spawned by `trio` B)
'''
# if you need it you can ask trio for the task obj # if you need it you can ask trio for the task obj
task: Task = trio.lowlevel.current_task() task: Task = trio.lowlevel.current_task()
print(f'Spawning task: {task.name}') print(f'Spawning task: {task.name}')
# yields back when task is terminated, cancelled, returns. # User defined "task handle" for more granular supervision
# of each spawned task as needed for their particular usage.
task_outcome = TaskOutcome(task)
# NOTE: if wanted the user could wrap the output task handle however
# they want!
# class TaskHandle(Struct):
# task: Task
# cs: CancelScope
# outcome: TaskOutcome
# this yields back when the task is terminated, cancelled or returns.
try: try:
# XXX: wait, this isn't doing anything right since we'd have to
# manually activate this scope using something like:
# `task._activate_cancel_status(cs._cancel_status)` ??
# oh wait, but `.__enter__()` does all that already?
with CancelScope() as cs: with CancelScope() as cs:
# the yielded value(s) here are what are returned to the # the yielded value(s) here are what are returned to the
@ -245,12 +248,16 @@ def add_task_handle_and_crash_handling(
lowlevel_outcome: Outcome = yield (task_outcome, cs) lowlevel_outcome: Outcome = yield (task_outcome, cs)
task_outcome._set_outcome(lowlevel_outcome) task_outcome._set_outcome(lowlevel_outcome)
# Adds "crash handling" from `pdbp` by entering
# a REPL on std errors.
except Exception as err: except Exception as err:
# Adds "crash handling" from `pdbp` by entering print(f'{task.name} crashed, entering debugger!')
# a REPL on std errors.
pdbp.xpm() pdbp.xpm()
raise raise
finally:
print(f'{task.name} Exitted')
@acm @acm
async def open_nursery( async def open_nursery(
@ -264,11 +271,6 @@ async def open_nursery(
) )
async def sleep_then_err():
await trio.sleep(1)
assert 0
async def sleep_then_return_val(val: str): async def sleep_then_return_val(val: str):
await trio.sleep(0.2) await trio.sleep(0.2)
return val return val
@ -309,7 +311,10 @@ if __name__ == '__main__':
print(f'{res} -> GOT EXPECTED TASK VALUE') print(f'{res} -> GOT EXPECTED TASK VALUE')
await trio.sleep(0.6) await trio.sleep(0.6)
print('Cancelling and waiting for CRASH..') print(
'Cancelling and waiting on {err_outcome.lowlevel_task} '
'to CRASH..'
)
cs.cancel() cs.cancel()
trio.run(main) trio.run(main)