diff --git a/examples/debugging/asyncio_bp.py b/examples/debugging/asyncio_bp.py index b32ad1d8..baddfe03 100644 --- a/examples/debugging/asyncio_bp.py +++ b/examples/debugging/asyncio_bp.py @@ -77,7 +77,9 @@ async def main( ) -> None: - async with tractor.open_nursery() as n: + async with tractor.open_nursery( + # debug_mode=True, + ) as n: p = await n.start_actor( 'aio_daemon', diff --git a/examples/debugging/multi_daemon_subactors.py b/examples/debugging/multi_daemon_subactors.py index ea5fe005..80ef933c 100644 --- a/examples/debugging/multi_daemon_subactors.py +++ b/examples/debugging/multi_daemon_subactors.py @@ -4,9 +4,15 @@ import trio async def breakpoint_forever(): "Indefinitely re-enter debugger in child actor." - while True: - yield 'yo' - await tractor.breakpoint() + try: + while True: + yield 'yo' + await tractor.breakpoint() + except BaseException: + tractor.log.get_console_log().exception( + 'Cancelled while trying to enter pause point!' + ) + raise async def name_error(): @@ -19,7 +25,7 @@ async def main(): """ async with tractor.open_nursery( debug_mode=True, - loglevel='error', + loglevel='cancel', ) as n: p0 = await n.start_actor('bp_forever', enable_modules=[__name__]) diff --git a/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py b/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py index 348a5ee9..8df52e3b 100644 --- a/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py +++ b/examples/debugging/multi_nested_subactors_error_up_through_nurseries.py @@ -45,6 +45,7 @@ async def spawn_until(depth=0): ) +# TODO: notes on the new boxed-relayed errors through proxy actors async def main(): """The main ``tractor`` routine. diff --git a/examples/debugging/multi_subactors.py b/examples/debugging/multi_subactors.py index 259d5268..22b13ac8 100644 --- a/examples/debugging/multi_subactors.py +++ b/examples/debugging/multi_subactors.py @@ -38,6 +38,7 @@ async def main(): """ async with tractor.open_nursery( debug_mode=True, + # loglevel='runtime', ) as n: # Spawn both actors, don't bother with collecting results diff --git a/examples/debugging/per_actor_debug.py b/examples/debugging/per_actor_debug.py index 1db56981..c1bf5cab 100644 --- a/examples/debugging/per_actor_debug.py +++ b/examples/debugging/per_actor_debug.py @@ -23,5 +23,6 @@ async def main(): n.start_soon(debug_actor.run, die) n.start_soon(crash_boi.run, die) + if __name__ == '__main__': trio.run(main) diff --git a/examples/debugging/root_actor_breakpoint_forever.py b/examples/debugging/root_actor_breakpoint_forever.py index 3536a751..88a6e0e9 100644 --- a/examples/debugging/root_actor_breakpoint_forever.py +++ b/examples/debugging/root_actor_breakpoint_forever.py @@ -2,10 +2,13 @@ import trio import tractor -async def main(): +async def main( + registry_addrs: tuple[str, int]|None = None +): async with tractor.open_root_actor( debug_mode=True, + # loglevel='runtime', ): while True: await tractor.breakpoint() diff --git a/examples/debugging/subactor_breakpoint.py b/examples/debugging/subactor_breakpoint.py index bcc304d1..4fdff484 100644 --- a/examples/debugging/subactor_breakpoint.py +++ b/examples/debugging/subactor_breakpoint.py @@ -3,17 +3,20 @@ import tractor async def breakpoint_forever(): - """Indefinitely re-enter debugger in child actor. - """ + ''' + Indefinitely re-enter debugger in child actor. + + ''' while True: await trio.sleep(0.1) - await tractor.breakpoint() + await tractor.pause() async def main(): async with tractor.open_nursery( debug_mode=True, + loglevel='cancel', ) as n: portal = await n.run_in_actor( diff --git a/examples/debugging/subactor_error.py b/examples/debugging/subactor_error.py index e38c1614..d7aee447 100644 --- a/examples/debugging/subactor_error.py +++ b/examples/debugging/subactor_error.py @@ -3,16 +3,26 @@ import tractor async def name_error(): - getattr(doggypants) + getattr(doggypants) # noqa (on purpose) async def main(): async with tractor.open_nursery( debug_mode=True, - ) as n: + # loglevel='transport', + ) as an: - portal = await n.run_in_actor(name_error) - await portal.result() + # TODO: ideally the REPL arrives at this frame in the parent, + # ABOVE the @api_frame of `Portal.run_in_actor()` (which + # should eventually not even be a portal method ... XD) + # await tractor.pause() + p: tractor.Portal = await an.run_in_actor(name_error) + + # with this style, should raise on this line + await p.result() + + # with this alt style should raise at `open_nusery()` + # return await p.result() if __name__ == '__main__': diff --git a/examples/debugging/sync_bp.py b/examples/debugging/sync_bp.py new file mode 100644 index 00000000..efa4e405 --- /dev/null +++ b/examples/debugging/sync_bp.py @@ -0,0 +1,75 @@ +import trio +import tractor + + +def sync_pause( + use_builtin: bool = True, + error: bool = False, +): + if use_builtin: + breakpoint(hide_tb=False) + + else: + tractor.pause_from_sync() + + if error: + raise RuntimeError('yoyo sync code error') + + +@tractor.context +async def start_n_sync_pause( + ctx: tractor.Context, +): + actor: tractor.Actor = tractor.current_actor() + + # sync to parent-side task + await ctx.started() + + print(f'entering SYNC PAUSE in {actor.uid}') + sync_pause() + print(f'back from SYNC PAUSE in {actor.uid}') + + +async def main() -> None: + async with tractor.open_nursery( + # NOTE: required for pausing from sync funcs + maybe_enable_greenback=True, + debug_mode=True, + ) as an: + + p: tractor.Portal = await an.start_actor( + 'subactor', + enable_modules=[__name__], + # infect_asyncio=True, + debug_mode=True, + loglevel='cancel', + ) + + # TODO: 3 sub-actor usage cases: + # -[ ] via a `.run_in_actor()` call + # -[ ] via a `.run()` + # -[ ] via a `.open_context()` + # + async with p.open_context( + start_n_sync_pause, + ) as (ctx, first): + assert first is None + + await tractor.pause() + sync_pause() + + # TODO: make this work!! + await trio.to_thread.run_sync( + sync_pause, + abandon_on_cancel=False, + ) + + await ctx.cancel() + + # TODO: case where we cancel from trio-side while asyncio task + # has debugger lock? + await p.cancel_actor() + + +if __name__ == '__main__': + trio.run(main) diff --git a/tests/test_debugger.py b/tests/test_debugger.py index 3fcf71f9..0de2020d 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -1025,3 +1025,67 @@ def test_different_debug_mode_per_actor( # instead crashed completely assert "tractor._exceptions.RemoteActorError: ('crash_boi'" in before assert "RuntimeError" in before + + + +def test_pause_from_sync( + spawn, + ctlc: bool +): + ''' + Verify we can use the `pdbp` REPL from sync functions AND from + any thread spawned with `trio.to_thread.run_sync()`. + + `examples/debugging/sync_bp.py` + + ''' + child = spawn('sync_bp') + child.expect(PROMPT) + assert_before( + child, + [ + '`greenback` portal opened!', + # pre-prompt line + _pause_msg, "('root'", + ] + ) + if ctlc: + do_ctlc(child) + child.sendline('c') + child.expect(PROMPT) + + # XXX shouldn't see gb loaded again + before = str(child.before.decode()) + assert not in_prompt_msg( + before, + ['`greenback` portal opened!'], + ) + assert_before( + child, + [_pause_msg, "('root'",], + ) + + if ctlc: + do_ctlc(child) + child.sendline('c') + child.expect(PROMPT) + assert_before( + child, + [_pause_msg, "('subactor'",], + ) + + if ctlc: + do_ctlc(child) + child.sendline('c') + child.expect(PROMPT) + # non-main thread case + # TODO: should we agument the pre-prompt msg in this case? + assert_before( + child, + [_pause_msg, "('root'",], + ) + + if ctlc: + do_ctlc(child) + child.sendline('c') + child.expect(pexpect.EOF) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 5ac463ea..8d34bef4 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -601,7 +601,8 @@ def test_echoserver_detailed_mechanics( pass else: pytest.fail( - "stream wasn't stopped after sentinel?!") + 'stream not stopped after sentinel ?!' + ) # TODO: the case where this blocks and # is cancelled by kbi or out of task cancellation @@ -613,3 +614,37 @@ def test_echoserver_detailed_mechanics( else: trio.run(main) + + +# TODO: debug_mode tests once we get support for `asyncio`! +# +# -[ ] need tests to wrap both scripts: +# - [ ] infected_asyncio_echo_server.py +# - [ ] debugging/asyncio_bp.py +# -[ ] consider moving ^ (some of) these ^ to `test_debugger`? +# +# -[ ] missing impl outstanding includes: +# - [x] for sync pauses we need to ensure we open yet another +# `greenback` portal in the asyncio task +# => completed using `.bestow_portal(task)` inside +# `.to_asyncio._run_asyncio_task()` right? +# -[ ] translation func to get from `asyncio` task calling to +# `._debug.wait_for_parent_stdin_hijack()` which does root +# call to do TTY locking. +# +def test_sync_breakpoint(): + ''' + Verify we can do sync-func/code breakpointing using the + `breakpoint()` builtin inside infected mode actors. + + ''' + pytest.xfail('This support is not implemented yet!') + + +def test_debug_mode_crash_handling(): + ''' + Verify mult-actor crash handling works with a combo of infected-`asyncio`-mode + and normal `trio` actors despite nested process trees. + + ''' + pytest.xfail('This support is not implemented yet!') diff --git a/tractor/_child.py b/tractor/_child.py index bd1e830e..4226ae90 100644 --- a/tractor/_child.py +++ b/tractor/_child.py @@ -36,6 +36,7 @@ def parse_ipaddr(arg): if __name__ == "__main__": + __tracebackhide__: bool = True parser = argparse.ArgumentParser() parser.add_argument("--uid", type=parse_uid) diff --git a/tractor/_context.py b/tractor/_context.py index a2860f3d..51b23302 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -351,7 +351,7 @@ class Context: by the runtime in 2 ways: - by entering ``Portal.open_context()`` which is the primary public API for any "caller" task or, - - by the RPC machinery's `._runtime._invoke()` as a `ctx` arg + - by the RPC machinery's `._rpc._invoke()` as a `ctx` arg to a remotely scheduled "callee" function. AND is always constructed using the below ``mk_context()``. @@ -361,10 +361,10 @@ class Context: `trio.Task`s. Contexts are allocated on each side of any task RPC-linked msg dialog, i.e. for every request to a remote actor from a `Portal`. On the "callee" side a context is - always allocated inside ``._runtime._invoke()``. + always allocated inside ``._rpc._invoke()``. - # TODO: more detailed writeup on cancellation, error and - # streaming semantics.. + TODO: more detailed writeup on cancellation, error and + streaming semantics.. A context can be cancelled and (possibly eventually restarted) from either side of the underlying IPC channel, it can also open task @@ -1209,7 +1209,9 @@ class Context: # await pause() log.warning( 'Stream was terminated by EoC\n\n' - f'{repr(eoc)}\n' + # NOTE: won't show the error but + # does show txt followed by IPC msg. + f'{str(eoc)}\n' ) finally: @@ -1306,7 +1308,7 @@ class Context: # `._cancel_called == True`. not raise_overrun_from_self and isinstance(remote_error, RemoteActorError) - and remote_error.msgdata['type_str'] == 'StreamOverrun' + and remote_error.msgdata['boxed_type_str'] == 'StreamOverrun' and tuple(remote_error.msgdata['sender']) == our_uid ): # NOTE: we set the local scope error to any "self @@ -1883,6 +1885,19 @@ class Context: return False +# TODO: exception tb masking by using a manual +# `.__aexit__()`/.__aenter__()` pair on a type? +# => currently this is one of the few places we can't easily +# mask errors - on the exit side of a `Portal.open_context()`.. +# there's # => currently this is one of the few places we can't +# there's 2 ways to approach it: +# - manually write an @acm type as per above +# - use `contextlib.AsyncContextDecorator` to override the default +# impl to suppress traceback frames: +# * https://docs.python.org/3/library/contextlib.html#contextlib.AsyncContextDecorator +# * https://docs.python.org/3/library/contextlib.html#contextlib.ContextDecorator +# - also we could just override directly the underlying +# `contextlib._AsyncGeneratorContextManager`? @acm async def open_context_from_portal( portal: Portal, diff --git a/tractor/_entry.py b/tractor/_entry.py index 0ac0dc47..21c9ae48 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -106,6 +106,7 @@ def _trio_main( Entry point for a `trio_run_in_process` subactor. ''' + __tracebackhide__: bool = True _state._current_actor = actor trio_main = partial( async_main, diff --git a/tractor/_root.py b/tractor/_root.py index e1a7fb6c..4469f3ed 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -22,9 +22,10 @@ from contextlib import asynccontextmanager from functools import partial import importlib import logging +import os import signal import sys -import os +from typing import Callable import warnings @@ -78,6 +79,8 @@ async def open_root_actor( # enables the multi-process debugger support debug_mode: bool = False, + maybe_enable_greenback: bool = False, # `.pause_from_sync()/breakpoint()` support + enable_stack_on_sig: bool = False, # internal logging loglevel: str|None = None, @@ -94,12 +97,41 @@ async def open_root_actor( Runtime init entry point for ``tractor``. ''' + # TODO: stick this in a `@cm` defined in `devx._debug`? + # # Override the global debugger hook to make it play nice with # ``trio``, see much discussion in: # https://github.com/python-trio/trio/issues/1155#issuecomment-742964018 - builtin_bp_handler = sys.breakpointhook - orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None) - os.environ['PYTHONBREAKPOINT'] = 'tractor.devx._debug.pause_from_sync' + builtin_bp_handler: Callable = sys.breakpointhook + orig_bp_path: str|None = os.environ.get( + 'PYTHONBREAKPOINT', + None, + ) + if ( + debug_mode + and maybe_enable_greenback + and await _debug.maybe_init_greenback( + raise_not_found=False, + ) + ): + os.environ['PYTHONBREAKPOINT'] = ( + 'tractor.devx._debug.pause_from_sync' + ) + else: + # TODO: disable `breakpoint()` by default (without + # `greenback`) since it will break any multi-actor + # usage by a clobbered TTY's stdstreams! + def block_bps(*args, **kwargs): + raise RuntimeError( + 'Trying to use `breakpoint()` eh?\n' + 'Welp, `tractor` blocks `breakpoint()` built-in calls by default!\n' + 'If you need to use it please install `greenback` and set ' + '`debug_mode=True` when opening the runtime ' + '(either via `.open_nursery()` or `open_root_actor()`)\n' + ) + + sys.breakpointhook = block_bps + # os.environ['PYTHONBREAKPOINT'] = None # attempt to retreive ``trio``'s sigint handler and stash it # on our debugger lock state. @@ -179,7 +211,11 @@ async def open_root_actor( assert _log # TODO: factor this into `.devx._stackscope`!! - if debug_mode: + if ( + debug_mode + and + enable_stack_on_sig + ): try: logger.info('Enabling `stackscope` traces on SIGUSR1') from .devx import enable_stack_on_sig @@ -356,12 +392,14 @@ async def open_root_actor( _state._last_actor_terminated = actor # restore built-in `breakpoint()` hook state - sys.breakpointhook = builtin_bp_handler - if orig_bp_path is not None: - os.environ['PYTHONBREAKPOINT'] = orig_bp_path - else: - # clear env back to having no entry - os.environ.pop('PYTHONBREAKPOINT') + if debug_mode: + if builtin_bp_handler is not None: + sys.breakpointhook = builtin_bp_handler + if orig_bp_path is not None: + os.environ['PYTHONBREAKPOINT'] = orig_bp_path + else: + # clear env back to having no entry + os.environ.pop('PYTHONBREAKPOINT') logger.runtime("Root actor terminated") diff --git a/tractor/_rpc.py b/tractor/_rpc.py index e50c80dd..b108fdda 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -26,7 +26,6 @@ from contextlib import ( from functools import partial import inspect from pprint import pformat -from types import ModuleType from typing import ( Any, Callable, @@ -332,27 +331,6 @@ async def _errors_relayed_via_ipc( actor._ongoing_rpc_tasks.set() -_gb_mod: ModuleType|None|False = None - - -async def maybe_import_gb(): - global _gb_mod - if _gb_mod is False: - return - - try: - import greenback - _gb_mod = greenback - await greenback.ensure_portal() - - except ModuleNotFoundError: - log.debug( - '`greenback` is not installed.\n' - 'No sync debug support!\n' - ) - _gb_mod = False - - async def _invoke( actor: Actor, @@ -380,7 +358,9 @@ async def _invoke( treat_as_gen: bool = False if _state.debug_mode(): - await maybe_import_gb() + # XXX for .pause_from_sync()` usage we need to make sure + # `greenback` is boostrapped in the subactor! + await _debug.maybe_init_greenback() # TODO: possibly a specially formatted traceback # (not sure what typing is for this..)? diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 66a5381c..ed7b4503 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -136,16 +136,16 @@ class Actor: msg_buffer_size: int = 2**6 # nursery placeholders filled in by `async_main()` after fork - _root_n: Nursery | None = None - _service_n: Nursery | None = None - _server_n: Nursery | None = None + _root_n: Nursery|None = None + _service_n: Nursery|None = None + _server_n: Nursery|None = None # Information about `__main__` from parent _parent_main_data: dict[str, str] - _parent_chan_cs: CancelScope | None = None + _parent_chan_cs: CancelScope|None = None # syncs for setup/teardown sequences - _server_down: trio.Event | None = None + _server_down: trio.Event|None = None # user toggled crash handling (including monkey-patched in # `trio.open_nursery()` via `.trionics._supervisor` B) @@ -174,7 +174,7 @@ class Actor: spawn_method: str|None = None, # TODO: remove! - arbiter_addr: tuple[str, int] | None = None, + arbiter_addr: tuple[str, int]|None = None, ) -> None: ''' @@ -189,7 +189,7 @@ class Actor: ) self._cancel_complete = trio.Event() - self._cancel_called_by_remote: tuple[str, tuple] | None = None + self._cancel_called_by_remote: tuple[str, tuple]|None = None self._cancel_called: bool = False # retreive and store parent `__main__` data which @@ -245,11 +245,11 @@ class Actor: ] = {} self._listeners: list[trio.abc.Listener] = [] - self._parent_chan: Channel | None = None - self._forkserver_info: tuple | None = None + self._parent_chan: Channel|None = None + self._forkserver_info: tuple|None = None self._actoruid2nursery: dict[ tuple[str, str], - ActorNursery | None, + ActorNursery|None, ] = {} # type: ignore # noqa # when provided, init the registry addresses property from @@ -781,7 +781,7 @@ class Actor: # # side: str|None = None, - msg_buffer_size: int | None = None, + msg_buffer_size: int|None = None, allow_overruns: bool = False, ) -> Context: @@ -846,7 +846,7 @@ class Actor: kwargs: dict, # IPC channel config - msg_buffer_size: int | None = None, + msg_buffer_size: int|None = None, allow_overruns: bool = False, load_nsf: bool = False, @@ -920,11 +920,11 @@ class Actor: async def _from_parent( self, - parent_addr: tuple[str, int] | None, + parent_addr: tuple[str, int]|None, ) -> tuple[ Channel, - list[tuple[str, int]] | None, + list[tuple[str, int]]|None, ]: ''' Bootstrap this local actor's runtime config from its parent by @@ -945,7 +945,7 @@ class Actor: # Initial handshake: swap names. await self._do_handshake(chan) - accept_addrs: list[tuple[str, int]] | None = None + accept_addrs: list[tuple[str, int]]|None = None if self._spawn_method == "trio": # Receive runtime state from our parent parent_data: dict[str, Any] @@ -1009,7 +1009,7 @@ class Actor: handler_nursery: Nursery, *, # (host, port) to bind for channel server - listen_sockaddrs: list[tuple[str, int]] | None = None, + listen_sockaddrs: list[tuple[str, int]]|None = None, task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED, ) -> None: @@ -1466,7 +1466,7 @@ class Actor: async def async_main( actor: Actor, - accept_addrs: tuple[str, int] | None = None, + accept_addrs: tuple[str, int]|None = None, # XXX: currently ``parent_addr`` is only needed for the # ``multiprocessing`` backend (which pickles state sent to @@ -1475,7 +1475,7 @@ async def async_main( # change this to a simple ``is_subactor: bool`` which will # be False when running as root actor and True when as # a subactor. - parent_addr: tuple[str, int] | None = None, + parent_addr: tuple[str, int]|None = None, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: @@ -1498,7 +1498,7 @@ async def async_main( try: # establish primary connection with immediate parent - actor._parent_chan: Channel | None = None + actor._parent_chan: Channel|None = None if parent_addr is not None: ( @@ -1797,7 +1797,7 @@ class Arbiter(Actor): self, name: str, - ) -> tuple[str, int] | None: + ) -> tuple[str, int]|None: for uid, sockaddr in self._registry.items(): if name in uid: diff --git a/tractor/_spawn.py b/tractor/_spawn.py index aa0e9bf1..48135cc9 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -503,7 +503,7 @@ async def trio_proc( }) # track subactor in current nursery - curr_actor = current_actor() + curr_actor: Actor = current_actor() curr_actor._actoruid2nursery[subactor.uid] = actor_nursery # resume caller at next checkpoint now that child is up diff --git a/tractor/_state.py b/tractor/_state.py index 9e4e9473..b76e8ac9 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -30,11 +30,16 @@ if TYPE_CHECKING: _current_actor: Actor|None = None # type: ignore # noqa _last_actor_terminated: Actor|None = None + +# TODO: mk this a `msgspec.Struct`! _runtime_vars: dict[str, Any] = { '_debug_mode': False, '_is_root': False, '_root_mailbox': (None, None), '_registry_addrs': [], + + # for `breakpoint()` support + 'use_greenback': False, } diff --git a/tractor/_supervise.py b/tractor/_supervise.py index 733dd53c..be81e4e6 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -119,11 +119,11 @@ class ActorNursery: name: str, *, bind_addrs: list[tuple[str, int]] = [_default_bind_addr], - rpc_module_paths: list[str] | None = None, - enable_modules: list[str] | None = None, - loglevel: str | None = None, # set log level per subactor - nursery: trio.Nursery | None = None, - debug_mode: bool | None = None, + rpc_module_paths: list[str]|None = None, + enable_modules: list[str]|None = None, + loglevel: str|None = None, # set log level per subactor + nursery: trio.Nursery|None = None, + debug_mode: bool|None = None, infect_asyncio: bool = False, ) -> Portal: ''' @@ -583,7 +583,7 @@ async def open_nursery( finally: msg: str = ( 'Actor-nursery exited\n' - f'|_{an}\n\n' + f'|_{an}\n' ) # shutdown runtime if it was started diff --git a/tractor/devx/_code.py b/tractor/devx/_code.py new file mode 100644 index 00000000..01d64cd1 --- /dev/null +++ b/tractor/devx/_code.py @@ -0,0 +1,177 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Tools for code-object annotation, introspection and mutation +as it pertains to improving the grok-ability of our runtime! + +''' +from __future__ import annotations +import inspect +# import msgspec +# from pprint import pformat +from types import ( + FrameType, + FunctionType, + MethodType, + # CodeType, +) +from typing import ( + # Any, + Callable, + # TYPE_CHECKING, + Type, +) + +from tractor.msg import ( + pretty_struct, + NamespacePath, +) + + +# TODO: yeah, i don't love this and we should prolly just +# write a decorator that actually keeps a stupid ref to the func +# obj.. +def get_class_from_frame(fr: FrameType) -> ( + FunctionType + |MethodType +): + ''' + Attempt to get the function (or method) reference + from a given `FrameType`. + + Verbatim from an SO: + https://stackoverflow.com/a/2220759 + + ''' + args, _, _, value_dict = inspect.getargvalues(fr) + + # we check the first parameter for the frame function is + # named 'self' + if ( + len(args) + and + # TODO: other cases for `@classmethod` etc..?) + args[0] == 'self' + ): + # in that case, 'self' will be referenced in value_dict + instance: object = value_dict.get('self') + if instance: + # return its class + return getattr( + instance, + '__class__', + None, + ) + + # return None otherwise + return None + + +def func_ref_from_frame( + frame: FrameType, +) -> Callable: + func_name: str = frame.f_code.co_name + try: + return frame.f_globals[func_name] + except KeyError: + cls: Type|None = get_class_from_frame(frame) + if cls: + return getattr( + cls, + func_name, + ) + + +# TODO: move all this into new `.devx._code`! +# -[ ] prolly create a `@runtime_api` dec? +# -[ ] ^- make it capture and/or accept buncha optional +# meta-data like a fancier version of `@pdbp.hideframe`. +# +class CallerInfo(pretty_struct.Struct): + rt_fi: inspect.FrameInfo + call_frame: FrameType + + @property + def api_func_ref(self) -> Callable|None: + return func_ref_from_frame(self.rt_fi.frame) + + @property + def api_nsp(self) -> NamespacePath|None: + func: FunctionType = self.api_func_ref + if func: + return NamespacePath.from_ref(func) + + return '' + + @property + def caller_func_ref(self) -> Callable|None: + return func_ref_from_frame(self.call_frame) + + @property + def caller_nsp(self) -> NamespacePath|None: + func: FunctionType = self.caller_func_ref + if func: + return NamespacePath.from_ref(func) + + return '' + + +def find_caller_info( + dunder_var: str = '__runtimeframe__', + iframes:int = 1, + check_frame_depth: bool = True, + +) -> CallerInfo|None: + ''' + Scan up the callstack for a frame with a `dunder_var: str` variable + and return the `iframes` frames above it. + + By default we scan for a `__runtimeframe__` scope var which + denotes a `tractor` API above which (one frame up) is "user + app code" which "called into" the `tractor` method or func. + + TODO: ex with `Portal.open_context()` + + ''' + # TODO: use this instead? + # https://docs.python.org/3/library/inspect.html#inspect.getouterframes + frames: list[inspect.FrameInfo] = inspect.stack() + for fi in frames: + assert ( + fi.function + == + fi.frame.f_code.co_name + ) + this_frame: FrameType = fi.frame + dunder_val: int|None = this_frame.f_locals.get(dunder_var) + if dunder_val: + go_up_iframes: int = ( + dunder_val # could be 0 or `True` i guess? + or + iframes + ) + rt_frame: FrameType = fi.frame + call_frame = rt_frame + for i in range(go_up_iframes): + call_frame = call_frame.f_back + + return CallerInfo( + rt_fi=fi, + call_frame=call_frame, + ) + + return None diff --git a/tractor/devx/_debug.py b/tractor/devx/_debug.py index 3203af1b..255b1dbd 100644 --- a/tractor/devx/_debug.py +++ b/tractor/devx/_debug.py @@ -33,35 +33,46 @@ from functools import ( import os import signal import sys +import threading import traceback from typing import ( Any, Callable, AsyncIterator, AsyncGenerator, + TYPE_CHECKING, +) +from types import ( + FrameType, + ModuleType, ) -from types import FrameType import pdbp +import sniffio import tractor import trio from trio.lowlevel import current_task -from trio_typing import ( +from trio import ( TaskStatus, # Task, ) -from ..log import get_logger -from .._state import ( +from tractor.log import get_logger +from tractor._state import ( current_actor, is_root_process, debug_mode, ) -from .._exceptions import ( +from tractor._exceptions import ( is_multi_cancelled, ContextCancelled, ) -from .._ipc import Channel +from tractor._ipc import Channel + +if TYPE_CHECKING: + from tractor._runtime import ( + Actor, + ) log = get_logger(__name__) @@ -116,10 +127,36 @@ class Lock: @classmethod def shield_sigint(cls): - cls._orig_sigint_handler = signal.signal( - signal.SIGINT, - shield_sigint_handler, - ) + ''' + Shield out SIGINT handling (which by default triggers + `trio.Task` cancellation) in subactors when the `pdb` REPL + is active. + + Avoids cancellation of the current actor (task) when the + user mistakenly sends ctl-c or a signal is received from + an external request; explicit runtime cancel requests are + allowed until the use exits the REPL session using + 'continue' or 'quit', at which point the orig SIGINT + handler is restored. + + ''' + # + # XXX detect whether we're running from a non-main thread + # in which case schedule the SIGINT shielding override + # to in the main thread. + # https://docs.python.org/3/library/signal.html#signals-and-threads + if not cls.is_main_trio_thread(): + cls._orig_sigint_handler: Callable = trio.from_thread.run_sync( + signal.signal, + signal.SIGINT, + shield_sigint_handler, + ) + + else: + cls._orig_sigint_handler = signal.signal( + signal.SIGINT, + shield_sigint_handler, + ) @classmethod @pdbp.hideframe # XXX NOTE XXX see below in `.pause_from_sync()` @@ -127,13 +164,60 @@ class Lock: # always restore ``trio``'s sigint handler. see notes below in # the pdb factory about the nightmare that is that code swapping # out the handler when the repl activates... - signal.signal(signal.SIGINT, cls._trio_handler) + if not cls.is_main_trio_thread(): + trio.from_thread.run_sync( + signal.signal, + signal.SIGINT, + cls._trio_handler, + ) + else: + signal.signal( + signal.SIGINT, + cls._trio_handler, + ) + cls._orig_sigint_handler = None + @classmethod + def is_main_trio_thread(cls) -> bool: + ''' + Check if we're the "main" thread (as in the first one + started by cpython) AND that it is ALSO the thread that + called `trio.run()` and not some thread spawned with + `trio.to_thread.run_sync()`. + + ''' + is_trio_main = ( + # TODO: since this is private, @oremanj says + # we should just copy the impl for now.. + (is_main_thread := trio._util.is_main_thread()) + and + (async_lib := sniffio.current_async_library()) == 'trio' + ) + if ( + not is_trio_main + and is_main_thread + ): + log.warning( + f'Current async-lib detected by `sniffio`: {async_lib}\n' + ) + return is_trio_main + # XXX apparently unreliable..see ^ + # ( + # threading.current_thread() + # is not threading.main_thread() + # ) + @classmethod def release(cls): try: - cls._debug_lock.release() + if not cls.is_main_trio_thread(): + trio.from_thread.run_sync( + cls._debug_lock.release + ) + else: + cls._debug_lock.release() + except RuntimeError: # uhhh makes no sense but been seeing the non-owner # release error even though this is definitely the task @@ -400,7 +484,6 @@ async def wait_for_parent_stdin_hijack( # this syncs to child's ``Context.started()`` call. async with portal.open_context( - lock_tty_for_child, subactor_uid=actor_uid, @@ -438,11 +521,31 @@ async def wait_for_parent_stdin_hijack( log.debug('Exiting debugger from child') -def mk_mpdb() -> tuple[MultiActorPdb, Callable]: +def mk_mpdb() -> MultiActorPdb: + ''' + Deliver a new `MultiActorPdb`: a multi-process safe `pdbp` + REPL using the magic of SC! + Our `pdb.Pdb` subtype accomplishes multi-process safe debugging + by: + + - mutexing access to the root process' TTY & stdstreams + via an IPC managed `Lock` singleton per process tree. + + - temporarily overriding any subactor's SIGINT handler to shield during + live REPL sessions in sub-actors such that cancellation is + never (mistakenly) triggered by a ctrl-c and instead only + by either explicit requests in the runtime or + + ''' pdb = MultiActorPdb() - # signal.signal = pdbp.hideframe(signal.signal) + # Always shield out SIGINTs for subactors when REPL is active. + # + # XXX detect whether we're running from a non-main thread + # in which case schedule the SIGINT shielding override + # to in the main thread. + # https://docs.python.org/3/library/signal.html#signals-and-threads Lock.shield_sigint() # XXX: These are the important flags mentioned in @@ -451,7 +554,7 @@ def mk_mpdb() -> tuple[MultiActorPdb, Callable]: pdb.allow_kbdint = True pdb.nosigint = True - return pdb, Lock.unshield_sigint + return pdb def shield_sigint_handler( @@ -464,17 +567,16 @@ def shield_sigint_handler( ''' Specialized, debugger-aware SIGINT handler. - In childred we always ignore to avoid deadlocks since cancellation - should always be managed by the parent supervising actor. The root - is always cancelled on ctrl-c. + In childred we always ignore/shield for SIGINT to avoid + deadlocks since cancellation should always be managed by the + supervising parent actor. The root actor-proces is always + cancelled on ctrl-c. ''' - __tracebackhide__ = True + __tracebackhide__: bool = True + uid_in_debug: tuple[str, str]|None = Lock.global_actor_in_debug - uid_in_debug: tuple[str, str] | None = Lock.global_actor_in_debug - - actor = current_actor() - # print(f'{actor.uid} in HANDLER with ') + actor: Actor = current_actor() def do_cancel(): # If we haven't tried to cancel the runtime then do that instead @@ -509,7 +611,7 @@ def shield_sigint_handler( return do_cancel() # only set in the actor actually running the REPL - pdb_obj: MultiActorPdb | None = Lock.repl + pdb_obj: MultiActorPdb|None = Lock.repl # root actor branch that reports whether or not a child # has locked debugger. @@ -616,14 +718,20 @@ _pause_msg: str = 'Attaching to pdb REPL in actor' def _set_trace( - actor: tractor.Actor | None = None, - pdb: MultiActorPdb | None = None, + actor: tractor.Actor|None = None, + pdb: MultiActorPdb|None = None, shield: bool = False, extra_frames_up_when_async: int = 1, + hide_tb: bool = True, ): - __tracebackhide__: bool = True - actor: tractor.Actor = actor or current_actor() + __tracebackhide__: bool = hide_tb + + actor: tractor.Actor = ( + actor + or + current_actor() + ) # always start 1 level up from THIS in user code. frame: FrameType|None @@ -669,20 +777,17 @@ def _set_trace( f'Going up frame {i} -> {frame}\n' ) - else: - pdb, undo_sigint = mk_mpdb() - - # we entered the global ``breakpoint()`` built-in from sync - # code? - Lock.local_task_in_debug = 'sync' - + # engage ze REPL + # B~() pdb.set_trace(frame=frame) async def _pause( debug_func: Callable = _set_trace, - release_lock_signal: trio.Event | None = None, + + # NOTE: must be passed in the `.pause_from_sync()` case! + pdb: MultiActorPdb|None = None, # TODO: allow caller to pause despite task cancellation, # exactly the same as wrapping with: @@ -691,9 +796,9 @@ async def _pause( # => the REMAINING ISSUE is that the scope's .__exit__() frame # is always show in the debugger on entry.. and there seems to # be no way to override it?.. - # shield: bool = False, - + # shield: bool = False, + hide_tb: bool = True, task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED ) -> None: @@ -705,10 +810,16 @@ async def _pause( Hopefully we won't need this in the long run. ''' - __tracebackhide__: bool = True - actor = current_actor() - pdb, undo_sigint = mk_mpdb() - task_name: str = trio.lowlevel.current_task().name + __tracebackhide__: bool = hide_tb + actor: Actor = current_actor() + try: + task_name: str = trio.lowlevel.current_task().name + except RuntimeError as rte: + if actor.is_infected_aio(): + raise RuntimeError( + '`tractor.pause[_from_sync]()` not yet supported ' + 'for infected `asyncio` mode!' + ) from rte if ( not Lock.local_pdb_complete @@ -716,9 +827,13 @@ async def _pause( ): Lock.local_pdb_complete = trio.Event() - debug_func = partial( - debug_func, - ) + if debug_func is not None: + debug_func = partial( + debug_func, + ) + + if pdb is None: + pdb: MultiActorPdb = mk_mpdb() # TODO: need a more robust check for the "root" actor if ( @@ -767,6 +882,7 @@ async def _pause( actor.uid, ) Lock.repl = pdb + except RuntimeError: Lock.release() @@ -811,32 +927,26 @@ async def _pause( # TODO: do we want to support using this **just** for the # locking / common code (prolly to help address #320)? # - # if debug_func is None: - # assert release_lock_signal, ( - # 'Must pass `release_lock_signal: trio.Event` if no ' - # 'trace func provided!' - # ) - # print(f"{actor.uid} ENTERING WAIT") - # with trio.CancelScope(shield=True): - # await release_lock_signal.wait() + if debug_func is None: + task_status.started(Lock) - # else: + else: # block here one (at the appropriate frame *up*) where # ``breakpoint()`` was awaited and begin handling stdio. - log.debug('Entering sync world of the `pdb` REPL..') - try: - debug_func( - actor, - pdb, - extra_frames_up_when_async=2, - shield=shield, - ) - except BaseException: - log.exception( - 'Failed to invoke internal `debug_func = ' - f'{debug_func.func.__name__}`\n' - ) - raise + log.debug('Entering sync world of the `pdb` REPL..') + try: + debug_func( + actor, + pdb, + extra_frames_up_when_async=2, + shield=shield, + ) + except BaseException: + log.exception( + 'Failed to invoke internal `debug_func = ' + f'{debug_func.func.__name__}`\n' + ) + raise except bdb.BdbQuit: Lock.release() @@ -862,8 +972,7 @@ async def _pause( async def pause( - debug_func: Callable = _set_trace, - release_lock_signal: trio.Event | None = None, + debug_func: Callable|None = _set_trace, # TODO: allow caller to pause despite task cancellation, # exactly the same as wrapping with: @@ -872,10 +981,11 @@ async def pause( # => the REMAINING ISSUE is that the scope's .__exit__() frame # is always show in the debugger on entry.. and there seems to # be no way to override it?.. - # shield: bool = False, - + # shield: bool = False, - task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED + task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED, + + **_pause_kwargs, ) -> None: ''' @@ -920,89 +1030,166 @@ async def pause( task_status.started(cs) return await _pause( debug_func=debug_func, - release_lock_signal=release_lock_signal, shield=True, task_status=task_status, + **_pause_kwargs ) else: return await _pause( debug_func=debug_func, - release_lock_signal=release_lock_signal, shield=False, task_status=task_status, + **_pause_kwargs ) +_gb_mod: None|ModuleType|False = None + + +def maybe_import_greenback( + raise_not_found: bool = True, + force_reload: bool = False, + +) -> ModuleType|False: + # be cached-fast on module-already-inited + global _gb_mod + + if _gb_mod is False: + return False + + elif ( + _gb_mod is not None + and not force_reload + ): + return _gb_mod + + try: + import greenback + _gb_mod = greenback + return greenback + + except ModuleNotFoundError as mnf: + log.debug( + '`greenback` is not installed.\n' + 'No sync debug support!\n' + ) + _gb_mod = False + + if raise_not_found: + raise RuntimeError( + 'The `greenback` lib is required to use `tractor.pause_from_sync()`!\n' + 'https://github.com/oremanj/greenback\n' + ) from mnf + + return False + + +async def maybe_init_greenback( + **kwargs, +) -> None|ModuleType: + + if mod := maybe_import_greenback(**kwargs): + await mod.ensure_portal() + log.info( + '`greenback` portal opened!\n' + 'Sync debug support activated!\n' + ) + return mod + + return None # TODO: allow pausing from sync code. # normally by remapping python's builtin breakpoint() hook to this # runtime aware version which takes care of all . -def pause_from_sync() -> None: - print("ENTER SYNC PAUSE") +def pause_from_sync( + hide_tb: bool = False, +) -> None: + + __tracebackhide__: bool = hide_tb actor: tractor.Actor = current_actor( err_on_no_runtime=False, ) - if actor: - try: - import greenback - # __tracebackhide__ = True + log.debug( + f'{actor.uid}: JUST ENTERED `tractor.pause_from_sync()`' + f'|_{actor}\n' + ) + if not actor: + raise RuntimeError( + 'Not inside the `tractor`-runtime?\n' + '`tractor.pause_from_sync()` is not functional without a wrapping\n' + '- `async with tractor.open_nursery()` or,\n' + '- `async with tractor.open_root_actor()`\n' + ) + # NOTE: once supported, remove this AND the one + # inside `._pause()`! + if actor.is_infected_aio(): + raise RuntimeError( + '`tractor.pause[_from_sync]()` not yet supported ' + 'for infected `asyncio` mode!' + ) - # task_can_release_tty_lock = trio.Event() + # raises on not-found by default + greenback: ModuleType = maybe_import_greenback() + mdb: MultiActorPdb = mk_mpdb() - # spawn bg task which will lock out the TTY, we poll - # just below until the release event is reporting that task as - # waiting.. not the most ideal but works for now ;) - greenback.await_( - actor._service_n.start(partial( - pause, - debug_func=None, - # release_lock_signal=task_can_release_tty_lock, - )) + # run async task which will lock out the root proc's TTY. + if not Lock.is_main_trio_thread(): + + # TODO: we could also check for a non-`.to_thread` context + # using `trio.from_thread.check_cancelled()` (says + # oremanj) wherein we get the following outputs: + # + # `RuntimeError`: non-`.to_thread` spawned thread + # noop: non-cancelled `.to_thread` + # `trio.Cancelled`: cancelled `.to_thread` + # + trio.from_thread.run( + partial( + pause, + debug_func=None, + pdb=mdb, + hide_tb=hide_tb, ) + ) + # TODO: maybe the `trio.current_task()` id/name if avail? + Lock.local_task_in_debug: str = str(threading.current_thread().name) - except ModuleNotFoundError: - log.warning('NO GREENBACK FOUND') - else: - log.warning('Not inside actor-runtime') + else: # we are presumably the `trio.run()` + main thread + greenback.await_( + pause( + debug_func=None, + pdb=mdb, + hide_tb=hide_tb, + ) + ) + Lock.local_task_in_debug: str = current_task().name - db, undo_sigint = mk_mpdb() - Lock.local_task_in_debug = 'sync' - # db.config.enable_hidden_frames = True - - # we entered the global ``breakpoint()`` built-in from sync + # TODO: ensure we aggressively make the user aware about + # entering the global ``breakpoint()`` built-in from sync # code? - frame: FrameType | None = sys._getframe() - # print(f'FRAME: {str(frame)}') - # assert not db._is_hidden(frame) + _set_trace( + actor=actor, + pdb=mdb, + hide_tb=hide_tb, + extra_frames_up_when_async=1, - frame: FrameType = frame.f_back # type: ignore - # print(f'FRAME: {str(frame)}') - # if not db._is_hidden(frame): - # pdbp.set_trace() - # db._hidden_frames.append( - # (frame, frame.f_lineno) - # ) - db.set_trace(frame=frame) - # NOTE XXX: see the `@pdbp.hideframe` decoration - # on `Lock.unshield_sigint()`.. I have NO CLUE why - # the next instruction's def frame is being shown - # in the tb but it seems to be something wonky with - # the way `pdb` core works? - # undo_sigint() - - # Lock.global_actor_in_debug = actor.uid - # Lock.release() - # task_can_release_tty_lock.set() - - -# using the "pause" semantics instead since -# that better covers actually somewhat "pausing the runtime" -# for this particular paralell task to do debugging B) -# pp = pause # short-hand for "pause point" + # TODO? will we ever need it? + # -> the gb._await() won't be affected by cancellation? + # shield=shield, + ) + # LEGACY NOTE on next LOC's frame showing weirdness.. + # + # XXX NOTE XXX no other LOC can be here without it + # showing up in the REPL's last stack frame !?! + # -[ ] tried to use `@pdbp.hideframe` decoration but + # still doesn't work +# NOTE prefer a new "pause" semantic since it better describes +# "pausing the actor's runtime" for this particular +# paralell task to do debugging in a REPL. async def breakpoint(**kwargs): log.warning( '`tractor.breakpoint()` is deprecated!\n' diff --git a/tractor/devx/_stackscope.py b/tractor/devx/_stackscope.py index 706b85d3..e8e97d1a 100644 --- a/tractor/devx/_stackscope.py +++ b/tractor/devx/_stackscope.py @@ -23,12 +23,31 @@ into each ``trio.Nursery`` except it links the lifetimes of memory space disjoint, parallel executing tasks in separate actors. ''' +from __future__ import annotations +import multiprocessing as mp from signal import ( signal, SIGUSR1, ) +import traceback +from typing import TYPE_CHECKING import trio +from tractor import ( + _state, + log as logmod, +) + +log = logmod.get_logger(__name__) + + +if TYPE_CHECKING: + from tractor._spawn import ProcessType + from tractor import ( + Actor, + ActorNursery, + ) + @trio.lowlevel.disable_ki_protection def dump_task_tree() -> None: @@ -41,9 +60,15 @@ def dump_task_tree() -> None: recurse_child_tasks=True ) ) - log = get_console_log('cancel') - log.pdb( - f'Dumping `stackscope` tree:\n\n' + log = get_console_log( + name=__name__, + level='cancel', + ) + actor: Actor = _state.current_actor() + log.devx( + f'Dumping `stackscope` tree for actor\n' + f'{actor.name}: {actor}\n' + f' |_{mp.current_process()}\n\n' f'{tree_str}\n' ) # import logging @@ -56,8 +81,13 @@ def dump_task_tree() -> None: # ).exception("Error printing task tree") -def signal_handler(sig: int, frame: object) -> None: - import traceback +def signal_handler( + sig: int, + frame: object, + + relay_to_subs: bool = True, + +) -> None: try: trio.lowlevel.current_trio_token( ).run_sync_soon(dump_task_tree) @@ -65,6 +95,26 @@ def signal_handler(sig: int, frame: object) -> None: # not in async context -- print a normal traceback traceback.print_stack() + if not relay_to_subs: + return + + an: ActorNursery + for an in _state.current_actor()._actoruid2nursery.values(): + + subproc: ProcessType + subactor: Actor + for subactor, subproc, _ in an._children.values(): + log.devx( + f'Relaying `SIGUSR1`[{sig}] to sub-actor\n' + f'{subactor}\n' + f' |_{subproc}\n' + ) + + if isinstance(subproc, trio.Process): + subproc.send_signal(sig) + + elif isinstance(subproc, mp.Process): + subproc._send_signal(sig) def enable_stack_on_sig( @@ -82,3 +132,6 @@ def enable_stack_on_sig( # NOTE: not the above can be triggered from # a (xonsh) shell using: # kill -SIGUSR1 @$(pgrep -f '') + # + # for example if you were looking to trace a `pytest` run + # kill -SIGUSR1 @$(pgrep -f 'pytest') diff --git a/tractor/log.py b/tractor/log.py index 6c040209..edb058e3 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -21,6 +21,11 @@ Log like a forester! from collections.abc import Mapping import sys import logging +from logging import ( + LoggerAdapter, + Logger, + StreamHandler, +) import colorlog # type: ignore import trio @@ -48,20 +53,19 @@ LOG_FORMAT = ( DATE_FORMAT = '%b %d %H:%M:%S' -LEVELS: dict[str, int] = { +# FYI, ERROR is 40 +CUSTOM_LEVELS: dict[str, int] = { 'TRANSPORT': 5, 'RUNTIME': 15, - 'CANCEL': 16, + 'DEVX': 17, + 'CANCEL': 18, 'PDB': 500, } -# _custom_levels: set[str] = { -# lvlname.lower for lvlname in LEVELS.keys() -# } - STD_PALETTE = { 'CRITICAL': 'red', 'ERROR': 'red', 'PDB': 'white', + 'DEVX': 'cyan', 'WARNING': 'yellow', 'INFO': 'green', 'CANCEL': 'yellow', @@ -78,7 +82,7 @@ BOLD_PALETTE = { # TODO: this isn't showing the correct '{filename}' # as it did before.. -class StackLevelAdapter(logging.LoggerAdapter): +class StackLevelAdapter(LoggerAdapter): def transport( self, @@ -86,7 +90,8 @@ class StackLevelAdapter(logging.LoggerAdapter): ) -> None: ''' - IPC level msg-ing. + IPC transport level msg IO; generally anything below + `._ipc.Channel` and friends. ''' return self.log(5, msg) @@ -102,11 +107,11 @@ class StackLevelAdapter(logging.LoggerAdapter): msg: str, ) -> None: ''' - Cancellation logging, mostly for runtime reporting. + Cancellation sequencing, mostly for runtime reporting. ''' return self.log( - level=16, + level=22, msg=msg, # stacklevel=4, ) @@ -116,11 +121,21 @@ class StackLevelAdapter(logging.LoggerAdapter): msg: str, ) -> None: ''' - Debugger logging. + `pdb`-REPL (debugger) related statuses. ''' return self.log(500, msg) + def devx( + self, + msg: str, + ) -> None: + ''' + "Developer experience" sub-sys statuses. + + ''' + return self.log(17, msg) + def log( self, level, @@ -136,8 +151,7 @@ class StackLevelAdapter(logging.LoggerAdapter): if self.isEnabledFor(level): stacklevel: int = 3 if ( - level in LEVELS.values() - # or level in _custom_levels + level in CUSTOM_LEVELS.values() ): stacklevel: int = 4 @@ -184,8 +198,30 @@ class StackLevelAdapter(logging.LoggerAdapter): ) +# TODO IDEAs: +# -[ ] move to `.devx.pformat`? +# -[ ] do per task-name and actor-name color coding +# -[ ] unique color per task-id and actor-uuid +def pformat_task_uid( + id_part: str = 'tail' +): + ''' + Return `str`-ified unique for a `trio.Task` via a combo of its + `.name: str` and `id()` truncated output. + + ''' + task: trio.Task = trio.lowlevel.current_task() + tid: str = str(id(task)) + if id_part == 'tail': + tid_part: str = tid[-6:] + else: + tid_part: str = tid[:6] + + return f'{task.name}[{tid_part}]' + + _conc_name_getters = { - 'task': lambda: trio.lowlevel.current_task().name, + 'task': pformat_task_uid, 'actor': lambda: current_actor(), 'actor_name': lambda: current_actor().name, 'actor_uid': lambda: current_actor().uid[1][:6], @@ -193,7 +229,10 @@ _conc_name_getters = { class ActorContextInfo(Mapping): - "Dyanmic lookup for local actor and task names" + ''' + Dyanmic lookup for local actor and task names. + + ''' _context_keys = ( 'task', 'actor', @@ -224,6 +263,7 @@ def get_logger( '''Return the package log or a sub-logger for ``name`` if provided. ''' + log: Logger log = rlog = logging.getLogger(_root_name) if ( @@ -266,7 +306,7 @@ def get_logger( logger = StackLevelAdapter(log, ActorContextInfo()) # additional levels - for name, val in LEVELS.items(): + for name, val in CUSTOM_LEVELS.items(): logging.addLevelName(val, name) # ensure customs levels exist as methods @@ -278,7 +318,7 @@ def get_logger( def get_console_log( level: str | None = None, **kwargs, -) -> logging.LoggerAdapter: +) -> LoggerAdapter: '''Get the package logger and enable a handler which writes to stderr. Yeah yeah, i know we can use ``DictConfig``. You do it. @@ -303,7 +343,7 @@ def get_console_log( None, ) ): - handler = logging.StreamHandler() + handler = StreamHandler() formatter = colorlog.ColoredFormatter( LOG_FORMAT, datefmt=DATE_FORMAT, @@ -323,3 +363,19 @@ def get_loglevel() -> str: # global module logger for tractor itself log = get_logger('tractor') + + +def at_least_level( + log: Logger|LoggerAdapter, + level: int|str, +) -> bool: + ''' + Predicate to test if a given level is active. + + ''' + if isinstance(level, str): + level: int = CUSTOM_LEVELS[level.upper()] + + if log.getEffectiveLevel() <= level: + return True + return False diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 7c88edd2..585b0b00 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -33,10 +33,14 @@ from typing import ( import trio from outcome import Error -from .log import get_logger -from ._state import current_actor -from ._exceptions import AsyncioCancelled -from .trionics._broadcast import ( +from tractor.log import get_logger +from tractor._state import ( + current_actor, + debug_mode, +) +from tractor.devx import _debug +from tractor._exceptions import AsyncioCancelled +from tractor.trionics._broadcast import ( broadcast_receiver, BroadcastReceiver, ) @@ -64,9 +68,9 @@ class LinkedTaskChannel(trio.abc.Channel): _trio_exited: bool = False # set after ``asyncio.create_task()`` - _aio_task: asyncio.Task | None = None - _aio_err: BaseException | None = None - _broadcaster: BroadcastReceiver | None = None + _aio_task: asyncio.Task|None = None + _aio_err: BaseException|None = None + _broadcaster: BroadcastReceiver|None = None async def aclose(self) -> None: await self._from_aio.aclose() @@ -158,7 +162,9 @@ def _run_asyncio_task( ''' __tracebackhide__ = True if not current_actor().is_infected_aio(): - raise RuntimeError("`infect_asyncio` mode is not enabled!?") + raise RuntimeError( + "`infect_asyncio` mode is not enabled!?" + ) # ITC (inter task comms), these channel/queue names are mostly from # ``asyncio``'s perspective. @@ -187,7 +193,7 @@ def _run_asyncio_task( cancel_scope = trio.CancelScope() aio_task_complete = trio.Event() - aio_err: BaseException | None = None + aio_err: BaseException|None = None chan = LinkedTaskChannel( aio_q, # asyncio.Queue @@ -253,7 +259,7 @@ def _run_asyncio_task( if not inspect.isawaitable(coro): raise TypeError(f"No support for invoking {coro}") - task = asyncio.create_task( + task: asyncio.Task = asyncio.create_task( wait_on_coro_final_result( to_trio, coro, @@ -262,6 +268,18 @@ def _run_asyncio_task( ) chan._aio_task = task + # XXX TODO XXX get this actually workin.. XD + # maybe setup `greenback` for `asyncio`-side task REPLing + if ( + debug_mode() + and + (greenback := _debug.maybe_import_greenback( + force_reload=True, + raise_not_found=False, + )) + ): + greenback.bestow_portal(task) + def cancel_trio(task: asyncio.Task) -> None: ''' Cancel the calling ``trio`` task on error. @@ -269,7 +287,7 @@ def _run_asyncio_task( ''' nonlocal chan aio_err = chan._aio_err - task_err: BaseException | None = None + task_err: BaseException|None = None # only to avoid ``asyncio`` complaining about uncaptured # task exceptions @@ -349,11 +367,11 @@ async def translate_aio_errors( ''' trio_task = trio.lowlevel.current_task() - aio_err: BaseException | None = None + aio_err: BaseException|None = None # TODO: make thisi a channel method? def maybe_raise_aio_err( - err: Exception | None = None + err: Exception|None = None ) -> None: aio_err = chan._aio_err if ( @@ -531,6 +549,16 @@ def run_as_asyncio_guest( loop = asyncio.get_running_loop() trio_done_fut = asyncio.Future() + if debug_mode(): + # XXX make it obvi we know this isn't supported yet! + log.error( + 'Attempting to enter unsupported `greenback` init ' + 'from `asyncio` task..' + ) + await _debug.maybe_init_greenback( + force_reload=True, + ) + def trio_done_callback(main_outcome): if isinstance(main_outcome, Error):