Merge pull request #374 from goodboy/pause_from_sync_w_greenback

Pause from sync (with `greenback`), `log.devx()`, hide `@acm` frames
sc_super_proto_dgrams
goodboy 2025-03-21 00:17:28 -04:00 committed by GitHub
commit e8bd834b5b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 987 additions and 245 deletions

View File

@ -77,7 +77,9 @@ async def main(
) -> None: ) -> None:
async with tractor.open_nursery() as n: async with tractor.open_nursery(
# debug_mode=True,
) as n:
p = await n.start_actor( p = await n.start_actor(
'aio_daemon', 'aio_daemon',

View File

@ -4,9 +4,15 @@ import trio
async def breakpoint_forever(): async def breakpoint_forever():
"Indefinitely re-enter debugger in child actor." "Indefinitely re-enter debugger in child actor."
while True: try:
yield 'yo' while True:
await tractor.breakpoint() yield 'yo'
await tractor.breakpoint()
except BaseException:
tractor.log.get_console_log().exception(
'Cancelled while trying to enter pause point!'
)
raise
async def name_error(): async def name_error():
@ -19,7 +25,7 @@ async def main():
""" """
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=True, debug_mode=True,
loglevel='error', loglevel='cancel',
) as n: ) as n:
p0 = await n.start_actor('bp_forever', enable_modules=[__name__]) p0 = await n.start_actor('bp_forever', enable_modules=[__name__])

View File

@ -45,6 +45,7 @@ async def spawn_until(depth=0):
) )
# TODO: notes on the new boxed-relayed errors through proxy actors
async def main(): async def main():
"""The main ``tractor`` routine. """The main ``tractor`` routine.

View File

@ -38,6 +38,7 @@ async def main():
""" """
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=True, debug_mode=True,
# loglevel='runtime',
) as n: ) as n:
# Spawn both actors, don't bother with collecting results # Spawn both actors, don't bother with collecting results

View File

@ -23,5 +23,6 @@ async def main():
n.start_soon(debug_actor.run, die) n.start_soon(debug_actor.run, die)
n.start_soon(crash_boi.run, die) n.start_soon(crash_boi.run, die)
if __name__ == '__main__': if __name__ == '__main__':
trio.run(main) trio.run(main)

View File

@ -2,10 +2,13 @@ import trio
import tractor import tractor
async def main(): async def main(
registry_addrs: tuple[str, int]|None = None
):
async with tractor.open_root_actor( async with tractor.open_root_actor(
debug_mode=True, debug_mode=True,
# loglevel='runtime',
): ):
while True: while True:
await tractor.breakpoint() await tractor.breakpoint()

View File

@ -3,17 +3,20 @@ import tractor
async def breakpoint_forever(): async def breakpoint_forever():
"""Indefinitely re-enter debugger in child actor. '''
""" Indefinitely re-enter debugger in child actor.
'''
while True: while True:
await trio.sleep(0.1) await trio.sleep(0.1)
await tractor.breakpoint() await tractor.pause()
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=True, debug_mode=True,
loglevel='cancel',
) as n: ) as n:
portal = await n.run_in_actor( portal = await n.run_in_actor(

View File

@ -3,16 +3,26 @@ import tractor
async def name_error(): async def name_error():
getattr(doggypants) getattr(doggypants) # noqa (on purpose)
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=True, debug_mode=True,
) as n: # loglevel='transport',
) as an:
portal = await n.run_in_actor(name_error) # TODO: ideally the REPL arrives at this frame in the parent,
await portal.result() # ABOVE the @api_frame of `Portal.run_in_actor()` (which
# should eventually not even be a portal method ... XD)
# await tractor.pause()
p: tractor.Portal = await an.run_in_actor(name_error)
# with this style, should raise on this line
await p.result()
# with this alt style should raise at `open_nusery()`
# return await p.result()
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -0,0 +1,75 @@
import trio
import tractor
def sync_pause(
use_builtin: bool = True,
error: bool = False,
):
if use_builtin:
breakpoint(hide_tb=False)
else:
tractor.pause_from_sync()
if error:
raise RuntimeError('yoyo sync code error')
@tractor.context
async def start_n_sync_pause(
ctx: tractor.Context,
):
actor: tractor.Actor = tractor.current_actor()
# sync to parent-side task
await ctx.started()
print(f'entering SYNC PAUSE in {actor.uid}')
sync_pause()
print(f'back from SYNC PAUSE in {actor.uid}')
async def main() -> None:
async with tractor.open_nursery(
# NOTE: required for pausing from sync funcs
maybe_enable_greenback=True,
debug_mode=True,
) as an:
p: tractor.Portal = await an.start_actor(
'subactor',
enable_modules=[__name__],
# infect_asyncio=True,
debug_mode=True,
loglevel='cancel',
)
# TODO: 3 sub-actor usage cases:
# -[ ] via a `.run_in_actor()` call
# -[ ] via a `.run()`
# -[ ] via a `.open_context()`
#
async with p.open_context(
start_n_sync_pause,
) as (ctx, first):
assert first is None
await tractor.pause()
sync_pause()
# TODO: make this work!!
await trio.to_thread.run_sync(
sync_pause,
abandon_on_cancel=False,
)
await ctx.cancel()
# TODO: case where we cancel from trio-side while asyncio task
# has debugger lock?
await p.cancel_actor()
if __name__ == '__main__':
trio.run(main)

View File

@ -1025,3 +1025,67 @@ def test_different_debug_mode_per_actor(
# instead crashed completely # instead crashed completely
assert "tractor._exceptions.RemoteActorError: ('crash_boi'" in before assert "tractor._exceptions.RemoteActorError: ('crash_boi'" in before
assert "RuntimeError" in before assert "RuntimeError" in before
def test_pause_from_sync(
spawn,
ctlc: bool
):
'''
Verify we can use the `pdbp` REPL from sync functions AND from
any thread spawned with `trio.to_thread.run_sync()`.
`examples/debugging/sync_bp.py`
'''
child = spawn('sync_bp')
child.expect(PROMPT)
assert_before(
child,
[
'`greenback` portal opened!',
# pre-prompt line
_pause_msg, "('root'",
]
)
if ctlc:
do_ctlc(child)
child.sendline('c')
child.expect(PROMPT)
# XXX shouldn't see gb loaded again
before = str(child.before.decode())
assert not in_prompt_msg(
before,
['`greenback` portal opened!'],
)
assert_before(
child,
[_pause_msg, "('root'",],
)
if ctlc:
do_ctlc(child)
child.sendline('c')
child.expect(PROMPT)
assert_before(
child,
[_pause_msg, "('subactor'",],
)
if ctlc:
do_ctlc(child)
child.sendline('c')
child.expect(PROMPT)
# non-main thread case
# TODO: should we agument the pre-prompt msg in this case?
assert_before(
child,
[_pause_msg, "('root'",],
)
if ctlc:
do_ctlc(child)
child.sendline('c')
child.expect(pexpect.EOF)

View File

@ -601,7 +601,8 @@ def test_echoserver_detailed_mechanics(
pass pass
else: else:
pytest.fail( pytest.fail(
"stream wasn't stopped after sentinel?!") 'stream not stopped after sentinel ?!'
)
# TODO: the case where this blocks and # TODO: the case where this blocks and
# is cancelled by kbi or out of task cancellation # is cancelled by kbi or out of task cancellation
@ -613,3 +614,37 @@ def test_echoserver_detailed_mechanics(
else: else:
trio.run(main) trio.run(main)
# TODO: debug_mode tests once we get support for `asyncio`!
#
# -[ ] need tests to wrap both scripts:
# - [ ] infected_asyncio_echo_server.py
# - [ ] debugging/asyncio_bp.py
# -[ ] consider moving ^ (some of) these ^ to `test_debugger`?
#
# -[ ] missing impl outstanding includes:
# - [x] for sync pauses we need to ensure we open yet another
# `greenback` portal in the asyncio task
# => completed using `.bestow_portal(task)` inside
# `.to_asyncio._run_asyncio_task()` right?
# -[ ] translation func to get from `asyncio` task calling to
# `._debug.wait_for_parent_stdin_hijack()` which does root
# call to do TTY locking.
#
def test_sync_breakpoint():
'''
Verify we can do sync-func/code breakpointing using the
`breakpoint()` builtin inside infected mode actors.
'''
pytest.xfail('This support is not implemented yet!')
def test_debug_mode_crash_handling():
'''
Verify mult-actor crash handling works with a combo of infected-`asyncio`-mode
and normal `trio` actors despite nested process trees.
'''
pytest.xfail('This support is not implemented yet!')

View File

@ -36,6 +36,7 @@ def parse_ipaddr(arg):
if __name__ == "__main__": if __name__ == "__main__":
__tracebackhide__: bool = True
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("--uid", type=parse_uid) parser.add_argument("--uid", type=parse_uid)

View File

@ -351,7 +351,7 @@ class Context:
by the runtime in 2 ways: by the runtime in 2 ways:
- by entering ``Portal.open_context()`` which is the primary - by entering ``Portal.open_context()`` which is the primary
public API for any "caller" task or, public API for any "caller" task or,
- by the RPC machinery's `._runtime._invoke()` as a `ctx` arg - by the RPC machinery's `._rpc._invoke()` as a `ctx` arg
to a remotely scheduled "callee" function. to a remotely scheduled "callee" function.
AND is always constructed using the below ``mk_context()``. AND is always constructed using the below ``mk_context()``.
@ -361,10 +361,10 @@ class Context:
`trio.Task`s. Contexts are allocated on each side of any task `trio.Task`s. Contexts are allocated on each side of any task
RPC-linked msg dialog, i.e. for every request to a remote RPC-linked msg dialog, i.e. for every request to a remote
actor from a `Portal`. On the "callee" side a context is actor from a `Portal`. On the "callee" side a context is
always allocated inside ``._runtime._invoke()``. always allocated inside ``._rpc._invoke()``.
# TODO: more detailed writeup on cancellation, error and TODO: more detailed writeup on cancellation, error and
# streaming semantics.. streaming semantics..
A context can be cancelled and (possibly eventually restarted) from A context can be cancelled and (possibly eventually restarted) from
either side of the underlying IPC channel, it can also open task either side of the underlying IPC channel, it can also open task
@ -1209,7 +1209,9 @@ class Context:
# await pause() # await pause()
log.warning( log.warning(
'Stream was terminated by EoC\n\n' 'Stream was terminated by EoC\n\n'
f'{repr(eoc)}\n' # NOTE: won't show the error <Type> but
# does show txt followed by IPC msg.
f'{str(eoc)}\n'
) )
finally: finally:
@ -1306,7 +1308,7 @@ class Context:
# `._cancel_called == True`. # `._cancel_called == True`.
not raise_overrun_from_self not raise_overrun_from_self
and isinstance(remote_error, RemoteActorError) and isinstance(remote_error, RemoteActorError)
and remote_error.msgdata['type_str'] == 'StreamOverrun' and remote_error.msgdata['boxed_type_str'] == 'StreamOverrun'
and tuple(remote_error.msgdata['sender']) == our_uid and tuple(remote_error.msgdata['sender']) == our_uid
): ):
# NOTE: we set the local scope error to any "self # NOTE: we set the local scope error to any "self
@ -1883,6 +1885,19 @@ class Context:
return False return False
# TODO: exception tb masking by using a manual
# `.__aexit__()`/.__aenter__()` pair on a type?
# => currently this is one of the few places we can't easily
# mask errors - on the exit side of a `Portal.open_context()`..
# there's # => currently this is one of the few places we can't
# there's 2 ways to approach it:
# - manually write an @acm type as per above
# - use `contextlib.AsyncContextDecorator` to override the default
# impl to suppress traceback frames:
# * https://docs.python.org/3/library/contextlib.html#contextlib.AsyncContextDecorator
# * https://docs.python.org/3/library/contextlib.html#contextlib.ContextDecorator
# - also we could just override directly the underlying
# `contextlib._AsyncGeneratorContextManager`?
@acm @acm
async def open_context_from_portal( async def open_context_from_portal(
portal: Portal, portal: Portal,

View File

@ -106,6 +106,7 @@ def _trio_main(
Entry point for a `trio_run_in_process` subactor. Entry point for a `trio_run_in_process` subactor.
''' '''
__tracebackhide__: bool = True
_state._current_actor = actor _state._current_actor = actor
trio_main = partial( trio_main = partial(
async_main, async_main,

View File

@ -22,9 +22,10 @@ from contextlib import asynccontextmanager
from functools import partial from functools import partial
import importlib import importlib
import logging import logging
import os
import signal import signal
import sys import sys
import os from typing import Callable
import warnings import warnings
@ -78,6 +79,8 @@ async def open_root_actor(
# enables the multi-process debugger support # enables the multi-process debugger support
debug_mode: bool = False, debug_mode: bool = False,
maybe_enable_greenback: bool = False, # `.pause_from_sync()/breakpoint()` support
enable_stack_on_sig: bool = False,
# internal logging # internal logging
loglevel: str|None = None, loglevel: str|None = None,
@ -94,12 +97,41 @@ async def open_root_actor(
Runtime init entry point for ``tractor``. Runtime init entry point for ``tractor``.
''' '''
# TODO: stick this in a `@cm` defined in `devx._debug`?
#
# Override the global debugger hook to make it play nice with # Override the global debugger hook to make it play nice with
# ``trio``, see much discussion in: # ``trio``, see much discussion in:
# https://github.com/python-trio/trio/issues/1155#issuecomment-742964018 # https://github.com/python-trio/trio/issues/1155#issuecomment-742964018
builtin_bp_handler = sys.breakpointhook builtin_bp_handler: Callable = sys.breakpointhook
orig_bp_path: str | None = os.environ.get('PYTHONBREAKPOINT', None) orig_bp_path: str|None = os.environ.get(
os.environ['PYTHONBREAKPOINT'] = 'tractor.devx._debug.pause_from_sync' 'PYTHONBREAKPOINT',
None,
)
if (
debug_mode
and maybe_enable_greenback
and await _debug.maybe_init_greenback(
raise_not_found=False,
)
):
os.environ['PYTHONBREAKPOINT'] = (
'tractor.devx._debug.pause_from_sync'
)
else:
# TODO: disable `breakpoint()` by default (without
# `greenback`) since it will break any multi-actor
# usage by a clobbered TTY's stdstreams!
def block_bps(*args, **kwargs):
raise RuntimeError(
'Trying to use `breakpoint()` eh?\n'
'Welp, `tractor` blocks `breakpoint()` built-in calls by default!\n'
'If you need to use it please install `greenback` and set '
'`debug_mode=True` when opening the runtime '
'(either via `.open_nursery()` or `open_root_actor()`)\n'
)
sys.breakpointhook = block_bps
# os.environ['PYTHONBREAKPOINT'] = None
# attempt to retreive ``trio``'s sigint handler and stash it # attempt to retreive ``trio``'s sigint handler and stash it
# on our debugger lock state. # on our debugger lock state.
@ -179,7 +211,11 @@ async def open_root_actor(
assert _log assert _log
# TODO: factor this into `.devx._stackscope`!! # TODO: factor this into `.devx._stackscope`!!
if debug_mode: if (
debug_mode
and
enable_stack_on_sig
):
try: try:
logger.info('Enabling `stackscope` traces on SIGUSR1') logger.info('Enabling `stackscope` traces on SIGUSR1')
from .devx import enable_stack_on_sig from .devx import enable_stack_on_sig
@ -356,12 +392,14 @@ async def open_root_actor(
_state._last_actor_terminated = actor _state._last_actor_terminated = actor
# restore built-in `breakpoint()` hook state # restore built-in `breakpoint()` hook state
sys.breakpointhook = builtin_bp_handler if debug_mode:
if orig_bp_path is not None: if builtin_bp_handler is not None:
os.environ['PYTHONBREAKPOINT'] = orig_bp_path sys.breakpointhook = builtin_bp_handler
else: if orig_bp_path is not None:
# clear env back to having no entry os.environ['PYTHONBREAKPOINT'] = orig_bp_path
os.environ.pop('PYTHONBREAKPOINT') else:
# clear env back to having no entry
os.environ.pop('PYTHONBREAKPOINT')
logger.runtime("Root actor terminated") logger.runtime("Root actor terminated")

View File

@ -26,7 +26,6 @@ from contextlib import (
from functools import partial from functools import partial
import inspect import inspect
from pprint import pformat from pprint import pformat
from types import ModuleType
from typing import ( from typing import (
Any, Any,
Callable, Callable,
@ -332,27 +331,6 @@ async def _errors_relayed_via_ipc(
actor._ongoing_rpc_tasks.set() actor._ongoing_rpc_tasks.set()
_gb_mod: ModuleType|None|False = None
async def maybe_import_gb():
global _gb_mod
if _gb_mod is False:
return
try:
import greenback
_gb_mod = greenback
await greenback.ensure_portal()
except ModuleNotFoundError:
log.debug(
'`greenback` is not installed.\n'
'No sync debug support!\n'
)
_gb_mod = False
async def _invoke( async def _invoke(
actor: Actor, actor: Actor,
@ -380,7 +358,9 @@ async def _invoke(
treat_as_gen: bool = False treat_as_gen: bool = False
if _state.debug_mode(): if _state.debug_mode():
await maybe_import_gb() # XXX for .pause_from_sync()` usage we need to make sure
# `greenback` is boostrapped in the subactor!
await _debug.maybe_init_greenback()
# TODO: possibly a specially formatted traceback # TODO: possibly a specially formatted traceback
# (not sure what typing is for this..)? # (not sure what typing is for this..)?

View File

@ -136,16 +136,16 @@ class Actor:
msg_buffer_size: int = 2**6 msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()` after fork # nursery placeholders filled in by `async_main()` after fork
_root_n: Nursery | None = None _root_n: Nursery|None = None
_service_n: Nursery | None = None _service_n: Nursery|None = None
_server_n: Nursery | None = None _server_n: Nursery|None = None
# Information about `__main__` from parent # Information about `__main__` from parent
_parent_main_data: dict[str, str] _parent_main_data: dict[str, str]
_parent_chan_cs: CancelScope | None = None _parent_chan_cs: CancelScope|None = None
# syncs for setup/teardown sequences # syncs for setup/teardown sequences
_server_down: trio.Event | None = None _server_down: trio.Event|None = None
# user toggled crash handling (including monkey-patched in # user toggled crash handling (including monkey-patched in
# `trio.open_nursery()` via `.trionics._supervisor` B) # `trio.open_nursery()` via `.trionics._supervisor` B)
@ -174,7 +174,7 @@ class Actor:
spawn_method: str|None = None, spawn_method: str|None = None,
# TODO: remove! # TODO: remove!
arbiter_addr: tuple[str, int] | None = None, arbiter_addr: tuple[str, int]|None = None,
) -> None: ) -> None:
''' '''
@ -189,7 +189,7 @@ class Actor:
) )
self._cancel_complete = trio.Event() self._cancel_complete = trio.Event()
self._cancel_called_by_remote: tuple[str, tuple] | None = None self._cancel_called_by_remote: tuple[str, tuple]|None = None
self._cancel_called: bool = False self._cancel_called: bool = False
# retreive and store parent `__main__` data which # retreive and store parent `__main__` data which
@ -245,11 +245,11 @@ class Actor:
] = {} ] = {}
self._listeners: list[trio.abc.Listener] = [] self._listeners: list[trio.abc.Listener] = []
self._parent_chan: Channel | None = None self._parent_chan: Channel|None = None
self._forkserver_info: tuple | None = None self._forkserver_info: tuple|None = None
self._actoruid2nursery: dict[ self._actoruid2nursery: dict[
tuple[str, str], tuple[str, str],
ActorNursery | None, ActorNursery|None,
] = {} # type: ignore # noqa ] = {} # type: ignore # noqa
# when provided, init the registry addresses property from # when provided, init the registry addresses property from
@ -781,7 +781,7 @@ class Actor:
# #
# side: str|None = None, # side: str|None = None,
msg_buffer_size: int | None = None, msg_buffer_size: int|None = None,
allow_overruns: bool = False, allow_overruns: bool = False,
) -> Context: ) -> Context:
@ -846,7 +846,7 @@ class Actor:
kwargs: dict, kwargs: dict,
# IPC channel config # IPC channel config
msg_buffer_size: int | None = None, msg_buffer_size: int|None = None,
allow_overruns: bool = False, allow_overruns: bool = False,
load_nsf: bool = False, load_nsf: bool = False,
@ -920,11 +920,11 @@ class Actor:
async def _from_parent( async def _from_parent(
self, self,
parent_addr: tuple[str, int] | None, parent_addr: tuple[str, int]|None,
) -> tuple[ ) -> tuple[
Channel, Channel,
list[tuple[str, int]] | None, list[tuple[str, int]]|None,
]: ]:
''' '''
Bootstrap this local actor's runtime config from its parent by Bootstrap this local actor's runtime config from its parent by
@ -945,7 +945,7 @@ class Actor:
# Initial handshake: swap names. # Initial handshake: swap names.
await self._do_handshake(chan) await self._do_handshake(chan)
accept_addrs: list[tuple[str, int]] | None = None accept_addrs: list[tuple[str, int]]|None = None
if self._spawn_method == "trio": if self._spawn_method == "trio":
# Receive runtime state from our parent # Receive runtime state from our parent
parent_data: dict[str, Any] parent_data: dict[str, Any]
@ -1009,7 +1009,7 @@ class Actor:
handler_nursery: Nursery, handler_nursery: Nursery,
*, *,
# (host, port) to bind for channel server # (host, port) to bind for channel server
listen_sockaddrs: list[tuple[str, int]] | None = None, listen_sockaddrs: list[tuple[str, int]]|None = None,
task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
@ -1466,7 +1466,7 @@ class Actor:
async def async_main( async def async_main(
actor: Actor, actor: Actor,
accept_addrs: tuple[str, int] | None = None, accept_addrs: tuple[str, int]|None = None,
# XXX: currently ``parent_addr`` is only needed for the # XXX: currently ``parent_addr`` is only needed for the
# ``multiprocessing`` backend (which pickles state sent to # ``multiprocessing`` backend (which pickles state sent to
@ -1475,7 +1475,7 @@ async def async_main(
# change this to a simple ``is_subactor: bool`` which will # change this to a simple ``is_subactor: bool`` which will
# be False when running as root actor and True when as # be False when running as root actor and True when as
# a subactor. # a subactor.
parent_addr: tuple[str, int] | None = None, parent_addr: tuple[str, int]|None = None,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
@ -1498,7 +1498,7 @@ async def async_main(
try: try:
# establish primary connection with immediate parent # establish primary connection with immediate parent
actor._parent_chan: Channel | None = None actor._parent_chan: Channel|None = None
if parent_addr is not None: if parent_addr is not None:
( (
@ -1797,7 +1797,7 @@ class Arbiter(Actor):
self, self,
name: str, name: str,
) -> tuple[str, int] | None: ) -> tuple[str, int]|None:
for uid, sockaddr in self._registry.items(): for uid, sockaddr in self._registry.items():
if name in uid: if name in uid:

View File

@ -503,7 +503,7 @@ async def trio_proc(
}) })
# track subactor in current nursery # track subactor in current nursery
curr_actor = current_actor() curr_actor: Actor = current_actor()
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
# resume caller at next checkpoint now that child is up # resume caller at next checkpoint now that child is up

View File

@ -30,11 +30,16 @@ if TYPE_CHECKING:
_current_actor: Actor|None = None # type: ignore # noqa _current_actor: Actor|None = None # type: ignore # noqa
_last_actor_terminated: Actor|None = None _last_actor_terminated: Actor|None = None
# TODO: mk this a `msgspec.Struct`!
_runtime_vars: dict[str, Any] = { _runtime_vars: dict[str, Any] = {
'_debug_mode': False, '_debug_mode': False,
'_is_root': False, '_is_root': False,
'_root_mailbox': (None, None), '_root_mailbox': (None, None),
'_registry_addrs': [], '_registry_addrs': [],
# for `breakpoint()` support
'use_greenback': False,
} }

View File

@ -119,11 +119,11 @@ class ActorNursery:
name: str, name: str,
*, *,
bind_addrs: list[tuple[str, int]] = [_default_bind_addr], bind_addrs: list[tuple[str, int]] = [_default_bind_addr],
rpc_module_paths: list[str] | None = None, rpc_module_paths: list[str]|None = None,
enable_modules: list[str] | None = None, enable_modules: list[str]|None = None,
loglevel: str | None = None, # set log level per subactor loglevel: str|None = None, # set log level per subactor
nursery: trio.Nursery | None = None, nursery: trio.Nursery|None = None,
debug_mode: bool | None = None, debug_mode: bool|None = None,
infect_asyncio: bool = False, infect_asyncio: bool = False,
) -> Portal: ) -> Portal:
''' '''
@ -583,7 +583,7 @@ async def open_nursery(
finally: finally:
msg: str = ( msg: str = (
'Actor-nursery exited\n' 'Actor-nursery exited\n'
f'|_{an}\n\n' f'|_{an}\n'
) )
# shutdown runtime if it was started # shutdown runtime if it was started

View File

@ -0,0 +1,177 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Tools for code-object annotation, introspection and mutation
as it pertains to improving the grok-ability of our runtime!
'''
from __future__ import annotations
import inspect
# import msgspec
# from pprint import pformat
from types import (
FrameType,
FunctionType,
MethodType,
# CodeType,
)
from typing import (
# Any,
Callable,
# TYPE_CHECKING,
Type,
)
from tractor.msg import (
pretty_struct,
NamespacePath,
)
# TODO: yeah, i don't love this and we should prolly just
# write a decorator that actually keeps a stupid ref to the func
# obj..
def get_class_from_frame(fr: FrameType) -> (
FunctionType
|MethodType
):
'''
Attempt to get the function (or method) reference
from a given `FrameType`.
Verbatim from an SO:
https://stackoverflow.com/a/2220759
'''
args, _, _, value_dict = inspect.getargvalues(fr)
# we check the first parameter for the frame function is
# named 'self'
if (
len(args)
and
# TODO: other cases for `@classmethod` etc..?)
args[0] == 'self'
):
# in that case, 'self' will be referenced in value_dict
instance: object = value_dict.get('self')
if instance:
# return its class
return getattr(
instance,
'__class__',
None,
)
# return None otherwise
return None
def func_ref_from_frame(
frame: FrameType,
) -> Callable:
func_name: str = frame.f_code.co_name
try:
return frame.f_globals[func_name]
except KeyError:
cls: Type|None = get_class_from_frame(frame)
if cls:
return getattr(
cls,
func_name,
)
# TODO: move all this into new `.devx._code`!
# -[ ] prolly create a `@runtime_api` dec?
# -[ ] ^- make it capture and/or accept buncha optional
# meta-data like a fancier version of `@pdbp.hideframe`.
#
class CallerInfo(pretty_struct.Struct):
rt_fi: inspect.FrameInfo
call_frame: FrameType
@property
def api_func_ref(self) -> Callable|None:
return func_ref_from_frame(self.rt_fi.frame)
@property
def api_nsp(self) -> NamespacePath|None:
func: FunctionType = self.api_func_ref
if func:
return NamespacePath.from_ref(func)
return '<unknown>'
@property
def caller_func_ref(self) -> Callable|None:
return func_ref_from_frame(self.call_frame)
@property
def caller_nsp(self) -> NamespacePath|None:
func: FunctionType = self.caller_func_ref
if func:
return NamespacePath.from_ref(func)
return '<unknown>'
def find_caller_info(
dunder_var: str = '__runtimeframe__',
iframes:int = 1,
check_frame_depth: bool = True,
) -> CallerInfo|None:
'''
Scan up the callstack for a frame with a `dunder_var: str` variable
and return the `iframes` frames above it.
By default we scan for a `__runtimeframe__` scope var which
denotes a `tractor` API above which (one frame up) is "user
app code" which "called into" the `tractor` method or func.
TODO: ex with `Portal.open_context()`
'''
# TODO: use this instead?
# https://docs.python.org/3/library/inspect.html#inspect.getouterframes
frames: list[inspect.FrameInfo] = inspect.stack()
for fi in frames:
assert (
fi.function
==
fi.frame.f_code.co_name
)
this_frame: FrameType = fi.frame
dunder_val: int|None = this_frame.f_locals.get(dunder_var)
if dunder_val:
go_up_iframes: int = (
dunder_val # could be 0 or `True` i guess?
or
iframes
)
rt_frame: FrameType = fi.frame
call_frame = rt_frame
for i in range(go_up_iframes):
call_frame = call_frame.f_back
return CallerInfo(
rt_fi=fi,
call_frame=call_frame,
)
return None

View File

@ -33,35 +33,46 @@ from functools import (
import os import os
import signal import signal
import sys import sys
import threading
import traceback import traceback
from typing import ( from typing import (
Any, Any,
Callable, Callable,
AsyncIterator, AsyncIterator,
AsyncGenerator, AsyncGenerator,
TYPE_CHECKING,
)
from types import (
FrameType,
ModuleType,
) )
from types import FrameType
import pdbp import pdbp
import sniffio
import tractor import tractor
import trio import trio
from trio.lowlevel import current_task from trio.lowlevel import current_task
from trio_typing import ( from trio import (
TaskStatus, TaskStatus,
# Task, # Task,
) )
from ..log import get_logger from tractor.log import get_logger
from .._state import ( from tractor._state import (
current_actor, current_actor,
is_root_process, is_root_process,
debug_mode, debug_mode,
) )
from .._exceptions import ( from tractor._exceptions import (
is_multi_cancelled, is_multi_cancelled,
ContextCancelled, ContextCancelled,
) )
from .._ipc import Channel from tractor._ipc import Channel
if TYPE_CHECKING:
from tractor._runtime import (
Actor,
)
log = get_logger(__name__) log = get_logger(__name__)
@ -116,10 +127,36 @@ class Lock:
@classmethod @classmethod
def shield_sigint(cls): def shield_sigint(cls):
cls._orig_sigint_handler = signal.signal( '''
signal.SIGINT, Shield out SIGINT handling (which by default triggers
shield_sigint_handler, `trio.Task` cancellation) in subactors when the `pdb` REPL
) is active.
Avoids cancellation of the current actor (task) when the
user mistakenly sends ctl-c or a signal is received from
an external request; explicit runtime cancel requests are
allowed until the use exits the REPL session using
'continue' or 'quit', at which point the orig SIGINT
handler is restored.
'''
#
# XXX detect whether we're running from a non-main thread
# in which case schedule the SIGINT shielding override
# to in the main thread.
# https://docs.python.org/3/library/signal.html#signals-and-threads
if not cls.is_main_trio_thread():
cls._orig_sigint_handler: Callable = trio.from_thread.run_sync(
signal.signal,
signal.SIGINT,
shield_sigint_handler,
)
else:
cls._orig_sigint_handler = signal.signal(
signal.SIGINT,
shield_sigint_handler,
)
@classmethod @classmethod
@pdbp.hideframe # XXX NOTE XXX see below in `.pause_from_sync()` @pdbp.hideframe # XXX NOTE XXX see below in `.pause_from_sync()`
@ -127,13 +164,60 @@ class Lock:
# always restore ``trio``'s sigint handler. see notes below in # always restore ``trio``'s sigint handler. see notes below in
# the pdb factory about the nightmare that is that code swapping # the pdb factory about the nightmare that is that code swapping
# out the handler when the repl activates... # out the handler when the repl activates...
signal.signal(signal.SIGINT, cls._trio_handler) if not cls.is_main_trio_thread():
trio.from_thread.run_sync(
signal.signal,
signal.SIGINT,
cls._trio_handler,
)
else:
signal.signal(
signal.SIGINT,
cls._trio_handler,
)
cls._orig_sigint_handler = None cls._orig_sigint_handler = None
@classmethod
def is_main_trio_thread(cls) -> bool:
'''
Check if we're the "main" thread (as in the first one
started by cpython) AND that it is ALSO the thread that
called `trio.run()` and not some thread spawned with
`trio.to_thread.run_sync()`.
'''
is_trio_main = (
# TODO: since this is private, @oremanj says
# we should just copy the impl for now..
(is_main_thread := trio._util.is_main_thread())
and
(async_lib := sniffio.current_async_library()) == 'trio'
)
if (
not is_trio_main
and is_main_thread
):
log.warning(
f'Current async-lib detected by `sniffio`: {async_lib}\n'
)
return is_trio_main
# XXX apparently unreliable..see ^
# (
# threading.current_thread()
# is not threading.main_thread()
# )
@classmethod @classmethod
def release(cls): def release(cls):
try: try:
cls._debug_lock.release() if not cls.is_main_trio_thread():
trio.from_thread.run_sync(
cls._debug_lock.release
)
else:
cls._debug_lock.release()
except RuntimeError: except RuntimeError:
# uhhh makes no sense but been seeing the non-owner # uhhh makes no sense but been seeing the non-owner
# release error even though this is definitely the task # release error even though this is definitely the task
@ -400,7 +484,6 @@ async def wait_for_parent_stdin_hijack(
# this syncs to child's ``Context.started()`` call. # this syncs to child's ``Context.started()`` call.
async with portal.open_context( async with portal.open_context(
lock_tty_for_child, lock_tty_for_child,
subactor_uid=actor_uid, subactor_uid=actor_uid,
@ -438,11 +521,31 @@ async def wait_for_parent_stdin_hijack(
log.debug('Exiting debugger from child') log.debug('Exiting debugger from child')
def mk_mpdb() -> tuple[MultiActorPdb, Callable]: def mk_mpdb() -> MultiActorPdb:
'''
Deliver a new `MultiActorPdb`: a multi-process safe `pdbp`
REPL using the magic of SC!
Our `pdb.Pdb` subtype accomplishes multi-process safe debugging
by:
- mutexing access to the root process' TTY & stdstreams
via an IPC managed `Lock` singleton per process tree.
- temporarily overriding any subactor's SIGINT handler to shield during
live REPL sessions in sub-actors such that cancellation is
never (mistakenly) triggered by a ctrl-c and instead only
by either explicit requests in the runtime or
'''
pdb = MultiActorPdb() pdb = MultiActorPdb()
# signal.signal = pdbp.hideframe(signal.signal)
# Always shield out SIGINTs for subactors when REPL is active.
#
# XXX detect whether we're running from a non-main thread
# in which case schedule the SIGINT shielding override
# to in the main thread.
# https://docs.python.org/3/library/signal.html#signals-and-threads
Lock.shield_sigint() Lock.shield_sigint()
# XXX: These are the important flags mentioned in # XXX: These are the important flags mentioned in
@ -451,7 +554,7 @@ def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
pdb.allow_kbdint = True pdb.allow_kbdint = True
pdb.nosigint = True pdb.nosigint = True
return pdb, Lock.unshield_sigint return pdb
def shield_sigint_handler( def shield_sigint_handler(
@ -464,17 +567,16 @@ def shield_sigint_handler(
''' '''
Specialized, debugger-aware SIGINT handler. Specialized, debugger-aware SIGINT handler.
In childred we always ignore to avoid deadlocks since cancellation In childred we always ignore/shield for SIGINT to avoid
should always be managed by the parent supervising actor. The root deadlocks since cancellation should always be managed by the
is always cancelled on ctrl-c. supervising parent actor. The root actor-proces is always
cancelled on ctrl-c.
''' '''
__tracebackhide__ = True __tracebackhide__: bool = True
uid_in_debug: tuple[str, str]|None = Lock.global_actor_in_debug
uid_in_debug: tuple[str, str] | None = Lock.global_actor_in_debug actor: Actor = current_actor()
actor = current_actor()
# print(f'{actor.uid} in HANDLER with ')
def do_cancel(): def do_cancel():
# If we haven't tried to cancel the runtime then do that instead # If we haven't tried to cancel the runtime then do that instead
@ -509,7 +611,7 @@ def shield_sigint_handler(
return do_cancel() return do_cancel()
# only set in the actor actually running the REPL # only set in the actor actually running the REPL
pdb_obj: MultiActorPdb | None = Lock.repl pdb_obj: MultiActorPdb|None = Lock.repl
# root actor branch that reports whether or not a child # root actor branch that reports whether or not a child
# has locked debugger. # has locked debugger.
@ -616,14 +718,20 @@ _pause_msg: str = 'Attaching to pdb REPL in actor'
def _set_trace( def _set_trace(
actor: tractor.Actor | None = None, actor: tractor.Actor|None = None,
pdb: MultiActorPdb | None = None, pdb: MultiActorPdb|None = None,
shield: bool = False, shield: bool = False,
extra_frames_up_when_async: int = 1, extra_frames_up_when_async: int = 1,
hide_tb: bool = True,
): ):
__tracebackhide__: bool = True __tracebackhide__: bool = hide_tb
actor: tractor.Actor = actor or current_actor()
actor: tractor.Actor = (
actor
or
current_actor()
)
# always start 1 level up from THIS in user code. # always start 1 level up from THIS in user code.
frame: FrameType|None frame: FrameType|None
@ -669,20 +777,17 @@ def _set_trace(
f'Going up frame {i} -> {frame}\n' f'Going up frame {i} -> {frame}\n'
) )
else: # engage ze REPL
pdb, undo_sigint = mk_mpdb() # B~()
# we entered the global ``breakpoint()`` built-in from sync
# code?
Lock.local_task_in_debug = 'sync'
pdb.set_trace(frame=frame) pdb.set_trace(frame=frame)
async def _pause( async def _pause(
debug_func: Callable = _set_trace, debug_func: Callable = _set_trace,
release_lock_signal: trio.Event | None = None,
# NOTE: must be passed in the `.pause_from_sync()` case!
pdb: MultiActorPdb|None = None,
# TODO: allow caller to pause despite task cancellation, # TODO: allow caller to pause despite task cancellation,
# exactly the same as wrapping with: # exactly the same as wrapping with:
@ -691,9 +796,9 @@ async def _pause(
# => the REMAINING ISSUE is that the scope's .__exit__() frame # => the REMAINING ISSUE is that the scope's .__exit__() frame
# is always show in the debugger on entry.. and there seems to # is always show in the debugger on entry.. and there seems to
# be no way to override it?.. # be no way to override it?..
# shield: bool = False, #
shield: bool = False, shield: bool = False,
hide_tb: bool = True,
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED
) -> None: ) -> None:
@ -705,10 +810,16 @@ async def _pause(
Hopefully we won't need this in the long run. Hopefully we won't need this in the long run.
''' '''
__tracebackhide__: bool = True __tracebackhide__: bool = hide_tb
actor = current_actor() actor: Actor = current_actor()
pdb, undo_sigint = mk_mpdb() try:
task_name: str = trio.lowlevel.current_task().name task_name: str = trio.lowlevel.current_task().name
except RuntimeError as rte:
if actor.is_infected_aio():
raise RuntimeError(
'`tractor.pause[_from_sync]()` not yet supported '
'for infected `asyncio` mode!'
) from rte
if ( if (
not Lock.local_pdb_complete not Lock.local_pdb_complete
@ -716,9 +827,13 @@ async def _pause(
): ):
Lock.local_pdb_complete = trio.Event() Lock.local_pdb_complete = trio.Event()
debug_func = partial( if debug_func is not None:
debug_func, debug_func = partial(
) debug_func,
)
if pdb is None:
pdb: MultiActorPdb = mk_mpdb()
# TODO: need a more robust check for the "root" actor # TODO: need a more robust check for the "root" actor
if ( if (
@ -767,6 +882,7 @@ async def _pause(
actor.uid, actor.uid,
) )
Lock.repl = pdb Lock.repl = pdb
except RuntimeError: except RuntimeError:
Lock.release() Lock.release()
@ -811,32 +927,26 @@ async def _pause(
# TODO: do we want to support using this **just** for the # TODO: do we want to support using this **just** for the
# locking / common code (prolly to help address #320)? # locking / common code (prolly to help address #320)?
# #
# if debug_func is None: if debug_func is None:
# assert release_lock_signal, ( task_status.started(Lock)
# 'Must pass `release_lock_signal: trio.Event` if no '
# 'trace func provided!'
# )
# print(f"{actor.uid} ENTERING WAIT")
# with trio.CancelScope(shield=True):
# await release_lock_signal.wait()
# else: else:
# block here one (at the appropriate frame *up*) where # block here one (at the appropriate frame *up*) where
# ``breakpoint()`` was awaited and begin handling stdio. # ``breakpoint()`` was awaited and begin handling stdio.
log.debug('Entering sync world of the `pdb` REPL..') log.debug('Entering sync world of the `pdb` REPL..')
try: try:
debug_func( debug_func(
actor, actor,
pdb, pdb,
extra_frames_up_when_async=2, extra_frames_up_when_async=2,
shield=shield, shield=shield,
) )
except BaseException: except BaseException:
log.exception( log.exception(
'Failed to invoke internal `debug_func = ' 'Failed to invoke internal `debug_func = '
f'{debug_func.func.__name__}`\n' f'{debug_func.func.__name__}`\n'
) )
raise raise
except bdb.BdbQuit: except bdb.BdbQuit:
Lock.release() Lock.release()
@ -862,8 +972,7 @@ async def _pause(
async def pause( async def pause(
debug_func: Callable = _set_trace, debug_func: Callable|None = _set_trace,
release_lock_signal: trio.Event | None = None,
# TODO: allow caller to pause despite task cancellation, # TODO: allow caller to pause despite task cancellation,
# exactly the same as wrapping with: # exactly the same as wrapping with:
@ -872,10 +981,11 @@ async def pause(
# => the REMAINING ISSUE is that the scope's .__exit__() frame # => the REMAINING ISSUE is that the scope's .__exit__() frame
# is always show in the debugger on entry.. and there seems to # is always show in the debugger on entry.. and there seems to
# be no way to override it?.. # be no way to override it?..
# shield: bool = False, #
shield: bool = False, shield: bool = False,
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
**_pause_kwargs,
) -> None: ) -> None:
''' '''
@ -920,89 +1030,166 @@ async def pause(
task_status.started(cs) task_status.started(cs)
return await _pause( return await _pause(
debug_func=debug_func, debug_func=debug_func,
release_lock_signal=release_lock_signal,
shield=True, shield=True,
task_status=task_status, task_status=task_status,
**_pause_kwargs
) )
else: else:
return await _pause( return await _pause(
debug_func=debug_func, debug_func=debug_func,
release_lock_signal=release_lock_signal,
shield=False, shield=False,
task_status=task_status, task_status=task_status,
**_pause_kwargs
) )
_gb_mod: None|ModuleType|False = None
def maybe_import_greenback(
raise_not_found: bool = True,
force_reload: bool = False,
) -> ModuleType|False:
# be cached-fast on module-already-inited
global _gb_mod
if _gb_mod is False:
return False
elif (
_gb_mod is not None
and not force_reload
):
return _gb_mod
try:
import greenback
_gb_mod = greenback
return greenback
except ModuleNotFoundError as mnf:
log.debug(
'`greenback` is not installed.\n'
'No sync debug support!\n'
)
_gb_mod = False
if raise_not_found:
raise RuntimeError(
'The `greenback` lib is required to use `tractor.pause_from_sync()`!\n'
'https://github.com/oremanj/greenback\n'
) from mnf
return False
async def maybe_init_greenback(
**kwargs,
) -> None|ModuleType:
if mod := maybe_import_greenback(**kwargs):
await mod.ensure_portal()
log.info(
'`greenback` portal opened!\n'
'Sync debug support activated!\n'
)
return mod
return None
# TODO: allow pausing from sync code. # TODO: allow pausing from sync code.
# normally by remapping python's builtin breakpoint() hook to this # normally by remapping python's builtin breakpoint() hook to this
# runtime aware version which takes care of all . # runtime aware version which takes care of all .
def pause_from_sync() -> None: def pause_from_sync(
print("ENTER SYNC PAUSE") hide_tb: bool = False,
) -> None:
__tracebackhide__: bool = hide_tb
actor: tractor.Actor = current_actor( actor: tractor.Actor = current_actor(
err_on_no_runtime=False, err_on_no_runtime=False,
) )
if actor: log.debug(
try: f'{actor.uid}: JUST ENTERED `tractor.pause_from_sync()`'
import greenback f'|_{actor}\n'
# __tracebackhide__ = True )
if not actor:
raise RuntimeError(
'Not inside the `tractor`-runtime?\n'
'`tractor.pause_from_sync()` is not functional without a wrapping\n'
'- `async with tractor.open_nursery()` or,\n'
'- `async with tractor.open_root_actor()`\n'
)
# NOTE: once supported, remove this AND the one
# inside `._pause()`!
if actor.is_infected_aio():
raise RuntimeError(
'`tractor.pause[_from_sync]()` not yet supported '
'for infected `asyncio` mode!'
)
# task_can_release_tty_lock = trio.Event() # raises on not-found by default
greenback: ModuleType = maybe_import_greenback()
mdb: MultiActorPdb = mk_mpdb()
# spawn bg task which will lock out the TTY, we poll # run async task which will lock out the root proc's TTY.
# just below until the release event is reporting that task as if not Lock.is_main_trio_thread():
# waiting.. not the most ideal but works for now ;)
greenback.await_( # TODO: we could also check for a non-`.to_thread` context
actor._service_n.start(partial( # using `trio.from_thread.check_cancelled()` (says
pause, # oremanj) wherein we get the following outputs:
debug_func=None, #
# release_lock_signal=task_can_release_tty_lock, # `RuntimeError`: non-`.to_thread` spawned thread
)) # noop: non-cancelled `.to_thread`
# `trio.Cancelled`: cancelled `.to_thread`
#
trio.from_thread.run(
partial(
pause,
debug_func=None,
pdb=mdb,
hide_tb=hide_tb,
) )
)
# TODO: maybe the `trio.current_task()` id/name if avail?
Lock.local_task_in_debug: str = str(threading.current_thread().name)
except ModuleNotFoundError: else: # we are presumably the `trio.run()` + main thread
log.warning('NO GREENBACK FOUND') greenback.await_(
else: pause(
log.warning('Not inside actor-runtime') debug_func=None,
pdb=mdb,
hide_tb=hide_tb,
)
)
Lock.local_task_in_debug: str = current_task().name
db, undo_sigint = mk_mpdb() # TODO: ensure we aggressively make the user aware about
Lock.local_task_in_debug = 'sync' # entering the global ``breakpoint()`` built-in from sync
# db.config.enable_hidden_frames = True
# we entered the global ``breakpoint()`` built-in from sync
# code? # code?
frame: FrameType | None = sys._getframe() _set_trace(
# print(f'FRAME: {str(frame)}') actor=actor,
# assert not db._is_hidden(frame) pdb=mdb,
hide_tb=hide_tb,
extra_frames_up_when_async=1,
frame: FrameType = frame.f_back # type: ignore # TODO? will we ever need it?
# print(f'FRAME: {str(frame)}') # -> the gb._await() won't be affected by cancellation?
# if not db._is_hidden(frame): # shield=shield,
# pdbp.set_trace() )
# db._hidden_frames.append( # LEGACY NOTE on next LOC's frame showing weirdness..
# (frame, frame.f_lineno) #
# ) # XXX NOTE XXX no other LOC can be here without it
db.set_trace(frame=frame) # showing up in the REPL's last stack frame !?!
# NOTE XXX: see the `@pdbp.hideframe` decoration # -[ ] tried to use `@pdbp.hideframe` decoration but
# on `Lock.unshield_sigint()`.. I have NO CLUE why # still doesn't work
# the next instruction's def frame is being shown
# in the tb but it seems to be something wonky with
# the way `pdb` core works?
# undo_sigint()
# Lock.global_actor_in_debug = actor.uid
# Lock.release()
# task_can_release_tty_lock.set()
# using the "pause" semantics instead since
# that better covers actually somewhat "pausing the runtime"
# for this particular paralell task to do debugging B)
# pp = pause # short-hand for "pause point"
# NOTE prefer a new "pause" semantic since it better describes
# "pausing the actor's runtime" for this particular
# paralell task to do debugging in a REPL.
async def breakpoint(**kwargs): async def breakpoint(**kwargs):
log.warning( log.warning(
'`tractor.breakpoint()` is deprecated!\n' '`tractor.breakpoint()` is deprecated!\n'

View File

@ -23,12 +23,31 @@ into each ``trio.Nursery`` except it links the lifetimes of memory space
disjoint, parallel executing tasks in separate actors. disjoint, parallel executing tasks in separate actors.
''' '''
from __future__ import annotations
import multiprocessing as mp
from signal import ( from signal import (
signal, signal,
SIGUSR1, SIGUSR1,
) )
import traceback
from typing import TYPE_CHECKING
import trio import trio
from tractor import (
_state,
log as logmod,
)
log = logmod.get_logger(__name__)
if TYPE_CHECKING:
from tractor._spawn import ProcessType
from tractor import (
Actor,
ActorNursery,
)
@trio.lowlevel.disable_ki_protection @trio.lowlevel.disable_ki_protection
def dump_task_tree() -> None: def dump_task_tree() -> None:
@ -41,9 +60,15 @@ def dump_task_tree() -> None:
recurse_child_tasks=True recurse_child_tasks=True
) )
) )
log = get_console_log('cancel') log = get_console_log(
log.pdb( name=__name__,
f'Dumping `stackscope` tree:\n\n' level='cancel',
)
actor: Actor = _state.current_actor()
log.devx(
f'Dumping `stackscope` tree for actor\n'
f'{actor.name}: {actor}\n'
f' |_{mp.current_process()}\n\n'
f'{tree_str}\n' f'{tree_str}\n'
) )
# import logging # import logging
@ -56,8 +81,13 @@ def dump_task_tree() -> None:
# ).exception("Error printing task tree") # ).exception("Error printing task tree")
def signal_handler(sig: int, frame: object) -> None: def signal_handler(
import traceback sig: int,
frame: object,
relay_to_subs: bool = True,
) -> None:
try: try:
trio.lowlevel.current_trio_token( trio.lowlevel.current_trio_token(
).run_sync_soon(dump_task_tree) ).run_sync_soon(dump_task_tree)
@ -65,6 +95,26 @@ def signal_handler(sig: int, frame: object) -> None:
# not in async context -- print a normal traceback # not in async context -- print a normal traceback
traceback.print_stack() traceback.print_stack()
if not relay_to_subs:
return
an: ActorNursery
for an in _state.current_actor()._actoruid2nursery.values():
subproc: ProcessType
subactor: Actor
for subactor, subproc, _ in an._children.values():
log.devx(
f'Relaying `SIGUSR1`[{sig}] to sub-actor\n'
f'{subactor}\n'
f' |_{subproc}\n'
)
if isinstance(subproc, trio.Process):
subproc.send_signal(sig)
elif isinstance(subproc, mp.Process):
subproc._send_signal(sig)
def enable_stack_on_sig( def enable_stack_on_sig(
@ -82,3 +132,6 @@ def enable_stack_on_sig(
# NOTE: not the above can be triggered from # NOTE: not the above can be triggered from
# a (xonsh) shell using: # a (xonsh) shell using:
# kill -SIGUSR1 @$(pgrep -f '<cmd>') # kill -SIGUSR1 @$(pgrep -f '<cmd>')
#
# for example if you were looking to trace a `pytest` run
# kill -SIGUSR1 @$(pgrep -f 'pytest')

View File

@ -21,6 +21,11 @@ Log like a forester!
from collections.abc import Mapping from collections.abc import Mapping
import sys import sys
import logging import logging
from logging import (
LoggerAdapter,
Logger,
StreamHandler,
)
import colorlog # type: ignore import colorlog # type: ignore
import trio import trio
@ -48,20 +53,19 @@ LOG_FORMAT = (
DATE_FORMAT = '%b %d %H:%M:%S' DATE_FORMAT = '%b %d %H:%M:%S'
LEVELS: dict[str, int] = { # FYI, ERROR is 40
CUSTOM_LEVELS: dict[str, int] = {
'TRANSPORT': 5, 'TRANSPORT': 5,
'RUNTIME': 15, 'RUNTIME': 15,
'CANCEL': 16, 'DEVX': 17,
'CANCEL': 18,
'PDB': 500, 'PDB': 500,
} }
# _custom_levels: set[str] = {
# lvlname.lower for lvlname in LEVELS.keys()
# }
STD_PALETTE = { STD_PALETTE = {
'CRITICAL': 'red', 'CRITICAL': 'red',
'ERROR': 'red', 'ERROR': 'red',
'PDB': 'white', 'PDB': 'white',
'DEVX': 'cyan',
'WARNING': 'yellow', 'WARNING': 'yellow',
'INFO': 'green', 'INFO': 'green',
'CANCEL': 'yellow', 'CANCEL': 'yellow',
@ -78,7 +82,7 @@ BOLD_PALETTE = {
# TODO: this isn't showing the correct '{filename}' # TODO: this isn't showing the correct '{filename}'
# as it did before.. # as it did before..
class StackLevelAdapter(logging.LoggerAdapter): class StackLevelAdapter(LoggerAdapter):
def transport( def transport(
self, self,
@ -86,7 +90,8 @@ class StackLevelAdapter(logging.LoggerAdapter):
) -> None: ) -> None:
''' '''
IPC level msg-ing. IPC transport level msg IO; generally anything below
`._ipc.Channel` and friends.
''' '''
return self.log(5, msg) return self.log(5, msg)
@ -102,11 +107,11 @@ class StackLevelAdapter(logging.LoggerAdapter):
msg: str, msg: str,
) -> None: ) -> None:
''' '''
Cancellation logging, mostly for runtime reporting. Cancellation sequencing, mostly for runtime reporting.
''' '''
return self.log( return self.log(
level=16, level=22,
msg=msg, msg=msg,
# stacklevel=4, # stacklevel=4,
) )
@ -116,11 +121,21 @@ class StackLevelAdapter(logging.LoggerAdapter):
msg: str, msg: str,
) -> None: ) -> None:
''' '''
Debugger logging. `pdb`-REPL (debugger) related statuses.
''' '''
return self.log(500, msg) return self.log(500, msg)
def devx(
self,
msg: str,
) -> None:
'''
"Developer experience" sub-sys statuses.
'''
return self.log(17, msg)
def log( def log(
self, self,
level, level,
@ -136,8 +151,7 @@ class StackLevelAdapter(logging.LoggerAdapter):
if self.isEnabledFor(level): if self.isEnabledFor(level):
stacklevel: int = 3 stacklevel: int = 3
if ( if (
level in LEVELS.values() level in CUSTOM_LEVELS.values()
# or level in _custom_levels
): ):
stacklevel: int = 4 stacklevel: int = 4
@ -184,8 +198,30 @@ class StackLevelAdapter(logging.LoggerAdapter):
) )
# TODO IDEAs:
# -[ ] move to `.devx.pformat`?
# -[ ] do per task-name and actor-name color coding
# -[ ] unique color per task-id and actor-uuid
def pformat_task_uid(
id_part: str = 'tail'
):
'''
Return `str`-ified unique for a `trio.Task` via a combo of its
`.name: str` and `id()` truncated output.
'''
task: trio.Task = trio.lowlevel.current_task()
tid: str = str(id(task))
if id_part == 'tail':
tid_part: str = tid[-6:]
else:
tid_part: str = tid[:6]
return f'{task.name}[{tid_part}]'
_conc_name_getters = { _conc_name_getters = {
'task': lambda: trio.lowlevel.current_task().name, 'task': pformat_task_uid,
'actor': lambda: current_actor(), 'actor': lambda: current_actor(),
'actor_name': lambda: current_actor().name, 'actor_name': lambda: current_actor().name,
'actor_uid': lambda: current_actor().uid[1][:6], 'actor_uid': lambda: current_actor().uid[1][:6],
@ -193,7 +229,10 @@ _conc_name_getters = {
class ActorContextInfo(Mapping): class ActorContextInfo(Mapping):
"Dyanmic lookup for local actor and task names" '''
Dyanmic lookup for local actor and task names.
'''
_context_keys = ( _context_keys = (
'task', 'task',
'actor', 'actor',
@ -224,6 +263,7 @@ def get_logger(
'''Return the package log or a sub-logger for ``name`` if provided. '''Return the package log or a sub-logger for ``name`` if provided.
''' '''
log: Logger
log = rlog = logging.getLogger(_root_name) log = rlog = logging.getLogger(_root_name)
if ( if (
@ -266,7 +306,7 @@ def get_logger(
logger = StackLevelAdapter(log, ActorContextInfo()) logger = StackLevelAdapter(log, ActorContextInfo())
# additional levels # additional levels
for name, val in LEVELS.items(): for name, val in CUSTOM_LEVELS.items():
logging.addLevelName(val, name) logging.addLevelName(val, name)
# ensure customs levels exist as methods # ensure customs levels exist as methods
@ -278,7 +318,7 @@ def get_logger(
def get_console_log( def get_console_log(
level: str | None = None, level: str | None = None,
**kwargs, **kwargs,
) -> logging.LoggerAdapter: ) -> LoggerAdapter:
'''Get the package logger and enable a handler which writes to stderr. '''Get the package logger and enable a handler which writes to stderr.
Yeah yeah, i know we can use ``DictConfig``. You do it. Yeah yeah, i know we can use ``DictConfig``. You do it.
@ -303,7 +343,7 @@ def get_console_log(
None, None,
) )
): ):
handler = logging.StreamHandler() handler = StreamHandler()
formatter = colorlog.ColoredFormatter( formatter = colorlog.ColoredFormatter(
LOG_FORMAT, LOG_FORMAT,
datefmt=DATE_FORMAT, datefmt=DATE_FORMAT,
@ -323,3 +363,19 @@ def get_loglevel() -> str:
# global module logger for tractor itself # global module logger for tractor itself
log = get_logger('tractor') log = get_logger('tractor')
def at_least_level(
log: Logger|LoggerAdapter,
level: int|str,
) -> bool:
'''
Predicate to test if a given level is active.
'''
if isinstance(level, str):
level: int = CUSTOM_LEVELS[level.upper()]
if log.getEffectiveLevel() <= level:
return True
return False

View File

@ -33,10 +33,14 @@ from typing import (
import trio import trio
from outcome import Error from outcome import Error
from .log import get_logger from tractor.log import get_logger
from ._state import current_actor from tractor._state import (
from ._exceptions import AsyncioCancelled current_actor,
from .trionics._broadcast import ( debug_mode,
)
from tractor.devx import _debug
from tractor._exceptions import AsyncioCancelled
from tractor.trionics._broadcast import (
broadcast_receiver, broadcast_receiver,
BroadcastReceiver, BroadcastReceiver,
) )
@ -64,9 +68,9 @@ class LinkedTaskChannel(trio.abc.Channel):
_trio_exited: bool = False _trio_exited: bool = False
# set after ``asyncio.create_task()`` # set after ``asyncio.create_task()``
_aio_task: asyncio.Task | None = None _aio_task: asyncio.Task|None = None
_aio_err: BaseException | None = None _aio_err: BaseException|None = None
_broadcaster: BroadcastReceiver | None = None _broadcaster: BroadcastReceiver|None = None
async def aclose(self) -> None: async def aclose(self) -> None:
await self._from_aio.aclose() await self._from_aio.aclose()
@ -158,7 +162,9 @@ def _run_asyncio_task(
''' '''
__tracebackhide__ = True __tracebackhide__ = True
if not current_actor().is_infected_aio(): if not current_actor().is_infected_aio():
raise RuntimeError("`infect_asyncio` mode is not enabled!?") raise RuntimeError(
"`infect_asyncio` mode is not enabled!?"
)
# ITC (inter task comms), these channel/queue names are mostly from # ITC (inter task comms), these channel/queue names are mostly from
# ``asyncio``'s perspective. # ``asyncio``'s perspective.
@ -187,7 +193,7 @@ def _run_asyncio_task(
cancel_scope = trio.CancelScope() cancel_scope = trio.CancelScope()
aio_task_complete = trio.Event() aio_task_complete = trio.Event()
aio_err: BaseException | None = None aio_err: BaseException|None = None
chan = LinkedTaskChannel( chan = LinkedTaskChannel(
aio_q, # asyncio.Queue aio_q, # asyncio.Queue
@ -253,7 +259,7 @@ def _run_asyncio_task(
if not inspect.isawaitable(coro): if not inspect.isawaitable(coro):
raise TypeError(f"No support for invoking {coro}") raise TypeError(f"No support for invoking {coro}")
task = asyncio.create_task( task: asyncio.Task = asyncio.create_task(
wait_on_coro_final_result( wait_on_coro_final_result(
to_trio, to_trio,
coro, coro,
@ -262,6 +268,18 @@ def _run_asyncio_task(
) )
chan._aio_task = task chan._aio_task = task
# XXX TODO XXX get this actually workin.. XD
# maybe setup `greenback` for `asyncio`-side task REPLing
if (
debug_mode()
and
(greenback := _debug.maybe_import_greenback(
force_reload=True,
raise_not_found=False,
))
):
greenback.bestow_portal(task)
def cancel_trio(task: asyncio.Task) -> None: def cancel_trio(task: asyncio.Task) -> None:
''' '''
Cancel the calling ``trio`` task on error. Cancel the calling ``trio`` task on error.
@ -269,7 +287,7 @@ def _run_asyncio_task(
''' '''
nonlocal chan nonlocal chan
aio_err = chan._aio_err aio_err = chan._aio_err
task_err: BaseException | None = None task_err: BaseException|None = None
# only to avoid ``asyncio`` complaining about uncaptured # only to avoid ``asyncio`` complaining about uncaptured
# task exceptions # task exceptions
@ -349,11 +367,11 @@ async def translate_aio_errors(
''' '''
trio_task = trio.lowlevel.current_task() trio_task = trio.lowlevel.current_task()
aio_err: BaseException | None = None aio_err: BaseException|None = None
# TODO: make thisi a channel method? # TODO: make thisi a channel method?
def maybe_raise_aio_err( def maybe_raise_aio_err(
err: Exception | None = None err: Exception|None = None
) -> None: ) -> None:
aio_err = chan._aio_err aio_err = chan._aio_err
if ( if (
@ -531,6 +549,16 @@ def run_as_asyncio_guest(
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
trio_done_fut = asyncio.Future() trio_done_fut = asyncio.Future()
if debug_mode():
# XXX make it obvi we know this isn't supported yet!
log.error(
'Attempting to enter unsupported `greenback` init '
'from `asyncio` task..'
)
await _debug.maybe_init_greenback(
force_reload=True,
)
def trio_done_callback(main_outcome): def trio_done_callback(main_outcome):
if isinstance(main_outcome, Error): if isinstance(main_outcome, Error):