forked from goodboy/tractor
commit
87ce6c8eb3
newsfragments
|
@ -0,0 +1,6 @@
|
||||||
|
Handle broken channel/stream faults where the root's tty lock is left acquired by some
|
||||||
|
child actor who went MIA and the root ends up hanging indefinitely.
|
||||||
|
|
||||||
|
There's two parts here:
|
||||||
|
- Don't shield wait on the lock
|
||||||
|
- Always do our best to release the lock on the expected worst case connection faults
|
|
@ -23,6 +23,7 @@ from ._exceptions import (
|
||||||
from ._debug import breakpoint, post_mortem
|
from ._debug import breakpoint, post_mortem
|
||||||
from . import msg
|
from . import msg
|
||||||
from ._root import run, run_daemon, open_root_actor
|
from ._root import run, run_daemon, open_root_actor
|
||||||
|
from ._portal import Portal
|
||||||
|
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
|
@ -40,6 +41,7 @@ __all__ = [
|
||||||
'msg',
|
'msg',
|
||||||
'open_nursery',
|
'open_nursery',
|
||||||
'open_root_actor',
|
'open_root_actor',
|
||||||
|
'Portal',
|
||||||
'post_mortem',
|
'post_mortem',
|
||||||
'run',
|
'run',
|
||||||
'run_daemon',
|
'run_daemon',
|
||||||
|
|
|
@ -196,6 +196,22 @@ async def _acquire_debug_lock(
|
||||||
log.debug(f"TTY lock released, remote task: {task_name}:{uid}")
|
log.debug(f"TTY lock released, remote task: {task_name}:{uid}")
|
||||||
|
|
||||||
|
|
||||||
|
def handler(signum, frame, *args):
|
||||||
|
"""Specialized debugger compatible SIGINT handler.
|
||||||
|
|
||||||
|
In childred we always ignore to avoid deadlocks since cancellation
|
||||||
|
should always be managed by the parent supervising actor. The root
|
||||||
|
is always cancelled on ctrl-c.
|
||||||
|
"""
|
||||||
|
if is_root_process():
|
||||||
|
tractor.current_actor().cancel_soon()
|
||||||
|
else:
|
||||||
|
print(
|
||||||
|
"tractor ignores SIGINT while in debug mode\n"
|
||||||
|
"If you have a special need for it please open an issue.\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def _hijack_stdin_for_child(
|
async def _hijack_stdin_for_child(
|
||||||
|
|
||||||
|
@ -222,24 +238,36 @@ async def _hijack_stdin_for_child(
|
||||||
|
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
|
|
||||||
async with _acquire_debug_lock(subactor_uid):
|
try:
|
||||||
|
lock = None
|
||||||
|
async with _acquire_debug_lock(subactor_uid) as lock:
|
||||||
|
|
||||||
# indicate to child that we've locked stdio
|
# indicate to child that we've locked stdio
|
||||||
await ctx.started('Locked')
|
await ctx.started('Locked')
|
||||||
log.pdb(f"Actor {subactor_uid} ACQUIRED stdin hijack lock")
|
log.pdb(f"Actor {subactor_uid} ACQUIRED stdin hijack lock")
|
||||||
|
|
||||||
# wait for unlock pdb by child
|
# wait for unlock pdb by child
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
try:
|
|
||||||
assert await stream.receive() == 'pdb_unlock'
|
assert await stream.receive() == 'pdb_unlock'
|
||||||
|
|
||||||
except trio.BrokenResourceError:
|
# try:
|
||||||
# XXX: there may be a race with the portal teardown
|
# assert await stream.receive() == 'pdb_unlock'
|
||||||
# with the calling actor which we can safely ignore.
|
|
||||||
# The alternative would be sending an ack message
|
except (
|
||||||
# and allowing the client to wait for us to teardown
|
trio.BrokenResourceError,
|
||||||
# first?
|
trio.Cancelled, # by local cancellation
|
||||||
pass
|
trio.ClosedResourceError, # by self._rx_chan
|
||||||
|
) as err:
|
||||||
|
# XXX: there may be a race with the portal teardown
|
||||||
|
# with the calling actor which we can safely ignore.
|
||||||
|
# The alternative would be sending an ack message
|
||||||
|
# and allowing the client to wait for us to teardown
|
||||||
|
# first?
|
||||||
|
if lock and lock.locked():
|
||||||
|
lock.release()
|
||||||
|
|
||||||
|
if isinstance(err, trio.Cancelled):
|
||||||
|
raise
|
||||||
|
|
||||||
log.debug(f"TTY lock released, remote task: {task_name}:{subactor_uid}")
|
log.debug(f"TTY lock released, remote task: {task_name}:{subactor_uid}")
|
||||||
|
|
||||||
|
@ -283,7 +311,7 @@ async def _breakpoint(
|
||||||
try:
|
try:
|
||||||
async with get_root() as portal:
|
async with get_root() as portal:
|
||||||
|
|
||||||
log.error('got portal')
|
log.pdb('got portal')
|
||||||
|
|
||||||
# this syncs to child's ``Context.started()`` call.
|
# this syncs to child's ``Context.started()`` call.
|
||||||
async with portal.open_context(
|
async with portal.open_context(
|
||||||
|
@ -293,7 +321,7 @@ async def _breakpoint(
|
||||||
|
|
||||||
) as (ctx, val):
|
) as (ctx, val):
|
||||||
|
|
||||||
log.error('locked context')
|
log.pdb('locked context')
|
||||||
assert val == 'Locked'
|
assert val == 'Locked'
|
||||||
|
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
|
@ -376,10 +404,17 @@ async def _breakpoint(
|
||||||
# callbacks. Can't think of a nicer way then this atm.
|
# callbacks. Can't think of a nicer way then this atm.
|
||||||
if _debug_lock.locked():
|
if _debug_lock.locked():
|
||||||
log.warning(
|
log.warning(
|
||||||
'Root actor attempting to acquire active tty lock'
|
'Root actor attempting to shield-acquire active tty lock'
|
||||||
f' owned by {_global_actor_in_debug}')
|
f' owned by {_global_actor_in_debug}')
|
||||||
|
|
||||||
await _debug_lock.acquire()
|
with trio.CancelScope(shield=True):
|
||||||
|
# must shield here to avoid hitting a ``Cancelled`` and
|
||||||
|
# a child getting stuck bc we clobbered the tty
|
||||||
|
await _debug_lock.acquire()
|
||||||
|
|
||||||
|
else:
|
||||||
|
# may be cancelled
|
||||||
|
await _debug_lock.acquire()
|
||||||
|
|
||||||
_global_actor_in_debug = actor.uid
|
_global_actor_in_debug = actor.uid
|
||||||
_local_task_in_debug = task_name
|
_local_task_in_debug = task_name
|
||||||
|
@ -402,12 +437,13 @@ async def _breakpoint(
|
||||||
debug_func(actor)
|
debug_func(actor)
|
||||||
|
|
||||||
|
|
||||||
def _mk_pdb():
|
def _mk_pdb() -> PdbwTeardown:
|
||||||
|
|
||||||
# XXX: setting these flags on the pdb instance are absolutely
|
# XXX: setting these flags on the pdb instance are absolutely
|
||||||
# critical to having ctrl-c work in the ``trio`` standard way!
|
# critical to having ctrl-c work in the ``trio`` standard way! The
|
||||||
# The stdlib's pdb supports entering the current sync frame
|
# stdlib's pdb supports entering the current sync frame on a SIGINT,
|
||||||
# on a SIGINT, with ``trio`` we pretty much never want this
|
# with ``trio`` we pretty much never want this and if we did we can
|
||||||
# and we did we can handle it in the ``tractor`` task runtime.
|
# handle it in the ``tractor`` task runtime.
|
||||||
|
|
||||||
pdb = PdbwTeardown()
|
pdb = PdbwTeardown()
|
||||||
pdb.allow_kbdint = True
|
pdb.allow_kbdint = True
|
||||||
|
|
|
@ -62,7 +62,7 @@ def _trio_main(
|
||||||
"""
|
"""
|
||||||
# Disable sigint handling in children;
|
# Disable sigint handling in children;
|
||||||
# we don't need it thanks to our cancellation machinery.
|
# we don't need it thanks to our cancellation machinery.
|
||||||
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
# signal.signal(signal.SIGINT, signal.SIG_IGN)
|
||||||
|
|
||||||
log.info(f"Started new trio process for {actor.uid}")
|
log.info(f"Started new trio process for {actor.uid}")
|
||||||
|
|
||||||
|
|
|
@ -298,8 +298,8 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
f'child {_debug._global_actor_in_debug}\n'
|
f'child {_debug._global_actor_in_debug}\n'
|
||||||
'Waiting on tty lock to release..')
|
'Waiting on tty lock to release..')
|
||||||
|
|
||||||
with trio.CancelScope(shield=True):
|
# with trio.CancelScope(shield=True):
|
||||||
await debug_complete.wait()
|
await debug_complete.wait()
|
||||||
|
|
||||||
# if the caller's scope errored then we activate our
|
# if the caller's scope errored then we activate our
|
||||||
# one-cancels-all supervisor strategy (don't
|
# one-cancels-all supervisor strategy (don't
|
||||||
|
|
Loading…
Reference in New Issue