Bump "task-manager(-nursery)" naming, add logging
Namely just renaming any `trio.Nursery` instances to `tn`, the primary `@acm`-API to `.trionics.open_taskman()` and change out all `print()`s for logger instances with 'info' level enabled by the mod-script usage.oco_supervisor_prototype
							parent
							
								
									90db6f2299
								
							
						
					
					
						commit
						653f23a04c
					
				|  | @ -15,13 +15,14 @@ | ||||||
| # along with this program.  If not, see <https://www.gnu.org/licenses/>. | # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||||
| 
 | 
 | ||||||
| ''' | ''' | ||||||
| Erlang-style (ish) "one-cancels-one" nursery. | Erlang-style (ish) "one-cancels-one" nursery, what we just call | ||||||
|  | a "task manager". | ||||||
| 
 | 
 | ||||||
| ''' | ''' | ||||||
| from __future__ import annotations | from __future__ import annotations | ||||||
| from contextlib import ( | from contextlib import ( | ||||||
|     asynccontextmanager as acm, |     asynccontextmanager as acm, | ||||||
|     contextmanager as cm, |     # contextmanager as cm, | ||||||
| ) | ) | ||||||
| from functools import partial | from functools import partial | ||||||
| from typing import ( | from typing import ( | ||||||
|  | @ -35,11 +36,17 @@ from outcome import ( | ||||||
| ) | ) | ||||||
| from msgspec import Struct | from msgspec import Struct | ||||||
| import trio | import trio | ||||||
| from trio._core._run import ( | from trio import ( | ||||||
|     Task, |     TaskStatus, | ||||||
|     CancelScope, |     CancelScope, | ||||||
|     Nursery, |     Nursery, | ||||||
| ) | ) | ||||||
|  | from trio.lowlevel import ( | ||||||
|  |     Task, | ||||||
|  | ) | ||||||
|  | from tractor.log import get_logger | ||||||
|  | 
 | ||||||
|  | log = get_logger(__name__) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class TaskOutcome(Struct): | class TaskOutcome(Struct): | ||||||
|  | @ -101,7 +108,7 @@ class TaskOutcome(Struct): | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class TaskManagerNursery(Struct): | class TaskManagerNursery(Struct): | ||||||
|     _n: Nursery |     _tn: Nursery | ||||||
|     _scopes: dict[ |     _scopes: dict[ | ||||||
|         Task, |         Task, | ||||||
|         tuple[CancelScope, Outcome] |         tuple[CancelScope, Outcome] | ||||||
|  | @ -127,20 +134,20 @@ class TaskManagerNursery(Struct): | ||||||
|         # assert len(new_tasks) == 1 |         # assert len(new_tasks) == 1 | ||||||
|         # task = new_tasks.pop() |         # task = new_tasks.pop() | ||||||
| 
 | 
 | ||||||
|         n: Nursery = self._n |         tn: Nursery = self._tn | ||||||
| 
 | 
 | ||||||
|         sm = self.task_manager |         sm = self.task_manager | ||||||
|         # we do default behavior of a scope-per-nursery |         # we do default behavior of a scope-per-nursery | ||||||
|         # if the user did not provide a task manager. |         # if the user did not provide a task manager. | ||||||
|         if sm is None: |         if sm is None: | ||||||
|             return n.start_soon(async_fn, *args, name=None) |             return tn.start_soon(async_fn, *args, name=None) | ||||||
| 
 | 
 | ||||||
|         new_task: Task | None = None |         # new_task: Task|None = None | ||||||
|         to_return: tuple[Any] | None = None |         to_return: tuple[Any] | None = None | ||||||
| 
 | 
 | ||||||
|         # NOTE: what do we enforce as a signature for the |         # NOTE: what do we enforce as a signature for the | ||||||
|         # `@task_scope_manager` here? |         # `@task_scope_manager` here? | ||||||
|         mngr = sm(nursery=n) |         mngr = sm(nursery=tn) | ||||||
| 
 | 
 | ||||||
|         async def _start_wrapped_in_scope( |         async def _start_wrapped_in_scope( | ||||||
|             task_status: TaskStatus[ |             task_status: TaskStatus[ | ||||||
|  | @ -190,7 +197,7 @@ class TaskManagerNursery(Struct): | ||||||
|             else: |             else: | ||||||
|                 raise RuntimeError(f"{mngr} didn't stop!") |                 raise RuntimeError(f"{mngr} didn't stop!") | ||||||
| 
 | 
 | ||||||
|         to_return = await n.start(_start_wrapped_in_scope) |         to_return = await tn.start(_start_wrapped_in_scope) | ||||||
|         assert to_return is not None |         assert to_return is not None | ||||||
| 
 | 
 | ||||||
|         # TODO: use the fancy type-check-time type signature stuff from |         # TODO: use the fancy type-check-time type signature stuff from | ||||||
|  | @ -222,7 +229,7 @@ def add_task_handle_and_crash_handling( | ||||||
|     ''' |     ''' | ||||||
|     # if you need it you can ask trio for the task obj |     # if you need it you can ask trio for the task obj | ||||||
|     task: Task = trio.lowlevel.current_task() |     task: Task = trio.lowlevel.current_task() | ||||||
|     print(f'Spawning task: {task.name}') |     log.info(f'Spawning task: {task.name}') | ||||||
| 
 | 
 | ||||||
|     # User defined "task handle" for more granular supervision |     # User defined "task handle" for more granular supervision | ||||||
|     # of each spawned task as needed for their particular usage. |     # of each spawned task as needed for their particular usage. | ||||||
|  | @ -247,18 +254,27 @@ def add_task_handle_and_crash_handling( | ||||||
|     # Adds "crash handling" from `pdbp` by entering |     # Adds "crash handling" from `pdbp` by entering | ||||||
|     # a REPL on std errors. |     # a REPL on std errors. | ||||||
|     except Exception as err: |     except Exception as err: | ||||||
|         print(f'{task.name} crashed, entering debugger!') |  | ||||||
|         if debug_mode: |         if debug_mode: | ||||||
|  |             log.exception( | ||||||
|  |                 f'{task.name} crashed, entering debugger!' | ||||||
|  |             ) | ||||||
|             import pdbp |             import pdbp | ||||||
|             pdbp.xpm() |             pdbp.xpm() | ||||||
|         raise | 
 | ||||||
|  |         raise err | ||||||
| 
 | 
 | ||||||
|     finally: |     finally: | ||||||
|         print(f'{task.name} Exitted') |         log.info( | ||||||
|  |             f'Task exitted\n' | ||||||
|  |             f')>\n' | ||||||
|  |             f' |_{task}\n' | ||||||
|  |             # ^^TODO? use sclang formatter? | ||||||
|  |             # -[ ] .devx.pformat.nest_from_op()` yo! | ||||||
|  |         ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @acm | @acm | ||||||
| async def open_nursery( | async def open_taskman( | ||||||
|     task_manager: Generator[Any, Outcome, None] | None = None, |     task_manager: Generator[Any, Outcome, None] | None = None, | ||||||
| 
 | 
 | ||||||
|     **lowlevel_nursery_kwargs, |     **lowlevel_nursery_kwargs, | ||||||
|  | @ -281,7 +297,7 @@ async def ensure_cancelled(): | ||||||
| 
 | 
 | ||||||
|     except trio.Cancelled: |     except trio.Cancelled: | ||||||
|         task = trio.lowlevel.current_task() |         task = trio.lowlevel.current_task() | ||||||
|         print(f'heyyo ONLY {task.name} was cancelled as expected B)') |         log.cancel(f'heyyo ONLY {task.name} was cancelled as expected B)') | ||||||
|         assert 0 |         assert 0 | ||||||
| 
 | 
 | ||||||
|     except BaseException: |     except BaseException: | ||||||
|  | @ -290,30 +306,33 @@ async def ensure_cancelled(): | ||||||
| 
 | 
 | ||||||
| if __name__ == '__main__': | if __name__ == '__main__': | ||||||
| 
 | 
 | ||||||
|  |     from tractor.log import get_console_log | ||||||
|  |     get_console_log(level='info') | ||||||
|  | 
 | ||||||
|     async def main(): |     async def main(): | ||||||
|         async with open_nursery( |         async with open_taskman( | ||||||
|             task_manager=partial( |             task_manager=partial( | ||||||
|                 add_task_handle_and_crash_handling, |                 add_task_handle_and_crash_handling, | ||||||
|                 debug_mode=True, |                 debug_mode=True, | ||||||
|             ), |             ), | ||||||
|         ) as sn: |         ) as tm: | ||||||
|             for _ in range(3): |             for _ in range(3): | ||||||
|                 outcome, _ = await sn.start_soon(trio.sleep_forever) |                 outcome, _ = await tm.start_soon(trio.sleep_forever) | ||||||
| 
 | 
 | ||||||
|             # extra task we want to engage in debugger post mortem. |             # extra task we want to engage in debugger post mortem. | ||||||
|             err_outcome, cs = await sn.start_soon(ensure_cancelled) |             err_outcome, cs = await tm.start_soon(ensure_cancelled) | ||||||
| 
 | 
 | ||||||
|             val: str = 'yoyoyo' |             val: str = 'yoyoyo' | ||||||
|             val_outcome, _ = await sn.start_soon( |             val_outcome, _ = await tm.start_soon( | ||||||
|                 sleep_then_return_val, |                 sleep_then_return_val, | ||||||
|                 val, |                 val, | ||||||
|             ) |             ) | ||||||
|             res = await val_outcome.wait_for_result() |             res = await val_outcome.wait_for_result() | ||||||
|             assert res == val |             assert res == val | ||||||
|             print(f'{res} -> GOT EXPECTED TASK VALUE') |             log.info(f'{res} -> GOT EXPECTED TASK VALUE') | ||||||
| 
 | 
 | ||||||
|             await trio.sleep(0.6) |             await trio.sleep(0.6) | ||||||
|             print( |             log.cancel( | ||||||
|                 f'Cancelling and waiting on {err_outcome.lowlevel_task} ' |                 f'Cancelling and waiting on {err_outcome.lowlevel_task} ' | ||||||
|                 'to CRASH..' |                 'to CRASH..' | ||||||
|             ) |             ) | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue