Compare commits

...

2 Commits

Author SHA1 Message Date
Tyler Goodlet a69bc00593 First draft, `asyncio`-task, sync-pausing Bo
Mostly due to magic from @oremanj (a super-end-level-boss) where we slap
in a little bit of `.from_asyncio`-type stuff to run a `trio`-task from
`asyncio` code

I'm not gonna go into tooo too much detail but basically the primary
thing needed was a way to (blocking-ly) invoke a `trio.lowlevel.Task`
from an `asyncio.Task` (which we now have with a new
`run_trio_task_in_future()` thanks to the "aforementioned jefe") which
we now invoke from a dedicated aio case-branch inside
`.devx._debug.pause_from_sync()`. Further include a case inside
`DebugStatus.release()` to handle using the same func to set the
`repl_release: trio.Event` from the `asyncio` side when releasing the
REPL.

Prolly more refinements to come ;{o
2024-07-13 00:16:28 -04:00
Tyler Goodlet 1f1a3f19d5 Fix multi-daemon debug test `break` signal..
It was expecting `AssertionError` as a proceed-in-test signal (by
breaking from a continue loop), but `in_prompt_msg(raise_on_err=True)`
was changed to raise `ValueError`; so instead just use as a predicate
for the `break`.

Also rework `in_prompt_msg()` to accept the `child: BaseSpawn` as input
instead of `before: str` remove the casting boilerplate, and adjust all
usage to match.
2024-07-12 15:57:41 -04:00
7 changed files with 325 additions and 147 deletions

View File

@ -2,7 +2,10 @@ import asyncio
import trio
import tractor
from tractor import to_asyncio
from tractor import (
to_asyncio,
Portal,
)
async def aio_sleep_forever():
@ -43,7 +46,7 @@ async def bp_then_error(
@tractor.context
async def trio_ctx(
ctx: tractor.Context,
bp_before_started: bool = False,
bp_before_started: bool = True,
):
# this will block until the ``asyncio`` task sends a "first"
@ -57,7 +60,6 @@ async def trio_ctx(
trio.open_nursery() as n,
):
assert first == 'start'
if bp_before_started:
@ -73,15 +75,18 @@ async def trio_ctx(
async def main(
bps_all_over: bool = False,
bps_all_over: bool = True,
) -> None:
async with tractor.open_nursery(
# debug_mode=True,
debug_mode=True,
maybe_enable_greenback=True,
# loglevel='devx',
# loglevel='runtime',
) as n:
p = await n.start_actor(
ptl: Portal = await n.start_actor(
'aio_daemon',
enable_modules=[__name__],
infect_asyncio=True,
@ -89,7 +94,7 @@ async def main(
loglevel='cancel',
)
async with p.open_context(
async with ptl.open_context(
trio_ctx,
bp_before_started=bps_all_over,
) as (ctx, first):
@ -105,7 +110,7 @@ async def main(
# TODO: case where we cancel from trio-side while asyncio task
# has debugger lock?
# await p.cancel_actor()
# await ptl.cancel_actor()
if __name__ == '__main__':

View File

@ -25,7 +25,8 @@ async def main():
"""
async with tractor.open_nursery(
debug_mode=True,
loglevel='cancel',
# loglevel='cancel',
# loglevel='devx',
) as n:
p0 = await n.start_actor('bp_forever', enable_modules=[__name__])

View File

@ -10,6 +10,7 @@ import pytest
from pexpect.exceptions import (
TIMEOUT,
)
from pexpect.spawnbase import SpawnBase
from tractor._testing import (
mk_cmd,
)
@ -107,7 +108,7 @@ def expect(
def in_prompt_msg(
prompt: str,
child: SpawnBase,
parts: list[str],
pause_on_false: bool = False,
@ -125,18 +126,20 @@ def in_prompt_msg(
'''
__tracebackhide__: bool = False
before: str = str(child.before.decode())
for part in parts:
if part not in prompt:
if part not in before:
if pause_on_false:
import pdbp
pdbp.set_trace()
if print_prompt_on_false:
print(prompt)
print(before)
if err_on_false:
raise ValueError(
f'Could not find pattern: {part!r} in `before` output?'
f'Could not find pattern in `before` output?\n'
f'part: {part!r}\n'
)
return False
@ -147,7 +150,7 @@ def in_prompt_msg(
# against call stack frame output from the the 'll' command the like!
# -[ ] SO answer for stipping ANSI codes: https://stackoverflow.com/a/14693789
def assert_before(
child,
child: SpawnBase,
patts: list[str],
**kwargs,
@ -155,10 +158,8 @@ def assert_before(
) -> None:
__tracebackhide__: bool = False
# as in before the prompt end
before: str = str(child.before.decode())
assert in_prompt_msg(
prompt=before,
child=child,
parts=patts,
# since this is an "assert" helper ;)

View File

@ -96,14 +96,15 @@ def test_root_actor_error(
# scan for the prompt
expect(child, PROMPT)
before = str(child.before.decode())
# make sure expected logging and error arrives
assert in_prompt_msg(
before,
[_crash_msg, "('root'"]
child,
[
_crash_msg,
"('root'",
'AssertionError',
]
)
assert 'AssertionError' in before
# send user command
child.sendline(user_input)
@ -243,10 +244,12 @@ def test_subactor_error(
# scan for the prompt
child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg(
before,
[_crash_msg, "('name_error'"]
child,
[
_crash_msg,
"('name_error'",
]
)
if do_next:
@ -265,17 +268,15 @@ def test_subactor_error(
child.sendline('continue')
child.expect(PROMPT)
before = str(child.before.decode())
# root actor gets debugger engaged
assert in_prompt_msg(
before,
[_crash_msg, "('root'"]
)
# error is a remote error propagated from the subactor
assert in_prompt_msg(
before,
[_crash_msg, "('name_error'"]
child,
[
_crash_msg,
# root actor gets debugger engaged
"('root'",
# error is a remote error propagated from the subactor
"('name_error'",
]
)
# another round
@ -296,14 +297,11 @@ def test_subactor_breakpoint(
"Single subactor with an infinite breakpoint loop"
child = spawn('subactor_breakpoint')
# scan for the prompt
child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg(
before,
[_pause_msg, "('breakpoint_forever'"]
child,
[_pause_msg,
"('breakpoint_forever'",]
)
# do some "next" commands to demonstrate recurrent breakpoint
@ -319,9 +317,8 @@ def test_subactor_breakpoint(
for _ in range(5):
child.sendline('continue')
child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg(
before,
child,
[_pause_msg, "('breakpoint_forever'"]
)
@ -334,9 +331,8 @@ def test_subactor_breakpoint(
# child process should exit but parent will capture pdb.BdbQuit
child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg(
before,
child,
['RemoteActorError:',
"('breakpoint_forever'",
'bdb.BdbQuit',]
@ -351,9 +347,8 @@ def test_subactor_breakpoint(
# process should exit
child.expect(EOF)
before = str(child.before.decode())
assert in_prompt_msg(
before,
child,
['RemoteActorError:',
"('breakpoint_forever'",
'bdb.BdbQuit',]
@ -377,7 +372,7 @@ def test_multi_subactors(
before = str(child.before.decode())
assert in_prompt_msg(
before,
child,
[_pause_msg, "('breakpoint_forever'"]
)
@ -398,12 +393,14 @@ def test_multi_subactors(
# first name_error failure
child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg(
before,
[_crash_msg, "('name_error'"]
child,
[
_crash_msg,
"('name_error'",
"NameError",
]
)
assert "NameError" in before
if ctlc:
do_ctlc(child)
@ -427,9 +424,8 @@ def test_multi_subactors(
# breakpoint loop should re-engage
child.sendline('c')
child.expect(PROMPT)
before = str(child.before.decode())
assert in_prompt_msg(
before,
child,
[_pause_msg, "('breakpoint_forever'"]
)
@ -522,25 +518,28 @@ def test_multi_daemon_subactors(
# the root's tty lock first so anticipate either crash
# message on the first entry.
bp_forev_parts = [_pause_msg, "('bp_forever'"]
bp_forev_parts = [
_pause_msg,
"('bp_forever'",
]
bp_forev_in_msg = partial(
in_prompt_msg,
parts=bp_forev_parts,
)
name_error_msg = "NameError: name 'doggypants' is not defined"
name_error_parts = [name_error_msg]
name_error_msg: str = "NameError: name 'doggypants' is not defined"
name_error_parts: list[str] = [name_error_msg]
before = str(child.before.decode())
if bp_forev_in_msg(prompt=before):
if bp_forev_in_msg(child=child):
next_parts = name_error_parts
elif name_error_msg in before:
next_parts = bp_forev_parts
else:
raise ValueError("Neither log msg was found !?")
raise ValueError('Neither log msg was found !?')
if ctlc:
do_ctlc(child)
@ -609,14 +608,12 @@ def test_multi_daemon_subactors(
# wait for final error in root
# where it crashs with boxed error
while True:
try:
child.sendline('c')
child.expect(PROMPT)
assert_before(
child,
bp_forev_parts
)
except AssertionError:
child.sendline('c')
child.expect(PROMPT)
if not in_prompt_msg(
child,
bp_forev_parts
):
break
assert_before(
@ -797,10 +794,13 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
child = spawn('root_cancelled_but_child_is_in_tty_lock')
child.expect(PROMPT)
before = str(child.before.decode())
assert "NameError: name 'doggypants' is not defined" in before
assert "tractor._exceptions.RemoteActorError: ('name_error'" not in before
assert_before(
child,
[
"NameError: name 'doggypants' is not defined",
"tractor._exceptions.RemoteActorError: ('name_error'",
],
)
time.sleep(0.5)
if ctlc:
@ -891,9 +891,8 @@ def test_different_debug_mode_per_actor(
child.expect(PROMPT)
# only one actor should enter the debugger
before = str(child.before.decode())
assert in_prompt_msg(
before,
child,
[_crash_msg, "('debugged_boi'", "RuntimeError"],
)
@ -903,8 +902,6 @@ def test_different_debug_mode_per_actor(
child.sendline('c')
child.expect(EOF)
before = str(child.before.decode())
# NOTE: this debugged actor error currently WON'T show up since the
# root will actually cancel and terminate the nursery before the error
# msg reported back from the debug mode actor is processed.
@ -956,9 +953,8 @@ def test_pause_from_sync(
child.expect(PROMPT)
# XXX shouldn't see gb loaded message with PDB loglevel!
before = str(child.before.decode())
assert not in_prompt_msg(
before,
child,
['`greenback` portal opened!'],
)
# should be same root task
@ -1039,7 +1035,7 @@ def test_pause_from_sync(
# at the same time as the one that was detected above.
for key, other_patts in attach_patts.copy().items():
assert not in_prompt_msg(
before,
child,
other_patts,
)

View File

@ -291,7 +291,7 @@ def test_context_spawns_aio_task_that_errors(
err = excinfo.value
assert isinstance(err, expect)
assert err.boxed_type == AssertionError
assert err.boxed_type is AssertionError
async def aio_cancel():
@ -497,7 +497,7 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
trio.run(main)
# ensure boxed error type
excinfo.value.boxed_type == Exception
excinfo.value.boxed_type is Exception
def test_trio_closes_early_and_channel_exits(reg_addr):
@ -533,7 +533,7 @@ def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
) as excinfo:
trio.run(main)
excinfo.value.boxed_type == Exception
excinfo.value.boxed_type is Exception
@tractor.context

View File

@ -20,6 +20,7 @@ Multi-core debugging for da peeps!
"""
from __future__ import annotations
import asyncio
import bdb
from contextlib import (
asynccontextmanager as acm,
@ -67,6 +68,7 @@ from trio import (
TaskStatus,
)
import tractor
from tractor.to_asyncio import run_trio_task_in_future
from tractor.log import get_logger
from tractor._context import Context
from tractor import _state
@ -296,7 +298,7 @@ class Lock:
)
@classmethod
@pdbp.hideframe
# @pdbp.hideframe
def release(
cls,
raise_on_thread: bool = True,
@ -310,39 +312,40 @@ class Lock:
we_released: bool = False
ctx_in_debug: Context|None = cls.ctx_in_debug
repl_task: Task|Thread|None = DebugStatus.repl_task
if not DebugStatus.is_main_trio_thread():
thread: threading.Thread = threading.current_thread()
message: str = (
'`Lock.release()` can not be called from a non-main-`trio` thread!\n'
f'{thread}\n'
)
if raise_on_thread:
raise RuntimeError(message)
log.devx(message)
return False
task: Task = current_task()
# sanity check that if we're the root actor
# the lock is marked as such.
# note the pre-release value may be diff the the
# post-release task.
if repl_task is task:
assert cls._owned_by_root
message: str = (
'TTY lock held by root-actor on behalf of local task\n'
f'|_{repl_task}\n'
)
else:
assert DebugStatus.repl_task is not task
message: str = (
'TTY lock was NOT released on behalf of caller\n'
f'|_{task}\n'
)
try:
if not DebugStatus.is_main_trio_thread():
thread: threading.Thread = threading.current_thread()
message: str = (
'`Lock.release()` can not be called from a non-main-`trio` thread!\n'
f'{thread}\n'
)
if raise_on_thread:
raise RuntimeError(message)
log.devx(message)
return False
task: Task = current_task()
# sanity check that if we're the root actor
# the lock is marked as such.
# note the pre-release value may be diff the the
# post-release task.
if repl_task is task:
assert cls._owned_by_root
message: str = (
'TTY lock held by root-actor on behalf of local task\n'
f'|_{repl_task}\n'
)
else:
assert DebugStatus.repl_task is not task
message: str = (
'TTY lock was NOT released on behalf of caller\n'
f'|_{task}\n'
)
lock: trio.StrictFIFOLock = cls._debug_lock
owner: Task = lock.statistics().owner
if (
@ -788,7 +791,14 @@ class DebugStatus:
# in which case schedule the SIGINT shielding override
# to in the main thread.
# https://docs.python.org/3/library/signal.html#signals-and-threads
if not cls.is_main_trio_thread():
if (
not cls.is_main_trio_thread()
and
not _state._runtime_vars.get(
'_is_infected_aio',
False,
)
):
cls._orig_sigint_handler: Callable = trio.from_thread.run_sync(
signal.signal,
signal.SIGINT,
@ -813,7 +823,16 @@ class DebugStatus:
# always restore ``trio``'s sigint handler. see notes below in
# the pdb factory about the nightmare that is that code swapping
# out the handler when the repl activates...
if not cls.is_main_trio_thread():
# if not cls.is_main_trio_thread():
if (
not cls.is_main_trio_thread()
and
# not _state._runtime_vars.get(
# '_is_infected_aio',
# False,
# )
not current_actor().is_infected_aio()
):
trio.from_thread.run_sync(
signal.signal,
signal.SIGINT,
@ -871,7 +890,7 @@ class DebugStatus:
return False
@classmethod
@pdbp.hideframe
# @pdbp.hideframe
def release(
cls,
cancel_req_task: bool = False,
@ -880,11 +899,21 @@ class DebugStatus:
try:
# sometimes the task might already be terminated in
# which case this call will raise an RTE?
if (
repl_release is not None
):
if repl_release is not None:
if cls.is_main_trio_thread():
repl_release.set()
elif current_actor().is_infected_aio():
async def _set_repl_release():
repl_release.set()
fute: asyncio.Future = run_trio_task_in_future(
_set_repl_release
)
if not fute.done():
log.warning('REPL release state unknown..?')
else:
# XXX NOTE ONLY used for bg root-actor sync
# threads, see `.pause_from_sync()`.
@ -1658,18 +1687,24 @@ async def _pause(
try:
task: Task = current_task()
except RuntimeError as rte:
# NOTE, 2 cases we might get here:
#
# - ACTUALLY not a `trio.lowlevel.Task` nor runtime caller,
# |_ error out as normal
#
# - an infected `asycio` actor calls it from an actual
# `asyncio.Task`
# |_ in this case we DO NOT want to RTE!
__tracebackhide__: bool = False
log.exception(
'Failed to get current `trio`-task?'
)
# if actor.is_infected_aio():
# mk_pdb().set_trace()
# raise RuntimeError(
# '`tractor.pause[_from_sync]()` not yet supported '
# 'directly (infected) `asyncio` tasks!'
# ) from rte
raise rte
if actor.is_infected_aio():
log.exception(
'Failed to get current `trio`-task?'
)
raise RuntimeError(
'An `asyncio` task should not be calling this!?'
) from rte
else:
task = asyncio.current_task()
if debug_func is not None:
debug_func = partial(debug_func)
@ -2060,7 +2095,8 @@ async def _pause(
f'on behalf of {repl_task} ??\n'
)
DebugStatus.release(cancel_req_task=True)
if not actor.is_infected_aio():
DebugStatus.release(cancel_req_task=True)
# sanity checks for ^ on request/status teardown
# assert DebugStatus.repl is None # XXX no more bc bg thread cases?
@ -2113,7 +2149,9 @@ def _set_trace(
log.pdb(
f'{_pause_msg}\n'
f'>(\n'
f'|_ {task} @ {actor.uid}\n'
f'|_{actor.uid}\n'
f' |_{task}\n' # @ {actor.uid}\n'
# f'|_{task}\n'
# ^-TODO-^ more compact pformating?
# -[ ] make an `Actor.__repr()__`
# -[ ] should we use `log.pformat_task_uid()`?
@ -2390,9 +2428,6 @@ def pause_from_sync(
actor: tractor.Actor = current_actor(
err_on_no_runtime=False,
)
message: str = (
f'{actor.uid} task called `tractor.pause_from_sync()`\n'
)
if not actor:
raise RuntimeError(
'Not inside the `tractor`-runtime?\n'
@ -2400,6 +2435,9 @@ def pause_from_sync(
'- `async with tractor.open_nursery()` or,\n'
'- `async with tractor.open_root_actor()`\n'
)
message: str = (
f'{actor.uid} task called `tractor.pause_from_sync()`\n'
)
# TODO: once supported, remove this AND the one
# inside `._pause()`!
@ -2409,16 +2447,17 @@ def pause_from_sync(
# injection?
# -[ ] should `breakpoint()` work and what does it normally
# do in `asyncio` ctxs?
if actor.is_infected_aio():
raise RuntimeError(
'`tractor.pause[_from_sync]()` not yet supported '
'for infected `asyncio` mode!'
)
# if actor.is_infected_aio():
# raise RuntimeError(
# '`tractor.pause[_from_sync]()` not yet supported '
# 'for infected `asyncio` mode!'
# )
repl: PdbREPL = mk_pdb()
# message += f'-> created local REPL {repl}\n'
is_root: bool = is_root_process()
is_aio: bool = actor.is_infected_aio()
# TODO: we could also check for a non-`.to_thread` context
# using `trio.from_thread.check_cancelled()` (says
@ -2431,8 +2470,11 @@ def pause_from_sync(
# when called from a (bg) thread, run an async task in a new
# thread which will call `._pause()` manually with special
# handling for root-actor caller usage.
if not DebugStatus.is_main_trio_thread():
if (
not DebugStatus.is_main_trio_thread()
and
not is_aio # see below for this usage
):
# TODO: `threading.Lock()` this so we don't get races in
# multi-thr cases where they're acquiring/releasing the
# REPL and setting request/`Lock` state, etc..
@ -2440,10 +2482,21 @@ def pause_from_sync(
repl_owner = thread
# TODO: make root-actor bg thread usage work!
if is_root:
if (
is_root
# or
# is_aio
):
if is_root:
message += (
f'-> called from a root-actor bg {thread}\n'
)
elif is_aio:
message += (
f'-> called from a `asyncio`-task bg {thread}\n'
)
message += (
f'-> called from a root-actor bg {thread}\n'
f'-> scheduling `._pause_from_bg_root_thread()`..\n'
'-> scheduling `._pause_from_bg_root_thread()`..\n'
)
# XXX SUBTLE BADNESS XXX that should really change!
# don't over-write the `repl` here since when
@ -2462,7 +2515,8 @@ def pause_from_sync(
hide_tb=hide_tb,
**_pause_kwargs,
),
)
),
trio_token=trio.lowlevel.current_trio_token(),
)
DebugStatus.shield_sigint()
message += (
@ -2495,6 +2549,29 @@ def pause_from_sync(
DebugStatus.shield_sigint()
assert bg_task is not DebugStatus.repl_task
elif is_aio:
greenback: ModuleType = maybe_import_greenback()
repl_owner: Task = asyncio.current_task()
fute: asyncio.Future = run_trio_task_in_future(
partial(
_pause,
debug_func=None,
repl=repl,
hide_tb=hide_tb,
# XXX to prevent `._pause()` for setting
# `DebugStatus.repl_task` to the gb task!
called_from_sync=True,
called_from_bg_thread=True,
**_pause_kwargs
)
)
# TODO: for async version -> `.pause_from_aio()`?
# bg_task, _ = await fute
bg_task, _ = greenback.await_(fute)
bg_task: asyncio.Task = asyncio.current_task()
else: # we are presumably the `trio.run()` + main thread
# raises on not-found by default
greenback: ModuleType = maybe_import_greenback()
@ -2509,8 +2586,8 @@ def pause_from_sync(
# NOTE XXX seems to need to be set BEFORE the `_pause()`
# invoke using gb below?
DebugStatus.shield_sigint()
repl_owner: Task = current_task()
message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n'
try:
out = greenback.await_(
@ -2572,6 +2649,10 @@ def pause_from_sync(
# -[ ] tried to use `@pdbp.hideframe` decoration but
# still doesn't work
except BaseException as err:
log.exception(
'Failed to sync-pause from\n\n'
f'{repl_owner}\n'
)
__tracebackhide__: bool = False
raise err

View File

@ -562,6 +562,100 @@ class AsyncioRuntimeTranslationError(RuntimeError):
'''
def run_trio_task_in_future(
async_fn,
*args,
) -> asyncio.Future:
'''
Run an async-func as a `trio` task from an `asyncio.Task` wrapped
in a `asyncio.Future` which is returned to the caller.
Another astounding feat by the great @oremanj !!
Bo
'''
result_future = asyncio.Future()
cancel_scope = trio.CancelScope()
finished: bool = False
# Monkeypatch the returned future's cancel() method to forward
# cancellation to the Trio task
cancel_message = None
orig_cancel = result_future.cancel
def wrapped_cancel(msg=None):
nonlocal cancel_message
if finished:
# We're being called back after the task completed
if msg is not None:
return orig_cancel(msg)
elif cancel_message is not None:
return orig_cancel(cancel_message)
else:
return orig_cancel()
if result_future.done():
return False
# Forward cancellation to the Trio task, don't mark
# future as cancelled until it completes
cancel_message = msg
cancel_scope.cancel()
return True
result_future.cancel = wrapped_cancel
# End of monkeypatching
async def trio_task() -> None:
nonlocal finished
try:
with cancel_scope:
try:
# TODO: type this with new tech in 3.13
result: Any = await async_fn(*args)
finally:
finished = True
# Propagate result or cancellation to the Future
if cancel_scope.cancelled_caught:
result_future.cancel()
elif not result_future.cancelled():
result_future.set_result(result)
except BaseException as exc:
# The result future gets all the non-Cancelled
# exceptions. Any Cancelled need to keep propagating
# out of this stack frame in order to reach the cancel
# scope for which they're intended.
cancelled: BaseException|None
rest: BaseException|None
if isinstance(exc, BaseExceptionGroup):
cancelled, rest = exc.split(trio.Cancelled)
elif isinstance(exc, trio.Cancelled):
cancelled, rest = exc, None
else:
cancelled, rest = None, exc
if not result_future.cancelled():
if rest:
result_future.set_exception(rest)
else:
result_future.cancel()
if cancelled:
raise cancelled
trio.lowlevel.spawn_system_task(
trio_task,
name=async_fn,
)
return result_future
def run_as_asyncio_guest(
trio_main: Callable,
# ^-NOTE-^ when spawned with `infected_aio=True` this func is