Bump "task-manager(-nursery)" naming, add logging

Namely just renaming any `trio.Nursery` instances to `tn`, the primary
`@acm`-API to `.trionics.open_taskman()` and change out all `print()`s
for logger instances with 'info' level enabled by the mod-script usage.
oco_supervisor_prototype
Tyler Goodlet 2025-05-11 20:19:10 -04:00
parent 3acf69be8b
commit 05a02d97b4
1 changed files with 42 additions and 23 deletions

View File

@ -15,13 +15,14 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Erlang-style (ish) "one-cancels-one" nursery.
Erlang-style (ish) "one-cancels-one" nursery, what we just call
a "task manager".
'''
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
contextmanager as cm,
# contextmanager as cm,
)
from functools import partial
from typing import (
@ -35,11 +36,17 @@ from outcome import (
)
from msgspec import Struct
import trio
from trio._core._run import (
Task,
from trio import (
TaskStatus,
CancelScope,
Nursery,
)
from trio.lowlevel import (
Task,
)
from tractor.log import get_logger
log = get_logger(__name__)
class TaskOutcome(Struct):
@ -101,7 +108,7 @@ class TaskOutcome(Struct):
class TaskManagerNursery(Struct):
_n: Nursery
_tn: Nursery
_scopes: dict[
Task,
tuple[CancelScope, Outcome]
@ -127,20 +134,20 @@ class TaskManagerNursery(Struct):
# assert len(new_tasks) == 1
# task = new_tasks.pop()
n: Nursery = self._n
tn: Nursery = self._tn
sm = self.task_manager
# we do default behavior of a scope-per-nursery
# if the user did not provide a task manager.
if sm is None:
return n.start_soon(async_fn, *args, name=None)
return tn.start_soon(async_fn, *args, name=None)
new_task: Task | None = None
# new_task: Task|None = None
to_return: tuple[Any] | None = None
# NOTE: what do we enforce as a signature for the
# `@task_scope_manager` here?
mngr = sm(nursery=n)
mngr = sm(nursery=tn)
async def _start_wrapped_in_scope(
task_status: TaskStatus[
@ -190,7 +197,7 @@ class TaskManagerNursery(Struct):
else:
raise RuntimeError(f"{mngr} didn't stop!")
to_return = await n.start(_start_wrapped_in_scope)
to_return = await tn.start(_start_wrapped_in_scope)
assert to_return is not None
# TODO: use the fancy type-check-time type signature stuff from
@ -222,7 +229,7 @@ def add_task_handle_and_crash_handling(
'''
# if you need it you can ask trio for the task obj
task: Task = trio.lowlevel.current_task()
print(f'Spawning task: {task.name}')
log.info(f'Spawning task: {task.name}')
# User defined "task handle" for more granular supervision
# of each spawned task as needed for their particular usage.
@ -247,18 +254,27 @@ def add_task_handle_and_crash_handling(
# Adds "crash handling" from `pdbp` by entering
# a REPL on std errors.
except Exception as err:
print(f'{task.name} crashed, entering debugger!')
if debug_mode:
log.exception(
f'{task.name} crashed, entering debugger!'
)
import pdbp
pdbp.xpm()
raise
raise err
finally:
print(f'{task.name} Exitted')
log.info(
f'Task exitted\n'
f')>\n'
f' |_{task}\n'
# ^^TODO? use sclang formatter?
# -[ ] .devx.pformat.nest_from_op()` yo!
)
@acm
async def open_nursery(
async def open_taskman(
task_manager: Generator[Any, Outcome, None] | None = None,
**lowlevel_nursery_kwargs,
@ -281,7 +297,7 @@ async def ensure_cancelled():
except trio.Cancelled:
task = trio.lowlevel.current_task()
print(f'heyyo ONLY {task.name} was cancelled as expected B)')
log.cancel(f'heyyo ONLY {task.name} was cancelled as expected B)')
assert 0
except BaseException:
@ -290,30 +306,33 @@ async def ensure_cancelled():
if __name__ == '__main__':
from tractor.log import get_console_log
get_console_log(level='info')
async def main():
async with open_nursery(
async with open_taskman(
task_manager=partial(
add_task_handle_and_crash_handling,
debug_mode=True,
),
) as sn:
) as tm:
for _ in range(3):
outcome, _ = await sn.start_soon(trio.sleep_forever)
outcome, _ = await tm.start_soon(trio.sleep_forever)
# extra task we want to engage in debugger post mortem.
err_outcome, cs = await sn.start_soon(ensure_cancelled)
err_outcome, cs = await tm.start_soon(ensure_cancelled)
val: str = 'yoyoyo'
val_outcome, _ = await sn.start_soon(
val_outcome, _ = await tm.start_soon(
sleep_then_return_val,
val,
)
res = await val_outcome.wait_for_result()
assert res == val
print(f'{res} -> GOT EXPECTED TASK VALUE')
log.info(f'{res} -> GOT EXPECTED TASK VALUE')
await trio.sleep(0.6)
print(
log.cancel(
f'Cancelling and waiting on {err_outcome.lowlevel_task} '
'to CRASH..'
)