forked from goodboy/tractor
Solve the root-cancels-child-in-tty-lock race
Finally this makes a cancelled root actor nursery not clobber child tasks which request and lock the root's tty for the debugger repl. Using an edge triggered event which is set after all fifo-lock-queued tasks are complete, we can be sure that no lingering child tasks are going to get interrupted during pdb use and tty lock acquisition. Further, even if new tasks do queue up to get the lock, the root will incrementally send cancel msgs to each sub-actor only once the tty is not locked by a (set of) child request task(s). Add shielding around all the critical sections where the child attempts to allocate the lock from the root such that it won't be disrupted from cancel messages from the root after the acquire lock transaction has started.pre_bad_close
parent
9fa451fdd3
commit
900f04a31d
|
@ -124,10 +124,11 @@ class PdbwTeardown(pdbpp.Pdb):
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
|
async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
|
||||||
"""Acquire a actor local FIFO lock meant to mutex entry to a local
|
'''Acquire a actor local FIFO lock meant to mutex entry to a local
|
||||||
debugger entry point to avoid tty clobbering by multiple processes.
|
debugger entry point to avoid tty clobbering a global root process.
|
||||||
"""
|
|
||||||
global _debug_lock, _global_actor_in_debug
|
'''
|
||||||
|
global _debug_lock, _global_actor_in_debug, _no_remote_has_tty
|
||||||
|
|
||||||
task_name = trio.lowlevel.current_task().name
|
task_name = trio.lowlevel.current_task().name
|
||||||
|
|
||||||
|
@ -135,15 +136,61 @@ async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
|
||||||
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}"
|
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}"
|
||||||
)
|
)
|
||||||
|
|
||||||
async with _debug_lock:
|
we_acquired = False
|
||||||
|
|
||||||
|
if _no_remote_has_tty is None:
|
||||||
|
# mark the tty lock as being in use so that the runtime
|
||||||
|
# can try to avoid clobbering any connection from a child
|
||||||
|
# that's currently relying on it.
|
||||||
|
_no_remote_has_tty = trio.Event()
|
||||||
|
|
||||||
|
try:
|
||||||
|
log.debug(
|
||||||
|
f"entering lock checkpoint, remote task: {task_name}:{uid}"
|
||||||
|
)
|
||||||
|
# with trio.CancelScope(shield=True):
|
||||||
|
we_acquired = True
|
||||||
|
await _debug_lock.acquire()
|
||||||
|
|
||||||
|
# we_acquired = True
|
||||||
|
|
||||||
_global_actor_in_debug = uid
|
_global_actor_in_debug = uid
|
||||||
log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}")
|
log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}")
|
||||||
|
|
||||||
yield
|
# NOTE: critical section!
|
||||||
|
# this yield is unshielded.
|
||||||
|
# IF we received a cancel during the shielded lock
|
||||||
|
# entry of some next-in-queue requesting task,
|
||||||
|
# then the resumption here will result in that
|
||||||
|
# Cancelled being raised to our caller below!
|
||||||
|
|
||||||
_global_actor_in_debug = None
|
# in this case the finally below should trigger
|
||||||
log.debug(f"TTY lock released, remote task: {task_name}:{uid}")
|
# and the surrounding calle side context should cancel
|
||||||
|
# normally relaying back to the caller.
|
||||||
|
|
||||||
|
yield _debug_lock
|
||||||
|
|
||||||
|
finally:
|
||||||
|
# if _global_actor_in_debug == uid:
|
||||||
|
if we_acquired and _debug_lock.locked():
|
||||||
|
_debug_lock.release()
|
||||||
|
|
||||||
|
# IFF there are no more requesting tasks queued up fire, the
|
||||||
|
# "tty-unlocked" event thereby alerting any monitors of the lock that
|
||||||
|
# we are now back in the "tty unlocked" state. This is basically
|
||||||
|
# and edge triggered signal around an empty queue of sub-actor
|
||||||
|
# tasks that may have tried to acquire the lock.
|
||||||
|
stats = _debug_lock.statistics()
|
||||||
|
if (
|
||||||
|
not stats.owner
|
||||||
|
):
|
||||||
|
log.pdb(f"No more tasks waiting on tty lock! says {uid}")
|
||||||
|
_no_remote_has_tty.set()
|
||||||
|
_no_remote_has_tty = None
|
||||||
|
|
||||||
|
_global_actor_in_debug = None
|
||||||
|
|
||||||
|
log.debug(f"TTY lock released, remote task: {task_name}:{uid}")
|
||||||
|
|
||||||
|
|
||||||
# @contextmanager
|
# @contextmanager
|
||||||
|
@ -169,53 +216,43 @@ async def _hijack_stdin_relay_to_child(
|
||||||
bossing.
|
bossing.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
global _no_remote_has_tty
|
|
||||||
|
|
||||||
# mark the tty lock as being in use so that the runtime
|
|
||||||
# can try to avoid clobbering any connection from a child
|
|
||||||
# that's currently relying on it.
|
|
||||||
_no_remote_has_tty = trio.Event()
|
|
||||||
|
|
||||||
task_name = trio.lowlevel.current_task().name
|
task_name = trio.lowlevel.current_task().name
|
||||||
|
|
||||||
# TODO: when we get to true remote debugging
|
# TODO: when we get to true remote debugging
|
||||||
# this will deliver stdin data?
|
# this will deliver stdin data?
|
||||||
|
|
||||||
log.debug(
|
log.debug(
|
||||||
"Attempting to acquire TTY lock, "
|
"Attempting to acquire TTY lock\n"
|
||||||
f"remote task: {task_name}:{subactor_uid}"
|
f"remote task: {task_name}:{subactor_uid}"
|
||||||
)
|
)
|
||||||
|
|
||||||
log.debug(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
|
log.debug(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
|
||||||
|
|
||||||
async with _acquire_debug_lock(subactor_uid):
|
with trio.CancelScope(shield=True):
|
||||||
|
|
||||||
# XXX: only shield the context sync step!
|
async with _acquire_debug_lock(subactor_uid):
|
||||||
with trio.CancelScope(shield=True):
|
|
||||||
|
|
||||||
# indicate to child that we've locked stdio
|
# indicate to child that we've locked stdio
|
||||||
await ctx.started('Locked')
|
await ctx.started('Locked')
|
||||||
log.pdb( # type: ignore
|
log.pdb( # type: ignore
|
||||||
f"Actor {subactor_uid} ACQUIRED stdin hijack lock")
|
f"Actor {subactor_uid} ACQUIRED stdin hijack lock")
|
||||||
|
|
||||||
# wait for unlock pdb by child
|
# wait for unlock pdb by child
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
try:
|
try:
|
||||||
assert await stream.receive() == 'pdb_unlock'
|
assert await stream.receive() == 'pdb_unlock'
|
||||||
|
|
||||||
except trio.BrokenResourceError:
|
except trio.BrokenResourceError:
|
||||||
# XXX: there may be a race with the portal teardown
|
# XXX: there may be a race with the portal teardown
|
||||||
# with the calling actor which we can safely ignore
|
# with the calling actor which we can safely ignore
|
||||||
# the alternative would be sending an ack message
|
# the alternative would be sending an ack message
|
||||||
# and allowing the client to wait for us to teardown
|
# and allowing the client to wait for us to teardown
|
||||||
# first?
|
# first?
|
||||||
pass
|
pass
|
||||||
|
|
||||||
log.debug(
|
log.debug(
|
||||||
f"TTY lock released, remote task: {task_name}:{subactor_uid}")
|
f"TTY lock released, remote task: {task_name}:{subactor_uid}")
|
||||||
|
|
||||||
log.debug(f"Actor {subactor_uid} RELEASED stdin hijack lock")
|
|
||||||
_no_remote_has_tty.set()
|
|
||||||
return "pdb_unlock_complete"
|
return "pdb_unlock_complete"
|
||||||
|
|
||||||
|
|
||||||
|
@ -243,17 +280,21 @@ async def _breakpoint(
|
||||||
global _local_pdb_complete, _pdb_release_hook
|
global _local_pdb_complete, _pdb_release_hook
|
||||||
global _local_task_in_debug, _global_actor_in_debug
|
global _local_task_in_debug, _global_actor_in_debug
|
||||||
|
|
||||||
|
await trio.lowlevel.checkpoint()
|
||||||
|
|
||||||
async def wait_for_parent_stdin_hijack(
|
async def wait_for_parent_stdin_hijack(
|
||||||
task_status=trio.TASK_STATUS_IGNORED
|
task_status=trio.TASK_STATUS_IGNORED
|
||||||
):
|
):
|
||||||
global _debugger_request_cs
|
global _debugger_request_cs
|
||||||
|
|
||||||
with trio.CancelScope() as cs:
|
with trio.CancelScope(shield=True) as cs:
|
||||||
_debugger_request_cs = cs
|
_debugger_request_cs = cs
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with get_root() as portal:
|
async with get_root() as portal:
|
||||||
|
|
||||||
|
log.error('got portal')
|
||||||
|
|
||||||
# this syncs to child's ``Context.started()`` call.
|
# this syncs to child's ``Context.started()`` call.
|
||||||
async with portal.open_context(
|
async with portal.open_context(
|
||||||
|
|
||||||
|
@ -262,18 +303,22 @@ async def _breakpoint(
|
||||||
|
|
||||||
) as (ctx, val):
|
) as (ctx, val):
|
||||||
|
|
||||||
|
log.error('locked context')
|
||||||
assert val == 'Locked'
|
assert val == 'Locked'
|
||||||
|
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
|
|
||||||
|
log.error('opened stream')
|
||||||
# unblock local caller
|
# unblock local caller
|
||||||
task_status.started()
|
task_status.started()
|
||||||
|
|
||||||
# TODO: shielding currently can cause hangs...
|
try:
|
||||||
# with trio.CancelScope(shield=True):
|
await _local_pdb_complete.wait()
|
||||||
|
|
||||||
await _local_pdb_complete.wait()
|
finally:
|
||||||
await stream.send('pdb_unlock')
|
# TODO: shielding currently can cause hangs...
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
await stream.send('pdb_unlock')
|
||||||
|
|
||||||
# sync with callee termination
|
# sync with callee termination
|
||||||
assert await ctx.result() == "pdb_unlock_complete"
|
assert await ctx.result() == "pdb_unlock_complete"
|
||||||
|
@ -292,6 +337,7 @@ async def _breakpoint(
|
||||||
|
|
||||||
# TODO: need a more robust check for the "root" actor
|
# TODO: need a more robust check for the "root" actor
|
||||||
if actor._parent_chan and not is_root_process():
|
if actor._parent_chan and not is_root_process():
|
||||||
|
|
||||||
if _local_task_in_debug:
|
if _local_task_in_debug:
|
||||||
if _local_task_in_debug == task_name:
|
if _local_task_in_debug == task_name:
|
||||||
# this task already has the lock and is
|
# this task already has the lock and is
|
||||||
|
@ -321,7 +367,8 @@ async def _breakpoint(
|
||||||
# we have to figure out how to avoid having the service nursery
|
# we have to figure out how to avoid having the service nursery
|
||||||
# cancel on this task start? I *think* this works below?
|
# cancel on this task start? I *think* this works below?
|
||||||
# actor._service_n.cancel_scope.shield = shield
|
# actor._service_n.cancel_scope.shield = shield
|
||||||
await actor._service_n.start(wait_for_parent_stdin_hijack)
|
with trio.CancelScope(shield=True):
|
||||||
|
await actor._service_n.start(wait_for_parent_stdin_hijack)
|
||||||
# actor._service_n.cancel_scope.shield = False
|
# actor._service_n.cancel_scope.shield = False
|
||||||
|
|
||||||
elif is_root_process():
|
elif is_root_process():
|
||||||
|
@ -339,6 +386,11 @@ async def _breakpoint(
|
||||||
# XXX: since we need to enter pdb synchronously below,
|
# XXX: since we need to enter pdb synchronously below,
|
||||||
# we have to release the lock manually from pdb completion
|
# we have to release the lock manually from pdb completion
|
||||||
# callbacks. Can't think of a nicer way then this atm.
|
# callbacks. Can't think of a nicer way then this atm.
|
||||||
|
if _debug_lock.locked():
|
||||||
|
log.warning(
|
||||||
|
'Root actor attempting to acquire active tty lock'
|
||||||
|
f' owned by {_global_actor_in_debug}')
|
||||||
|
|
||||||
await _debug_lock.acquire()
|
await _debug_lock.acquire()
|
||||||
|
|
||||||
_global_actor_in_debug = actor.uid
|
_global_actor_in_debug = actor.uid
|
||||||
|
|
|
@ -170,16 +170,25 @@ class ActorNursery:
|
||||||
|
|
||||||
log.warning(f"Cancelling nursery in {self._actor.uid}")
|
log.warning(f"Cancelling nursery in {self._actor.uid}")
|
||||||
with trio.move_on_after(3) as cs:
|
with trio.move_on_after(3) as cs:
|
||||||
|
|
||||||
async with trio.open_nursery() as nursery:
|
async with trio.open_nursery() as nursery:
|
||||||
|
|
||||||
for subactor, proc, portal in self._children.values():
|
for subactor, proc, portal in self._children.values():
|
||||||
|
|
||||||
|
# TODO: are we ever even going to use this or
|
||||||
|
# is the spawning backend responsible for such
|
||||||
|
# things? I'm thinking latter.
|
||||||
if hard_kill:
|
if hard_kill:
|
||||||
proc.terminate()
|
proc.terminate()
|
||||||
|
|
||||||
else:
|
else:
|
||||||
if portal is None: # actor hasn't fully spawned yet
|
if portal is None: # actor hasn't fully spawned yet
|
||||||
event = self._actor._peer_connected[subactor.uid]
|
event = self._actor._peer_connected[subactor.uid]
|
||||||
log.warning(
|
log.warning(
|
||||||
f"{subactor.uid} wasn't finished spawning?")
|
f"{subactor.uid} wasn't finished spawning?")
|
||||||
|
|
||||||
await event.wait()
|
await event.wait()
|
||||||
|
|
||||||
# channel/portal should now be up
|
# channel/portal should now be up
|
||||||
_, _, portal = self._children[subactor.uid]
|
_, _, portal = self._children[subactor.uid]
|
||||||
|
|
||||||
|
@ -239,6 +248,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
# As such if the strategy propagates any error(s) upwards
|
# As such if the strategy propagates any error(s) upwards
|
||||||
# the above "daemon actor" nursery will be notified.
|
# the above "daemon actor" nursery will be notified.
|
||||||
async with trio.open_nursery() as ria_nursery:
|
async with trio.open_nursery() as ria_nursery:
|
||||||
|
|
||||||
anursery = ActorNursery(
|
anursery = ActorNursery(
|
||||||
actor,
|
actor,
|
||||||
ria_nursery,
|
ria_nursery,
|
||||||
|
@ -249,35 +259,53 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
# spawning of actors happens in the caller's scope
|
# spawning of actors happens in the caller's scope
|
||||||
# after we yield upwards
|
# after we yield upwards
|
||||||
yield anursery
|
yield anursery
|
||||||
log.debug(
|
|
||||||
|
log.runtime(
|
||||||
f"Waiting on subactors {anursery._children} "
|
f"Waiting on subactors {anursery._children} "
|
||||||
"to complete"
|
"to complete"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Last bit before first nursery block ends in the case
|
||||||
|
# where we didn't error in the caller's scope
|
||||||
|
|
||||||
|
# signal all process monitor tasks to conduct
|
||||||
|
# hard join phase.
|
||||||
|
anursery._join_procs.set()
|
||||||
|
|
||||||
except BaseException as err:
|
except BaseException as err:
|
||||||
|
|
||||||
if is_root_process() and (
|
# If we error in the root but the debugger is
|
||||||
type(err) in {
|
# engaged we don't want to prematurely kill (and
|
||||||
Exception, trio.MultiError, trio.Cancelled
|
# thus clobber access to) the local tty since it
|
||||||
}
|
# will make the pdb repl unusable.
|
||||||
):
|
# Instead try to wait for pdb to be released before
|
||||||
# if we error in the root but the debugger is
|
# tearing down.
|
||||||
# engaged we don't want to prematurely kill (and
|
if is_root_process():
|
||||||
# thus clobber access to) the local tty streams.
|
log.exception(f"we're root with {err}")
|
||||||
# instead try to wait for pdb to be released before
|
|
||||||
# tearing down.
|
|
||||||
debug_complete = _debug._pdb_complete
|
|
||||||
if debug_complete and not debug_complete.is_set():
|
|
||||||
log.warning(
|
|
||||||
"Root has errored but pdb is active..waiting "
|
|
||||||
"on debug lock")
|
|
||||||
await _debug._pdb_complete.wait()
|
|
||||||
|
|
||||||
# raise
|
# wait to see if a sub-actor task
|
||||||
|
# will be scheduled and grab the tty
|
||||||
|
# lock on the next tick
|
||||||
|
# await trio.testing.wait_all_tasks_blocked()
|
||||||
|
|
||||||
|
debug_complete = _debug._no_remote_has_tty
|
||||||
|
if (
|
||||||
|
debug_complete and
|
||||||
|
not debug_complete.is_set()
|
||||||
|
):
|
||||||
|
log.warning(
|
||||||
|
'Root has errored but pdb is in use by '
|
||||||
|
f'child {_debug._global_actor_in_debug}\n'
|
||||||
|
'Waiting on tty lock to release..')
|
||||||
|
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
await debug_complete.wait()
|
||||||
|
|
||||||
# if the caller's scope errored then we activate our
|
# if the caller's scope errored then we activate our
|
||||||
# one-cancels-all supervisor strategy (don't
|
# one-cancels-all supervisor strategy (don't
|
||||||
# worry more are coming).
|
# worry more are coming).
|
||||||
anursery._join_procs.set()
|
anursery._join_procs.set()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# XXX: hypothetically an error could be
|
# XXX: hypothetically an error could be
|
||||||
# raised and then a cancel signal shows up
|
# raised and then a cancel signal shows up
|
||||||
|
@ -313,15 +341,18 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# Last bit before first nursery block ends in the case
|
|
||||||
# where we didn't error in the caller's scope
|
|
||||||
log.debug("Waiting on all subactors to complete")
|
|
||||||
anursery._join_procs.set()
|
|
||||||
|
|
||||||
# ria_nursery scope end
|
# ria_nursery scope end
|
||||||
|
|
||||||
# XXX: do we need a `trio.Cancelled` catch here as well?
|
# XXX: do we need a `trio.Cancelled` catch here as well?
|
||||||
except (Exception, trio.MultiError, trio.Cancelled) as err:
|
# this is the catch around the ``.run_in_actor()`` nursery
|
||||||
|
except (
|
||||||
|
|
||||||
|
Exception,
|
||||||
|
trio.MultiError,
|
||||||
|
trio.Cancelled
|
||||||
|
|
||||||
|
) as err:
|
||||||
|
|
||||||
# If actor-local error was raised while waiting on
|
# If actor-local error was raised while waiting on
|
||||||
# ".run_in_actor()" actors then we also want to cancel all
|
# ".run_in_actor()" actors then we also want to cancel all
|
||||||
# remaining sub-actors (due to our lone strategy:
|
# remaining sub-actors (due to our lone strategy:
|
||||||
|
|
Loading…
Reference in New Issue