Add a cancel scope around child debugger requests
This is needed in order to avoid the deadlock condition where a child actor is waiting on the root actor's tty lock but its parent (possibly the root) is waiting on it to terminate after sending a cancel request. The solution is simple: create a cancel scope around the request in the child and always cancel it when a cancel request from the parent arrives.
debug_tests
parent
363498b882
commit
25e93925b0
|
@ -27,7 +27,7 @@ from ._exceptions import (
|
||||||
unpack_error,
|
unpack_error,
|
||||||
ModuleNotExposed
|
ModuleNotExposed
|
||||||
)
|
)
|
||||||
from ._debug import _maybe_enter_pm
|
from . import _debug
|
||||||
from ._discovery import get_arbiter
|
from ._discovery import get_arbiter
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
from . import _state
|
from . import _state
|
||||||
|
@ -129,7 +129,7 @@ async def _invoke(
|
||||||
except (Exception, trio.MultiError) as err:
|
except (Exception, trio.MultiError) as err:
|
||||||
# NOTE: don't enter debug mode recursively after quitting pdb
|
# NOTE: don't enter debug mode recursively after quitting pdb
|
||||||
log.exception("Actor crashed:")
|
log.exception("Actor crashed:")
|
||||||
await _maybe_enter_pm(err)
|
await _debug._maybe_enter_pm(err)
|
||||||
|
|
||||||
# always ship errors back to caller
|
# always ship errors back to caller
|
||||||
err_msg = pack_error(err)
|
err_msg = pack_error(err)
|
||||||
|
@ -832,6 +832,14 @@ class Actor:
|
||||||
|
|
||||||
# cancel all ongoing rpc tasks
|
# cancel all ongoing rpc tasks
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
|
|
||||||
|
# kill any debugger request task to avoid deadlock
|
||||||
|
# with the root actor in this tree
|
||||||
|
dbcs = _debug._debugger_request_cs
|
||||||
|
if dbcs is not None:
|
||||||
|
log.debug("Cancelling active debugger request")
|
||||||
|
dbcs.cancel()
|
||||||
|
|
||||||
# kill all ongoing tasks
|
# kill all ongoing tasks
|
||||||
await self.cancel_rpc_tasks()
|
await self.cancel_rpc_tasks()
|
||||||
|
|
||||||
|
@ -1051,7 +1059,7 @@ async def _start_actor(
|
||||||
result = await main()
|
result = await main()
|
||||||
except (Exception, trio.MultiError) as err:
|
except (Exception, trio.MultiError) as err:
|
||||||
log.exception("Actor crashed:")
|
log.exception("Actor crashed:")
|
||||||
await _maybe_enter_pm(err)
|
await _debug._maybe_enter_pm(err)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# XXX: the actor is cancelled when this context is complete
|
# XXX: the actor is cancelled when this context is complete
|
||||||
|
|
|
@ -11,6 +11,7 @@ import signal
|
||||||
from async_generator import aclosing
|
from async_generator import aclosing
|
||||||
import tractor
|
import tractor
|
||||||
import trio
|
import trio
|
||||||
|
# from trio.testing import wait_all_tasks_blocked
|
||||||
|
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
from . import _state
|
from . import _state
|
||||||
|
@ -39,6 +40,11 @@ _in_debug = False
|
||||||
# lock in root actor preventing multi-access to local tty
|
# lock in root actor preventing multi-access to local tty
|
||||||
_debug_lock = trio.StrictFIFOLock()
|
_debug_lock = trio.StrictFIFOLock()
|
||||||
|
|
||||||
|
# XXX: set by the current task waiting on the root tty lock
|
||||||
|
# and must be cancelled if this actor is cancelled via message
|
||||||
|
# otherwise deadlocks with the parent actor may ensue
|
||||||
|
_debugger_request_cs: trio.CancelScope = None
|
||||||
|
|
||||||
|
|
||||||
class TractorConfig(pdbpp.DefaultConfig):
|
class TractorConfig(pdbpp.DefaultConfig):
|
||||||
"""Custom ``pdbpp`` goodness.
|
"""Custom ``pdbpp`` goodness.
|
||||||
|
@ -119,8 +125,7 @@ async def _acquire_debug_lock():
|
||||||
log.error("TTY lock acquired")
|
log.error("TTY lock acquired")
|
||||||
yield
|
yield
|
||||||
finally:
|
finally:
|
||||||
if _debug_lock.locked():
|
_debug_lock.release()
|
||||||
_debug_lock.release()
|
|
||||||
log.error("TTY lock released")
|
log.error("TTY lock released")
|
||||||
|
|
||||||
|
|
||||||
|
@ -151,16 +156,6 @@ def _disable_sigint():
|
||||||
async def _hijack_stdin_relay_to_child(
|
async def _hijack_stdin_relay_to_child(
|
||||||
subactor_uid: Tuple[str, str]
|
subactor_uid: Tuple[str, str]
|
||||||
) -> None:
|
) -> None:
|
||||||
actor = tractor.current_actor()
|
|
||||||
nursery = actor._actoruid2nursery[subactor_uid]
|
|
||||||
print(f'NURSERY: {nursery}')
|
|
||||||
print(f'nursery is cancelled {nursery.cancelled}')
|
|
||||||
if actor._is_cancelled or nursery.cancelled:
|
|
||||||
raise RuntimeError(
|
|
||||||
"Can not engage debugger actor is already cancelled")
|
|
||||||
|
|
||||||
await trio.sleep(0)
|
|
||||||
|
|
||||||
# TODO: when we get to true remote debugging
|
# TODO: when we get to true remote debugging
|
||||||
# this will deliver stdin data
|
# this will deliver stdin data
|
||||||
log.warning(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
|
log.warning(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
|
||||||
|
@ -172,6 +167,7 @@ async def _hijack_stdin_relay_to_child(
|
||||||
yield 'Locked'
|
yield 'Locked'
|
||||||
|
|
||||||
# wait for cancellation of stream by child
|
# wait for cancellation of stream by child
|
||||||
|
# indicating debugger is dis-engaged
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
log.debug(f"Actor {subactor_uid} RELEASED stdin hijack lock")
|
log.debug(f"Actor {subactor_uid} RELEASED stdin hijack lock")
|
||||||
|
@ -190,37 +186,40 @@ def _breakpoint(debug_func) -> Awaitable[None]:
|
||||||
async def wait_for_parent_stdin_hijack(
|
async def wait_for_parent_stdin_hijack(
|
||||||
task_status=trio.TASK_STATUS_IGNORED
|
task_status=trio.TASK_STATUS_IGNORED
|
||||||
):
|
):
|
||||||
try:
|
global _debugger_request_cs
|
||||||
async with tractor._portal.open_portal(
|
with trio.CancelScope() as cs:
|
||||||
actor._parent_chan,
|
_debugger_request_cs = cs
|
||||||
start_msg_loop=False,
|
try:
|
||||||
# shield=True,
|
async with tractor._portal.open_portal(
|
||||||
) as portal:
|
actor._parent_chan,
|
||||||
with trio.fail_after(1):
|
start_msg_loop=False,
|
||||||
agen = await portal.run(
|
# shield=True,
|
||||||
'tractor._debug',
|
) as portal:
|
||||||
'_hijack_stdin_relay_to_child',
|
with trio.fail_after(1):
|
||||||
subactor_uid=actor.uid,
|
agen = await portal.run(
|
||||||
)
|
'tractor._debug',
|
||||||
async with aclosing(agen):
|
'_hijack_stdin_relay_to_child',
|
||||||
|
subactor_uid=actor.uid,
|
||||||
|
)
|
||||||
|
async with aclosing(agen):
|
||||||
|
|
||||||
# block until first yield above
|
# block until first yield above
|
||||||
async for val in agen:
|
async for val in agen:
|
||||||
|
|
||||||
assert val == 'Locked'
|
assert val == 'Locked'
|
||||||
task_status.started()
|
task_status.started()
|
||||||
|
|
||||||
# with trio.CancelScope(shield=True):
|
# with trio.CancelScope(shield=True):
|
||||||
await do_unlock.wait()
|
await do_unlock.wait()
|
||||||
|
|
||||||
# trigger cancellation of remote stream
|
# trigger cancellation of remote stream
|
||||||
break
|
break
|
||||||
finally:
|
finally:
|
||||||
log.debug(f"Exiting debugger for actor {actor}")
|
log.debug(f"Exiting debugger for actor {actor}")
|
||||||
global _in_debug
|
global _in_debug
|
||||||
_in_debug = False
|
_in_debug = False
|
||||||
# actor.statespace['_in_debug'] = False
|
# actor.statespace['_in_debug'] = False
|
||||||
log.debug(f"Child {actor} released parent stdio lock")
|
log.debug(f"Child {actor} released parent stdio lock")
|
||||||
|
|
||||||
async def _bp():
|
async def _bp():
|
||||||
"""Async breakpoint which schedules a parent stdio lock, and once complete
|
"""Async breakpoint which schedules a parent stdio lock, and once complete
|
||||||
|
@ -237,7 +236,6 @@ def _breakpoint(debug_func) -> Awaitable[None]:
|
||||||
f"Actor {actor.uid} already has a debug lock, waiting...")
|
f"Actor {actor.uid} already has a debug lock, waiting...")
|
||||||
await do_unlock.wait()
|
await do_unlock.wait()
|
||||||
await trio.sleep(0.1)
|
await trio.sleep(0.1)
|
||||||
# return
|
|
||||||
|
|
||||||
# assign unlock callback for debugger teardown hooks
|
# assign unlock callback for debugger teardown hooks
|
||||||
global _pdb_release_hook
|
global _pdb_release_hook
|
||||||
|
@ -253,9 +251,6 @@ def _breakpoint(debug_func) -> Awaitable[None]:
|
||||||
# being restricted by the scope of a new task nursery.
|
# being restricted by the scope of a new task nursery.
|
||||||
await actor._service_n.start(wait_for_parent_stdin_hijack)
|
await actor._service_n.start(wait_for_parent_stdin_hijack)
|
||||||
|
|
||||||
# block here one (at the appropriate frame *up* where
|
|
||||||
# ``breakpoint()`` was awaited and begin handling stdio
|
|
||||||
# debug_func(actor)
|
|
||||||
else:
|
else:
|
||||||
# we also wait in the root-parent for any child that
|
# we also wait in the root-parent for any child that
|
||||||
# may have the tty locked prior
|
# may have the tty locked prior
|
||||||
|
@ -270,6 +265,7 @@ def _breakpoint(debug_func) -> Awaitable[None]:
|
||||||
|
|
||||||
# block here one (at the appropriate frame *up* where
|
# block here one (at the appropriate frame *up* where
|
||||||
# ``breakpoint()`` was awaited and begin handling stdio
|
# ``breakpoint()`` was awaited and begin handling stdio
|
||||||
|
log.debug("Entering the synchronous world of the debugger")
|
||||||
debug_func(actor)
|
debug_func(actor)
|
||||||
|
|
||||||
# user code **must** await this!
|
# user code **must** await this!
|
||||||
|
|
Loading…
Reference in New Issue