forked from goodboy/tractor

Merge pull request #337 from goodboy/debug_lock_blocking

Debug lock blocking
dun_unset_current_actor

commit 247d3448ae
@@ -0,0 +1,41 @@
Add support for debug-lock blocking using a ``._debug.Lock._blocked:
set[tuple]`` to which actor ids are added when no more IPC connections
with the root actor are detected.

This is an enhancement which (mostly) solves a lingering debugger
locking race case we needed to handle:

- child crashes, acquires the TTY lock in the root and attaches to ``pdb``
- child IPC goes down such that all channels to the root are broken
  / non-functional.
- root is stuck thinking the child is still in debug even though it
  can't be contacted and the child actor machinery hasn't been
  cancelled by its parent.
- root gets stuck in a deadlock with the child since it won't send a cancel
  request until the child is finished debugging (to avoid clobbering
  a child that is actually using the debugger), but the child can't
  unlock the debugger because IPC is down and it can't contact the root.

To avoid this scenario add a debug lock blocking list via
``._debug.Lock._blocked: set[tuple]`` which holds actor uids for any actor
that is detected by the root as having no transport channel connections
(of which at least one should exist if this sub-actor at some point
acquired the debug lock). The root consequently checks this list for any
actor that tries to (re)acquire the lock and blocks it with
a ``ContextCancelled``. Further, when a debug condition is tested in
``._runtime._invoke``, the context's ``._enter_debugger_on_cancel`` is
set to ``False`` if the actor was put on the block list; all
post-mortem / crash handling will then be bypassed for that task.

In theory this approach to block list management may cause problems
where some nested child actor acquires and releases the lock multiple
times and gets stuck on the block list after the first use? If this
turns out to be an issue we can try changing the strategy so blocks are
only added when the root has zero IPC peers left?

Further, this adds a root-locking-task side cancel scope,
``Lock._root_local_task_cs_in_debug``, which can be ``.cancel()``-ed by the
root runtime when a stale lock is detected during IPC channel testing.
However, right now we're NOT using this since it seems to cause test
failures, likely due to premature cancellation, and it maybe needs
a bit more experimenting?
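As a standalone illustration of the block-list behaviour described above, the following is a minimal sketch in plain Python, not the actual ``tractor`` runtime. The names ``LockBlocked``, ``on_all_channels_down()`` and ``request_tty_lock()`` are hypothetical stand-ins for ``ContextCancelled`` and the channel-teardown / lock-request code paths shown in the diffs below::

    # minimal sketch: root refuses the TTY lock to any sub-actor whose
    # IPC channels have all dropped (assumed helper names, not tractor API)
    from __future__ import annotations


    class LockBlocked(Exception):
        '''Stand-in for the ``ContextCancelled`` the root replies with.'''


    class Lock:
        # uids of sub-actors the root will refuse to grant the TTY lock to
        _blocked: set[tuple[str, str]] = set()


    def on_all_channels_down(uid: tuple[str, str]) -> None:
        '''Root side: called when the last IPC channel to ``uid`` is lost.'''
        Lock._blocked.add(uid)


    def request_tty_lock(uid: tuple[str, str]) -> str:
        '''Root side: entry point a child hits when it wants the debugger.'''
        if uid in Lock._blocked:
            # the real code cancels the request context rather than raising
            raise LockBlocked(f'Debug lock blocked for {uid}')
        return 'pdb_lock_granted'


    if __name__ == '__main__':
        child = ('child', 'some-uuid')
        assert request_tty_lock(child) == 'pdb_lock_granted'
        on_all_channels_down(child)   # root loses all channels to the child
        try:
            request_tty_lock(child)   # any re-acquire attempt is now refused
        except LockBlocked as err:
            print(err)
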
@@ -2,7 +2,7 @@
 package = "tractor"
 filename = "NEWS.rst"
 directory = "nooz/"
-version = "0.1.0a5"
+version = "0.1.0a6"
 title_format = "tractor {version} ({project_date})"
 template = "nooz/_template.rst"
 all_bullets = true

@@ -170,11 +170,11 @@ async def trio_ctx(
     # message.
     with trio.fail_after(2):
         async with (
+            trio.open_nursery() as n,
+
             tractor.to_asyncio.open_channel_from(
                 sleep_and_err,
             ) as (first, chan),
-
-            trio.open_nursery() as n,
         ):

             assert first == 'start'

@@ -203,24 +203,25 @@ def test_context_spawns_aio_task_that_errors(
     '''
     async def main():

-        async with tractor.open_nursery() as n:
-            p = await n.start_actor(
-                'aio_daemon',
-                enable_modules=[__name__],
-                infect_asyncio=True,
-                # debug_mode=True,
-                loglevel='cancel',
-            )
-            async with p.open_context(
-                trio_ctx,
-            ) as (ctx, first):
-
-                assert first == 'start'
-
-                if parent_cancels:
-                    await p.cancel_actor()
-
-                await trio.sleep_forever()
+        with trio.fail_after(2):
+            async with tractor.open_nursery() as n:
+                p = await n.start_actor(
+                    'aio_daemon',
+                    enable_modules=[__name__],
+                    infect_asyncio=True,
+                    # debug_mode=True,
+                    loglevel='cancel',
+                )
+                async with p.open_context(
+                    trio_ctx,
+                ) as (ctx, first):
+
+                    assert first == 'start'
+
+                    if parent_cancels:
+                        await p.cancel_actor()
+
+                    await trio.sleep_forever()

     with pytest.raises(RemoteActorError) as excinfo:
         trio.run(main)

@@ -38,8 +38,14 @@ from trio_typing import TaskStatus

 from .log import get_logger
 from ._discovery import get_root
-from ._state import is_root_process, debug_mode
-from ._exceptions import is_multi_cancelled
+from ._state import (
+    is_root_process,
+    debug_mode,
+)
+from ._exceptions import (
+    is_multi_cancelled,
+    ContextCancelled,
+)
 from ._ipc import Channel

@@ -72,6 +78,18 @@ class Lock:
     # actor-wide variable pointing to current task name using debugger
     local_task_in_debug: Optional[str] = None

+    # NOTE: set by the current task waiting on the root tty lock from
+    # the CALLER side of the `lock_tty_for_child()` context entry-call
+    # and must be cancelled if this actor is cancelled via IPC
+    # request-message otherwise deadlocks with the parent actor may
+    # ensure
+    _debugger_request_cs: Optional[trio.CancelScope] = None
+
+    # NOTE: set only in the root actor for the **local** root spawned task
+    # which has acquired the lock (i.e. this is on the callee side of
+    # the `lock_tty_for_child()` context entry).
+    _root_local_task_cs_in_debug: Optional[trio.CancelScope] = None
+
     # actor tree-wide actor uid that supposedly has the tty lock
     global_actor_in_debug: Optional[tuple[str, str]] = None

@@ -81,12 +99,8 @@ class Lock:
     # lock in root actor preventing multi-access to local tty
     _debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()

-    # XXX: set by the current task waiting on the root tty lock
-    # and must be cancelled if this actor is cancelled via message
-    # otherwise deadlocks with the parent actor may ensure
-    _debugger_request_cs: Optional[trio.CancelScope] = None
-
     _orig_sigint_handler: Optional[Callable] = None
+    _blocked: set[tuple[str, str]] = set()

     @classmethod
     def shield_sigint(cls):

@@ -196,6 +210,12 @@ async def _acquire_debug_lock_from_root_task(
             f"entering lock checkpoint, remote task: {task_name}:{uid}"
         )
         we_acquired = True
+
+        # NOTE: if the surrounding cancel scope from the
+        # `lock_tty_for_child()` caller is cancelled, this line should
+        # unblock and NOT leave us in some kind of
+        # a "child-locked-TTY-but-child-is-uncontactable-over-IPC"
+        # condition.
         await Lock._debug_lock.acquire()

         if Lock.no_remote_has_tty is None:

@@ -267,6 +287,15 @@ async def lock_tty_for_child(
     '''
     task_name = trio.lowlevel.current_task().name

+    if tuple(subactor_uid) in Lock._blocked:
+        log.warning(
+            f'Actor {subactor_uid} is blocked from acquiring debug lock\n'
+            f"remote task: {task_name}:{subactor_uid}"
+        )
+        ctx._enter_debugger_on_cancel = False
+        await ctx.cancel(f'Debug lock blocked for {subactor_uid}')
+        return 'pdb_lock_blocked'
+
     # TODO: when we get to true remote debugging
     # this will deliver stdin data?

@@ -280,8 +309,9 @@ async def lock_tty_for_child(

     try:
         with (
-            trio.CancelScope(shield=True),
+            trio.CancelScope(shield=True) as debug_lock_cs,
         ):
+            Lock._root_local_task_cs_in_debug = debug_lock_cs
             async with _acquire_debug_lock_from_root_task(subactor_uid):

                 # indicate to child that we've locked stdio

@@ -297,6 +327,7 @@ async def lock_tty_for_child(
         return "pdb_unlock_complete"

     finally:
+        Lock._root_local_task_cs_in_debug = None
         Lock.unshield_sigint()

@@ -353,7 +384,7 @@ async def wait_for_parent_stdin_hijack(

                 log.pdb('unlocked context')

-    except tractor.ContextCancelled:
+    except ContextCancelled:
         log.warning('Root actor cancelled debug lock')

     finally:

@@ -721,9 +752,11 @@ async def _maybe_enter_pm(err):
         and not is_multi_cancelled(err)
     ):
         log.debug("Actor crashed, entering debug mode")
-        await post_mortem()
-        Lock.release()
-        return True
+        try:
+            await post_mortem()
+        finally:
+            Lock.release()
+        return True

     else:
         return False

@@ -234,6 +234,9 @@ async def _invoke(
                 f'{ctx.chan.uid}'
             )

+            if ctx._cancel_msg:
+                msg += f' with msg:\n{ctx._cancel_msg}'
+
             # task-contex was cancelled so relay to the cancel to caller
             raise ContextCancelled(
                 msg,

@@ -275,8 +278,16 @@ async def _invoke(
         # if not is_multi_cancelled(err) and (

         entered_debug: bool = False
-        if not isinstance(err, ContextCancelled) or (
-            isinstance(err, ContextCancelled) and ctx._cancel_called
+        if (
+            not isinstance(err, ContextCancelled)
+            or (
+                isinstance(err, ContextCancelled)
+                and ctx._cancel_called
+
+                # if the root blocks the debugger lock request from a child
+                # we will get a remote-cancelled condition.
+                and ctx._enter_debugger_on_cancel
+            )
         ):
             # XXX: is there any case where we'll want to debug IPC
             # disconnects as a default?

@@ -286,7 +297,6 @@ async def _invoke(
             # recovery logic - the only case is some kind of strange bug
             # in our transport layer itself? Going to keep this
             # open ended for now.
-
             entered_debug = await _debug._maybe_enter_pm(err)

             if not entered_debug:

@@ -698,16 +708,35 @@ class Actor:
             log.runtime(f"No more channels for {chan.uid}")
             self._peers.pop(uid, None)

-            # for (uid, cid) in self._contexts.copy():
-            #     if chan.uid == uid:
-            #         self._contexts.pop((uid, cid))
+            # NOTE: block this actor from acquiring the
+            # debugger-TTY-lock since we have no way to know if we
+            # cancelled it and further there is no way to ensure the
+            # lock will be released if acquired due to having no
+            # more active IPC channels.
+            if _state.is_root_process():
+                pdb_lock = _debug.Lock
+                pdb_lock._blocked.add(uid)
+                log.runtime(f"{uid} blocked from pdb locking")
+
+                # if a now stale local task has the TTY lock still
+                # we cancel it to allow servicing other requests for
+                # the lock.
+                if (
+                    pdb_lock._root_local_task_cs_in_debug
+                    and not pdb_lock._root_local_task_cs_in_debug.cancel_called
+                ):
+                    log.warning(
+                        f'STALE DEBUG LOCK DETECTED FOR {uid}'
+                    )
+                    # TODO: figure out why this breaks tests..
+                    # pdb_lock._root_local_task_cs_in_debug.cancel()

         log.runtime(f"Peers is {self._peers}")

         # No more channels to other actors (at all) registered
         # as connected.
         if not self._peers:
-            log.runtime("Signalling no more peer channels")
+            log.runtime("Signalling no more peer channel connections")
             self._no_more_peers.set()

         # XXX: is this necessary (GC should do it)?

@@ -371,6 +371,8 @@ class Context:

     # status flags
     _cancel_called: bool = False
+    _cancel_msg: Optional[str] = None
+    _enter_debugger_on_cancel: bool = True
     _started_called: bool = False
     _started_received: bool = False
     _stream_opened: bool = False

@@ -452,7 +454,11 @@ class Context:
         if not self._scope_nursery._closed:  # type: ignore
             self._scope_nursery.start_soon(raiser)

-    async def cancel(self) -> None:
+    async def cancel(
+        self,
+        msg: Optional[str] = None,
+
+    ) -> None:
         '''
         Cancel this inter-actor-task context.

@@ -461,6 +467,8 @@ class Context:

         '''
         side = 'caller' if self._portal else 'callee'
+        if msg:
+            assert side == 'callee', 'Only callee side can provide cancel msg'

         log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')

@@ -497,8 +505,10 @@ class Context:
                 log.cancel(
                     "Timed out on cancelling remote task "
                     f"{cid} for {self._portal.channel.uid}")

+        # callee side remote task
         else:
-            # callee side remote task
+            self._cancel_msg = msg

             # TODO: should we have an explicit cancel message
             # or is relaying the local `trio.Cancelled` as an

@@ -466,11 +466,11 @@ async def open_channel_from(
     ):
         # sync to a "started()"-like first delivered value from the
        # ``asyncio`` task.
-        first = await chan.receive()
-
-        # deliver stream handle upward
         try:
             with chan._trio_cs:
+                first = await chan.receive()
+
+                # deliver stream handle upward
                 yield first, chan
         finally:
             chan._trio_exited = True

@@ -491,16 +491,18 @@ def run_as_asyncio_guest(
     SC semantics.

     '''
-    # Uh, oh. :o
+    # Uh, oh.
+    #
+    # :o

     # It looks like your event loop has caught a case of the ``trio``s.

     # :()

-    # Don't worry, we've heard you'll barely notice. You might hallucinate
-    # a few more propagating errors and feel like your digestion has
-    # slowed but if anything get's too bad your parents will know about
-    # it.
+    # Don't worry, we've heard you'll barely notice. You might
+    # hallucinate a few more propagating errors and feel like your
+    # digestion has slowed but if anything get's too bad your parents
+    # will know about it.

     # :)