First draft, `asyncio`-task, sync-pausing Bo
Mostly due to magic from @oremanj (a super-end-level-boss) where we slap in a little bit of `.from_asyncio`-type stuff to run a `trio`-task from `asyncio` code I'm not gonna go into tooo too much detail but basically the primary thing needed was a way to (blocking-ly) invoke a `trio.lowlevel.Task` from an `asyncio.Task` (which we now have with a new `run_trio_task_in_future()` thanks to the "aforementioned jefe") which we now invoke from a dedicated aio case-branch inside `.devx._debug.pause_from_sync()`. Further include a case inside `DebugStatus.release()` to handle using the same func to set the `repl_release: trio.Event` from the `asyncio` side when releasing the REPL. Prolly more refinements to come ;{o
parent
1f1a3f19d5
commit
a69bc00593
|
@ -2,7 +2,10 @@ import asyncio
|
|||
|
||||
import trio
|
||||
import tractor
|
||||
from tractor import to_asyncio
|
||||
from tractor import (
|
||||
to_asyncio,
|
||||
Portal,
|
||||
)
|
||||
|
||||
|
||||
async def aio_sleep_forever():
|
||||
|
@ -43,7 +46,7 @@ async def bp_then_error(
|
|||
@tractor.context
|
||||
async def trio_ctx(
|
||||
ctx: tractor.Context,
|
||||
bp_before_started: bool = False,
|
||||
bp_before_started: bool = True,
|
||||
):
|
||||
|
||||
# this will block until the ``asyncio`` task sends a "first"
|
||||
|
@ -57,7 +60,6 @@ async def trio_ctx(
|
|||
|
||||
trio.open_nursery() as n,
|
||||
):
|
||||
|
||||
assert first == 'start'
|
||||
|
||||
if bp_before_started:
|
||||
|
@ -73,15 +75,18 @@ async def trio_ctx(
|
|||
|
||||
|
||||
async def main(
|
||||
bps_all_over: bool = False,
|
||||
bps_all_over: bool = True,
|
||||
|
||||
) -> None:
|
||||
|
||||
async with tractor.open_nursery(
|
||||
# debug_mode=True,
|
||||
debug_mode=True,
|
||||
maybe_enable_greenback=True,
|
||||
# loglevel='devx',
|
||||
# loglevel='runtime',
|
||||
) as n:
|
||||
|
||||
p = await n.start_actor(
|
||||
ptl: Portal = await n.start_actor(
|
||||
'aio_daemon',
|
||||
enable_modules=[__name__],
|
||||
infect_asyncio=True,
|
||||
|
@ -89,7 +94,7 @@ async def main(
|
|||
loglevel='cancel',
|
||||
)
|
||||
|
||||
async with p.open_context(
|
||||
async with ptl.open_context(
|
||||
trio_ctx,
|
||||
bp_before_started=bps_all_over,
|
||||
) as (ctx, first):
|
||||
|
@ -105,7 +110,7 @@ async def main(
|
|||
|
||||
# TODO: case where we cancel from trio-side while asyncio task
|
||||
# has debugger lock?
|
||||
# await p.cancel_actor()
|
||||
# await ptl.cancel_actor()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -20,6 +20,7 @@ Multi-core debugging for da peeps!
|
|||
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import asyncio
|
||||
import bdb
|
||||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
|
@ -67,6 +68,7 @@ from trio import (
|
|||
TaskStatus,
|
||||
)
|
||||
import tractor
|
||||
from tractor.to_asyncio import run_trio_task_in_future
|
||||
from tractor.log import get_logger
|
||||
from tractor._context import Context
|
||||
from tractor import _state
|
||||
|
@ -296,7 +298,7 @@ class Lock:
|
|||
)
|
||||
|
||||
@classmethod
|
||||
@pdbp.hideframe
|
||||
# @pdbp.hideframe
|
||||
def release(
|
||||
cls,
|
||||
raise_on_thread: bool = True,
|
||||
|
@ -310,39 +312,40 @@ class Lock:
|
|||
we_released: bool = False
|
||||
ctx_in_debug: Context|None = cls.ctx_in_debug
|
||||
repl_task: Task|Thread|None = DebugStatus.repl_task
|
||||
if not DebugStatus.is_main_trio_thread():
|
||||
thread: threading.Thread = threading.current_thread()
|
||||
message: str = (
|
||||
'`Lock.release()` can not be called from a non-main-`trio` thread!\n'
|
||||
f'{thread}\n'
|
||||
)
|
||||
if raise_on_thread:
|
||||
raise RuntimeError(message)
|
||||
|
||||
log.devx(message)
|
||||
return False
|
||||
|
||||
task: Task = current_task()
|
||||
|
||||
# sanity check that if we're the root actor
|
||||
# the lock is marked as such.
|
||||
# note the pre-release value may be diff the the
|
||||
# post-release task.
|
||||
if repl_task is task:
|
||||
assert cls._owned_by_root
|
||||
message: str = (
|
||||
'TTY lock held by root-actor on behalf of local task\n'
|
||||
f'|_{repl_task}\n'
|
||||
)
|
||||
else:
|
||||
assert DebugStatus.repl_task is not task
|
||||
|
||||
message: str = (
|
||||
'TTY lock was NOT released on behalf of caller\n'
|
||||
f'|_{task}\n'
|
||||
)
|
||||
|
||||
try:
|
||||
if not DebugStatus.is_main_trio_thread():
|
||||
thread: threading.Thread = threading.current_thread()
|
||||
message: str = (
|
||||
'`Lock.release()` can not be called from a non-main-`trio` thread!\n'
|
||||
f'{thread}\n'
|
||||
)
|
||||
if raise_on_thread:
|
||||
raise RuntimeError(message)
|
||||
|
||||
log.devx(message)
|
||||
return False
|
||||
|
||||
task: Task = current_task()
|
||||
|
||||
# sanity check that if we're the root actor
|
||||
# the lock is marked as such.
|
||||
# note the pre-release value may be diff the the
|
||||
# post-release task.
|
||||
if repl_task is task:
|
||||
assert cls._owned_by_root
|
||||
message: str = (
|
||||
'TTY lock held by root-actor on behalf of local task\n'
|
||||
f'|_{repl_task}\n'
|
||||
)
|
||||
else:
|
||||
assert DebugStatus.repl_task is not task
|
||||
|
||||
message: str = (
|
||||
'TTY lock was NOT released on behalf of caller\n'
|
||||
f'|_{task}\n'
|
||||
)
|
||||
|
||||
lock: trio.StrictFIFOLock = cls._debug_lock
|
||||
owner: Task = lock.statistics().owner
|
||||
if (
|
||||
|
@ -788,7 +791,14 @@ class DebugStatus:
|
|||
# in which case schedule the SIGINT shielding override
|
||||
# to in the main thread.
|
||||
# https://docs.python.org/3/library/signal.html#signals-and-threads
|
||||
if not cls.is_main_trio_thread():
|
||||
if (
|
||||
not cls.is_main_trio_thread()
|
||||
and
|
||||
not _state._runtime_vars.get(
|
||||
'_is_infected_aio',
|
||||
False,
|
||||
)
|
||||
):
|
||||
cls._orig_sigint_handler: Callable = trio.from_thread.run_sync(
|
||||
signal.signal,
|
||||
signal.SIGINT,
|
||||
|
@ -813,7 +823,16 @@ class DebugStatus:
|
|||
# always restore ``trio``'s sigint handler. see notes below in
|
||||
# the pdb factory about the nightmare that is that code swapping
|
||||
# out the handler when the repl activates...
|
||||
if not cls.is_main_trio_thread():
|
||||
# if not cls.is_main_trio_thread():
|
||||
if (
|
||||
not cls.is_main_trio_thread()
|
||||
and
|
||||
# not _state._runtime_vars.get(
|
||||
# '_is_infected_aio',
|
||||
# False,
|
||||
# )
|
||||
not current_actor().is_infected_aio()
|
||||
):
|
||||
trio.from_thread.run_sync(
|
||||
signal.signal,
|
||||
signal.SIGINT,
|
||||
|
@ -871,7 +890,7 @@ class DebugStatus:
|
|||
return False
|
||||
|
||||
@classmethod
|
||||
@pdbp.hideframe
|
||||
# @pdbp.hideframe
|
||||
def release(
|
||||
cls,
|
||||
cancel_req_task: bool = False,
|
||||
|
@ -880,11 +899,21 @@ class DebugStatus:
|
|||
try:
|
||||
# sometimes the task might already be terminated in
|
||||
# which case this call will raise an RTE?
|
||||
if (
|
||||
repl_release is not None
|
||||
):
|
||||
if repl_release is not None:
|
||||
if cls.is_main_trio_thread():
|
||||
repl_release.set()
|
||||
|
||||
elif current_actor().is_infected_aio():
|
||||
|
||||
async def _set_repl_release():
|
||||
repl_release.set()
|
||||
|
||||
fute: asyncio.Future = run_trio_task_in_future(
|
||||
_set_repl_release
|
||||
)
|
||||
if not fute.done():
|
||||
log.warning('REPL release state unknown..?')
|
||||
|
||||
else:
|
||||
# XXX NOTE ONLY used for bg root-actor sync
|
||||
# threads, see `.pause_from_sync()`.
|
||||
|
@ -1658,18 +1687,24 @@ async def _pause(
|
|||
try:
|
||||
task: Task = current_task()
|
||||
except RuntimeError as rte:
|
||||
# NOTE, 2 cases we might get here:
|
||||
#
|
||||
# - ACTUALLY not a `trio.lowlevel.Task` nor runtime caller,
|
||||
# |_ error out as normal
|
||||
#
|
||||
# - an infected `asycio` actor calls it from an actual
|
||||
# `asyncio.Task`
|
||||
# |_ in this case we DO NOT want to RTE!
|
||||
__tracebackhide__: bool = False
|
||||
log.exception(
|
||||
'Failed to get current `trio`-task?'
|
||||
)
|
||||
# if actor.is_infected_aio():
|
||||
# mk_pdb().set_trace()
|
||||
# raise RuntimeError(
|
||||
# '`tractor.pause[_from_sync]()` not yet supported '
|
||||
# 'directly (infected) `asyncio` tasks!'
|
||||
# ) from rte
|
||||
|
||||
raise rte
|
||||
if actor.is_infected_aio():
|
||||
log.exception(
|
||||
'Failed to get current `trio`-task?'
|
||||
)
|
||||
raise RuntimeError(
|
||||
'An `asyncio` task should not be calling this!?'
|
||||
) from rte
|
||||
else:
|
||||
task = asyncio.current_task()
|
||||
|
||||
if debug_func is not None:
|
||||
debug_func = partial(debug_func)
|
||||
|
@ -2060,7 +2095,8 @@ async def _pause(
|
|||
f'on behalf of {repl_task} ??\n'
|
||||
)
|
||||
|
||||
DebugStatus.release(cancel_req_task=True)
|
||||
if not actor.is_infected_aio():
|
||||
DebugStatus.release(cancel_req_task=True)
|
||||
|
||||
# sanity checks for ^ on request/status teardown
|
||||
# assert DebugStatus.repl is None # XXX no more bc bg thread cases?
|
||||
|
@ -2113,7 +2149,9 @@ def _set_trace(
|
|||
log.pdb(
|
||||
f'{_pause_msg}\n'
|
||||
f'>(\n'
|
||||
f'|_ {task} @ {actor.uid}\n'
|
||||
f'|_{actor.uid}\n'
|
||||
f' |_{task}\n' # @ {actor.uid}\n'
|
||||
# f'|_{task}\n'
|
||||
# ^-TODO-^ more compact pformating?
|
||||
# -[ ] make an `Actor.__repr()__`
|
||||
# -[ ] should we use `log.pformat_task_uid()`?
|
||||
|
@ -2390,9 +2428,6 @@ def pause_from_sync(
|
|||
actor: tractor.Actor = current_actor(
|
||||
err_on_no_runtime=False,
|
||||
)
|
||||
message: str = (
|
||||
f'{actor.uid} task called `tractor.pause_from_sync()`\n'
|
||||
)
|
||||
if not actor:
|
||||
raise RuntimeError(
|
||||
'Not inside the `tractor`-runtime?\n'
|
||||
|
@ -2400,6 +2435,9 @@ def pause_from_sync(
|
|||
'- `async with tractor.open_nursery()` or,\n'
|
||||
'- `async with tractor.open_root_actor()`\n'
|
||||
)
|
||||
message: str = (
|
||||
f'{actor.uid} task called `tractor.pause_from_sync()`\n'
|
||||
)
|
||||
|
||||
# TODO: once supported, remove this AND the one
|
||||
# inside `._pause()`!
|
||||
|
@ -2409,16 +2447,17 @@ def pause_from_sync(
|
|||
# injection?
|
||||
# -[ ] should `breakpoint()` work and what does it normally
|
||||
# do in `asyncio` ctxs?
|
||||
if actor.is_infected_aio():
|
||||
raise RuntimeError(
|
||||
'`tractor.pause[_from_sync]()` not yet supported '
|
||||
'for infected `asyncio` mode!'
|
||||
)
|
||||
# if actor.is_infected_aio():
|
||||
# raise RuntimeError(
|
||||
# '`tractor.pause[_from_sync]()` not yet supported '
|
||||
# 'for infected `asyncio` mode!'
|
||||
# )
|
||||
|
||||
repl: PdbREPL = mk_pdb()
|
||||
|
||||
# message += f'-> created local REPL {repl}\n'
|
||||
is_root: bool = is_root_process()
|
||||
is_aio: bool = actor.is_infected_aio()
|
||||
|
||||
# TODO: we could also check for a non-`.to_thread` context
|
||||
# using `trio.from_thread.check_cancelled()` (says
|
||||
|
@ -2431,8 +2470,11 @@ def pause_from_sync(
|
|||
# when called from a (bg) thread, run an async task in a new
|
||||
# thread which will call `._pause()` manually with special
|
||||
# handling for root-actor caller usage.
|
||||
if not DebugStatus.is_main_trio_thread():
|
||||
|
||||
if (
|
||||
not DebugStatus.is_main_trio_thread()
|
||||
and
|
||||
not is_aio # see below for this usage
|
||||
):
|
||||
# TODO: `threading.Lock()` this so we don't get races in
|
||||
# multi-thr cases where they're acquiring/releasing the
|
||||
# REPL and setting request/`Lock` state, etc..
|
||||
|
@ -2440,10 +2482,21 @@ def pause_from_sync(
|
|||
repl_owner = thread
|
||||
|
||||
# TODO: make root-actor bg thread usage work!
|
||||
if is_root:
|
||||
if (
|
||||
is_root
|
||||
# or
|
||||
# is_aio
|
||||
):
|
||||
if is_root:
|
||||
message += (
|
||||
f'-> called from a root-actor bg {thread}\n'
|
||||
)
|
||||
elif is_aio:
|
||||
message += (
|
||||
f'-> called from a `asyncio`-task bg {thread}\n'
|
||||
)
|
||||
message += (
|
||||
f'-> called from a root-actor bg {thread}\n'
|
||||
f'-> scheduling `._pause_from_bg_root_thread()`..\n'
|
||||
'-> scheduling `._pause_from_bg_root_thread()`..\n'
|
||||
)
|
||||
# XXX SUBTLE BADNESS XXX that should really change!
|
||||
# don't over-write the `repl` here since when
|
||||
|
@ -2462,7 +2515,8 @@ def pause_from_sync(
|
|||
hide_tb=hide_tb,
|
||||
**_pause_kwargs,
|
||||
),
|
||||
)
|
||||
),
|
||||
trio_token=trio.lowlevel.current_trio_token(),
|
||||
)
|
||||
DebugStatus.shield_sigint()
|
||||
message += (
|
||||
|
@ -2495,6 +2549,29 @@ def pause_from_sync(
|
|||
DebugStatus.shield_sigint()
|
||||
assert bg_task is not DebugStatus.repl_task
|
||||
|
||||
elif is_aio:
|
||||
greenback: ModuleType = maybe_import_greenback()
|
||||
repl_owner: Task = asyncio.current_task()
|
||||
fute: asyncio.Future = run_trio_task_in_future(
|
||||
partial(
|
||||
_pause,
|
||||
debug_func=None,
|
||||
repl=repl,
|
||||
hide_tb=hide_tb,
|
||||
|
||||
# XXX to prevent `._pause()` for setting
|
||||
# `DebugStatus.repl_task` to the gb task!
|
||||
called_from_sync=True,
|
||||
called_from_bg_thread=True,
|
||||
|
||||
**_pause_kwargs
|
||||
)
|
||||
)
|
||||
# TODO: for async version -> `.pause_from_aio()`?
|
||||
# bg_task, _ = await fute
|
||||
bg_task, _ = greenback.await_(fute)
|
||||
bg_task: asyncio.Task = asyncio.current_task()
|
||||
|
||||
else: # we are presumably the `trio.run()` + main thread
|
||||
# raises on not-found by default
|
||||
greenback: ModuleType = maybe_import_greenback()
|
||||
|
@ -2509,8 +2586,8 @@ def pause_from_sync(
|
|||
# NOTE XXX seems to need to be set BEFORE the `_pause()`
|
||||
# invoke using gb below?
|
||||
DebugStatus.shield_sigint()
|
||||
|
||||
repl_owner: Task = current_task()
|
||||
|
||||
message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n'
|
||||
try:
|
||||
out = greenback.await_(
|
||||
|
@ -2572,6 +2649,10 @@ def pause_from_sync(
|
|||
# -[ ] tried to use `@pdbp.hideframe` decoration but
|
||||
# still doesn't work
|
||||
except BaseException as err:
|
||||
log.exception(
|
||||
'Failed to sync-pause from\n\n'
|
||||
f'{repl_owner}\n'
|
||||
)
|
||||
__tracebackhide__: bool = False
|
||||
raise err
|
||||
|
||||
|
|
|
@ -562,6 +562,100 @@ class AsyncioRuntimeTranslationError(RuntimeError):
|
|||
'''
|
||||
|
||||
|
||||
def run_trio_task_in_future(
|
||||
async_fn,
|
||||
*args,
|
||||
) -> asyncio.Future:
|
||||
'''
|
||||
Run an async-func as a `trio` task from an `asyncio.Task` wrapped
|
||||
in a `asyncio.Future` which is returned to the caller.
|
||||
|
||||
Another astounding feat by the great @oremanj !!
|
||||
|
||||
Bo
|
||||
|
||||
'''
|
||||
result_future = asyncio.Future()
|
||||
cancel_scope = trio.CancelScope()
|
||||
finished: bool = False
|
||||
|
||||
# Monkeypatch the returned future's cancel() method to forward
|
||||
# cancellation to the Trio task
|
||||
cancel_message = None
|
||||
orig_cancel = result_future.cancel
|
||||
|
||||
def wrapped_cancel(msg=None):
|
||||
nonlocal cancel_message
|
||||
if finished:
|
||||
# We're being called back after the task completed
|
||||
if msg is not None:
|
||||
return orig_cancel(msg)
|
||||
elif cancel_message is not None:
|
||||
return orig_cancel(cancel_message)
|
||||
else:
|
||||
return orig_cancel()
|
||||
|
||||
if result_future.done():
|
||||
return False
|
||||
|
||||
# Forward cancellation to the Trio task, don't mark
|
||||
# future as cancelled until it completes
|
||||
cancel_message = msg
|
||||
cancel_scope.cancel()
|
||||
return True
|
||||
|
||||
result_future.cancel = wrapped_cancel
|
||||
# End of monkeypatching
|
||||
|
||||
async def trio_task() -> None:
|
||||
nonlocal finished
|
||||
try:
|
||||
with cancel_scope:
|
||||
try:
|
||||
# TODO: type this with new tech in 3.13
|
||||
result: Any = await async_fn(*args)
|
||||
finally:
|
||||
finished = True
|
||||
|
||||
# Propagate result or cancellation to the Future
|
||||
if cancel_scope.cancelled_caught:
|
||||
result_future.cancel()
|
||||
|
||||
elif not result_future.cancelled():
|
||||
result_future.set_result(result)
|
||||
|
||||
except BaseException as exc:
|
||||
# The result future gets all the non-Cancelled
|
||||
# exceptions. Any Cancelled need to keep propagating
|
||||
# out of this stack frame in order to reach the cancel
|
||||
# scope for which they're intended.
|
||||
cancelled: BaseException|None
|
||||
rest: BaseException|None
|
||||
if isinstance(exc, BaseExceptionGroup):
|
||||
cancelled, rest = exc.split(trio.Cancelled)
|
||||
|
||||
elif isinstance(exc, trio.Cancelled):
|
||||
cancelled, rest = exc, None
|
||||
|
||||
else:
|
||||
cancelled, rest = None, exc
|
||||
|
||||
if not result_future.cancelled():
|
||||
if rest:
|
||||
result_future.set_exception(rest)
|
||||
else:
|
||||
result_future.cancel()
|
||||
|
||||
if cancelled:
|
||||
raise cancelled
|
||||
|
||||
trio.lowlevel.spawn_system_task(
|
||||
trio_task,
|
||||
name=async_fn,
|
||||
)
|
||||
return result_future
|
||||
|
||||
|
||||
def run_as_asyncio_guest(
|
||||
trio_main: Callable,
|
||||
# ^-NOTE-^ when spawned with `infected_aio=True` this func is
|
||||
|
|
Loading…
Reference in New Issue