Raise explicitly on missing `greenback` portal
When `.pause_from_sync()` is called from an `asyncio.Task` which was never bestowed a portal we want to be mega pedantic about it; indicate that the task was NOT spawned from our `.to_asyncio` API and likely by some out-of-our-control code (normally using `asyncio.ensure_future()/.create_task()`). Though `greenback` already errors on such usage, it's not always clear why no portal exists; explaining the situation of a 3rd-party-bg-spawned-task should avoid dev confusion for most cases. Impl deats, - distinguish between an actor in infected mode versus the actual caller of `.pause_from_sync()` being an `asyncio.Task` with more explicit `asyncio_task` and `is_infected_aio` vars. - ONLY in the case of being both an infected-mode-actor AND detecting that the caller is an `asyncio.Task`, check `greenback.has_portal()` such that when not bestowed we presume the aforementioned 3rd-party-bg-task case above and raise a new explicit RTE with a detailed explanatory message. - add some masked draft code for handling the speical case of a root actor `asyncio.Task` caller which could (in theory) not actually require gb portal use since the `Lock` can be acquired directly without IPC. |_this will likely require factoring of various pause machinery funcs into a `_pause_from_root_task()` to mk the impl sane XD Other, - expose a new `debug_filter: Callable` which can be provided by the caller of `_maybe_enter_pm()` to predicate whether to enter the debugger REPL based on the caught `BaseException|BaseExceptionGroup`; this is handy for customizing the meaning of "graceful cancellations" so as to avoid crash handling on expected egs of more then `trioCancelled`. |_ make the default as it was implemented: `not is_multi_cancelled(err)` - pass-through a new `ignore: set[BaseException]` as `open_crash_handler(ignore_nested=ignore)` to allow for the same silent-cancellation-egs-swallowing as desired from outside the actor runtime.hilevel_serman
parent
350a94f39e
commit
bf9689e10a
|
@ -75,6 +75,7 @@ from tractor import _state
|
||||||
from tractor._exceptions import (
|
from tractor._exceptions import (
|
||||||
InternalError,
|
InternalError,
|
||||||
NoRuntime,
|
NoRuntime,
|
||||||
|
is_multi_cancelled,
|
||||||
)
|
)
|
||||||
from tractor._state import (
|
from tractor._state import (
|
||||||
current_actor,
|
current_actor,
|
||||||
|
@ -1743,7 +1744,7 @@ async def _pause(
|
||||||
] = trio.TASK_STATUS_IGNORED,
|
] = trio.TASK_STATUS_IGNORED,
|
||||||
**debug_func_kwargs,
|
**debug_func_kwargs,
|
||||||
|
|
||||||
) -> tuple[PdbREPL, Task]|None:
|
) -> tuple[Task, PdbREPL]|None:
|
||||||
'''
|
'''
|
||||||
Inner impl for `pause()` to avoid the `trio.CancelScope.__exit__()`
|
Inner impl for `pause()` to avoid the `trio.CancelScope.__exit__()`
|
||||||
stack frame when not shielded (since apparently i can't figure out
|
stack frame when not shielded (since apparently i can't figure out
|
||||||
|
@ -1929,7 +1930,7 @@ async def _pause(
|
||||||
)
|
)
|
||||||
with trio.CancelScope(shield=shield):
|
with trio.CancelScope(shield=shield):
|
||||||
await trio.lowlevel.checkpoint()
|
await trio.lowlevel.checkpoint()
|
||||||
return repl, task
|
return (repl, task)
|
||||||
|
|
||||||
# elif repl_task:
|
# elif repl_task:
|
||||||
# log.warning(
|
# log.warning(
|
||||||
|
@ -2530,26 +2531,17 @@ def pause_from_sync(
|
||||||
f'{actor.uid} task called `tractor.pause_from_sync()`\n'
|
f'{actor.uid} task called `tractor.pause_from_sync()`\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: once supported, remove this AND the one
|
|
||||||
# inside `._pause()`!
|
|
||||||
# outstanding impl fixes:
|
|
||||||
# -[ ] need to make `.shield_sigint()` below work here!
|
|
||||||
# -[ ] how to handle `asyncio`'s new SIGINT-handler
|
|
||||||
# injection?
|
|
||||||
# -[ ] should `breakpoint()` work and what does it normally
|
|
||||||
# do in `asyncio` ctxs?
|
|
||||||
# if actor.is_infected_aio():
|
|
||||||
# raise RuntimeError(
|
|
||||||
# '`tractor.pause[_from_sync]()` not yet supported '
|
|
||||||
# 'for infected `asyncio` mode!'
|
|
||||||
# )
|
|
||||||
|
|
||||||
repl: PdbREPL = mk_pdb()
|
repl: PdbREPL = mk_pdb()
|
||||||
|
|
||||||
# message += f'-> created local REPL {repl}\n'
|
# message += f'-> created local REPL {repl}\n'
|
||||||
is_trio_thread: bool = DebugStatus.is_main_trio_thread()
|
is_trio_thread: bool = DebugStatus.is_main_trio_thread()
|
||||||
is_root: bool = is_root_process()
|
is_root: bool = is_root_process()
|
||||||
is_aio: bool = actor.is_infected_aio()
|
is_infected_aio: bool = actor.is_infected_aio()
|
||||||
|
thread: Thread = threading.current_thread()
|
||||||
|
|
||||||
|
asyncio_task: asyncio.Task|None = None
|
||||||
|
if is_infected_aio:
|
||||||
|
asyncio_task = asyncio.current_task()
|
||||||
|
|
||||||
# TODO: we could also check for a non-`.to_thread` context
|
# TODO: we could also check for a non-`.to_thread` context
|
||||||
# using `trio.from_thread.check_cancelled()` (says
|
# using `trio.from_thread.check_cancelled()` (says
|
||||||
|
@ -2565,20 +2557,14 @@ def pause_from_sync(
|
||||||
if (
|
if (
|
||||||
not is_trio_thread
|
not is_trio_thread
|
||||||
and
|
and
|
||||||
not is_aio # see below for this usage
|
not asyncio_task
|
||||||
):
|
):
|
||||||
# TODO: `threading.Lock()` this so we don't get races in
|
# TODO: `threading.Lock()` this so we don't get races in
|
||||||
# multi-thr cases where they're acquiring/releasing the
|
# multi-thr cases where they're acquiring/releasing the
|
||||||
# REPL and setting request/`Lock` state, etc..
|
# REPL and setting request/`Lock` state, etc..
|
||||||
thread: threading.Thread = threading.current_thread()
|
repl_owner: Thread = thread
|
||||||
repl_owner = thread
|
|
||||||
|
|
||||||
# TODO: make root-actor bg thread usage work!
|
# TODO: make root-actor bg thread usage work!
|
||||||
if (
|
|
||||||
is_root
|
|
||||||
# or
|
|
||||||
# is_aio
|
|
||||||
):
|
|
||||||
if is_root:
|
if is_root:
|
||||||
message += (
|
message += (
|
||||||
f'-> called from a root-actor bg {thread}\n'
|
f'-> called from a root-actor bg {thread}\n'
|
||||||
|
@ -2637,13 +2623,32 @@ def pause_from_sync(
|
||||||
DebugStatus.shield_sigint()
|
DebugStatus.shield_sigint()
|
||||||
assert bg_task is not DebugStatus.repl_task
|
assert bg_task is not DebugStatus.repl_task
|
||||||
|
|
||||||
|
# TODO: once supported, remove this AND the one
|
||||||
|
# inside `._pause()`!
|
||||||
|
# outstanding impl fixes:
|
||||||
|
# -[ ] need to make `.shield_sigint()` below work here!
|
||||||
|
# -[ ] how to handle `asyncio`'s new SIGINT-handler
|
||||||
|
# injection?
|
||||||
|
# -[ ] should `breakpoint()` work and what does it normally
|
||||||
|
# do in `asyncio` ctxs?
|
||||||
|
# if actor.is_infected_aio():
|
||||||
|
# raise RuntimeError(
|
||||||
|
# '`tractor.pause[_from_sync]()` not yet supported '
|
||||||
|
# 'for infected `asyncio` mode!'
|
||||||
|
# )
|
||||||
elif (
|
elif (
|
||||||
not is_trio_thread
|
not is_trio_thread
|
||||||
and
|
and
|
||||||
is_aio
|
is_infected_aio # as in, the special actor-runtime mode
|
||||||
|
# ^NOTE XXX, that doesn't mean the caller is necessarily
|
||||||
|
# an `asyncio.Task` just that `trio` has been embedded on
|
||||||
|
# the `asyncio` event loop!
|
||||||
|
and
|
||||||
|
asyncio_task # transitive caller is an actual `asyncio.Task`
|
||||||
):
|
):
|
||||||
greenback: ModuleType = maybe_import_greenback()
|
greenback: ModuleType = maybe_import_greenback()
|
||||||
repl_owner: Task = asyncio.current_task()
|
|
||||||
|
if greenback.has_portal():
|
||||||
DebugStatus.shield_sigint()
|
DebugStatus.shield_sigint()
|
||||||
fute: asyncio.Future = run_trio_task_in_future(
|
fute: asyncio.Future = run_trio_task_in_future(
|
||||||
partial(
|
partial(
|
||||||
|
@ -2660,11 +2665,53 @@ def pause_from_sync(
|
||||||
**_pause_kwargs
|
**_pause_kwargs
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
repl_owner = asyncio_task
|
||||||
# TODO: for async version -> `.pause_from_aio()`?
|
|
||||||
# bg_task, _ = await fute
|
|
||||||
bg_task, _ = greenback.await_(fute)
|
bg_task, _ = greenback.await_(fute)
|
||||||
bg_task: asyncio.Task = asyncio.current_task()
|
# TODO: ASYNC version -> `.pause_from_aio()`?
|
||||||
|
# bg_task, _ = await fute
|
||||||
|
|
||||||
|
# handle the case where an `asyncio` task has been
|
||||||
|
# spawned WITHOUT enabling a `greenback` portal..
|
||||||
|
# => can often happen in 3rd party libs.
|
||||||
|
else:
|
||||||
|
bg_task = repl_owner
|
||||||
|
|
||||||
|
# TODO, ostensibly we can just acquire the
|
||||||
|
# debug lock directly presuming we're the
|
||||||
|
# root actor running in infected asyncio
|
||||||
|
# mode?
|
||||||
|
#
|
||||||
|
# TODO, this would be a special case where
|
||||||
|
# a `_pause_from_root()` would come in very
|
||||||
|
# handy!
|
||||||
|
# if is_root:
|
||||||
|
# import pdbp; pdbp.set_trace()
|
||||||
|
# log.warning(
|
||||||
|
# 'Allowing `asyncio` task to acquire debug-lock in root-actor..\n'
|
||||||
|
# 'This is not fully implemented yet; there may be teardown hangs!\n\n'
|
||||||
|
# )
|
||||||
|
# else:
|
||||||
|
|
||||||
|
# simply unsupported, since there exists no hack (i
|
||||||
|
# can think of) to workaround this in a subactor
|
||||||
|
# which needs to lock the root's REPL ow we're sure
|
||||||
|
# to get prompt stdstreams clobbering..
|
||||||
|
cf_repr: str = ''
|
||||||
|
if api_frame:
|
||||||
|
caller_frame: FrameType = api_frame.f_back
|
||||||
|
cf_repr: str = f'caller_frame: {caller_frame!r}\n'
|
||||||
|
|
||||||
|
raise RuntimeError(
|
||||||
|
f"CAN'T USE `greenback._await()` without a portal !?\n\n"
|
||||||
|
f'Likely this task was NOT spawned via the `tractor.to_asyncio` API..\n'
|
||||||
|
f'{asyncio_task}\n'
|
||||||
|
f'{cf_repr}\n'
|
||||||
|
|
||||||
|
f'Prolly the task was started out-of-band (from some lib?)\n'
|
||||||
|
f'AND one of the below was never called ??\n'
|
||||||
|
f'- greenback.ensure_portal()\n'
|
||||||
|
f'- greenback.bestow_portal(<task>)\n'
|
||||||
|
)
|
||||||
|
|
||||||
else: # we are presumably the `trio.run()` + main thread
|
else: # we are presumably the `trio.run()` + main thread
|
||||||
# raises on not-found by default
|
# raises on not-found by default
|
||||||
|
@ -2915,8 +2962,14 @@ async def _maybe_enter_pm(
|
||||||
tb: TracebackType|None = None,
|
tb: TracebackType|None = None,
|
||||||
api_frame: FrameType|None = None,
|
api_frame: FrameType|None = None,
|
||||||
hide_tb: bool = False,
|
hide_tb: bool = False,
|
||||||
|
|
||||||
|
# only enter debugger REPL when returns `True`
|
||||||
|
debug_filter: Callable[
|
||||||
|
[BaseException|BaseExceptionGroup],
|
||||||
|
bool,
|
||||||
|
] = lambda err: not is_multi_cancelled(err),
|
||||||
|
|
||||||
):
|
):
|
||||||
from tractor._exceptions import is_multi_cancelled
|
|
||||||
if (
|
if (
|
||||||
debug_mode()
|
debug_mode()
|
||||||
|
|
||||||
|
@ -2933,7 +2986,8 @@ async def _maybe_enter_pm(
|
||||||
|
|
||||||
# Really we just want to mostly avoid catching KBIs here so there
|
# Really we just want to mostly avoid catching KBIs here so there
|
||||||
# might be a simpler check we can do?
|
# might be a simpler check we can do?
|
||||||
and not is_multi_cancelled(err)
|
and
|
||||||
|
debug_filter(err)
|
||||||
):
|
):
|
||||||
api_frame: FrameType = api_frame or inspect.currentframe()
|
api_frame: FrameType = api_frame or inspect.currentframe()
|
||||||
tb: TracebackType = tb or sys.exc_info()[2]
|
tb: TracebackType = tb or sys.exc_info()[2]
|
||||||
|
@ -3139,10 +3193,16 @@ def open_crash_handler(
|
||||||
try:
|
try:
|
||||||
yield
|
yield
|
||||||
except tuple(catch) as err:
|
except tuple(catch) as err:
|
||||||
if type(err) not in ignore:
|
if (
|
||||||
|
type(err) not in ignore
|
||||||
# use our re-impl-ed version
|
and
|
||||||
|
not is_multi_cancelled(
|
||||||
|
err,
|
||||||
|
ignore_nested=ignore
|
||||||
|
)
|
||||||
|
):
|
||||||
try:
|
try:
|
||||||
|
# use our re-impl-ed version
|
||||||
_post_mortem(
|
_post_mortem(
|
||||||
repl=mk_pdb(),
|
repl=mk_pdb(),
|
||||||
tb=sys.exc_info()[2],
|
tb=sys.exc_info()[2],
|
||||||
|
|
Loading…
Reference in New Issue