forked from goodboy/tractor
Breakout `wait_for_parent_stdin_hijack()`, increase root pdb checker poll time
parent
f3a6ab62af
commit
d30ce96740
|
@ -272,12 +272,72 @@ async def _hijack_stdin_for_child(
|
||||||
if isinstance(err, trio.Cancelled):
|
if isinstance(err, trio.Cancelled):
|
||||||
raise
|
raise
|
||||||
finally:
|
finally:
|
||||||
log.debug("TTY lock released, remote task:"
|
log.debug(
|
||||||
f"{task_name}:{subactor_uid}")
|
"TTY lock released, remote task:"
|
||||||
|
f"{task_name}:{subactor_uid}")
|
||||||
|
|
||||||
return "pdb_unlock_complete"
|
return "pdb_unlock_complete"
|
||||||
|
|
||||||
|
|
||||||
|
async def wait_for_parent_stdin_hijack(
|
||||||
|
actor: 'Actor', # noqa
|
||||||
|
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Connect to the root actor via a ctx and invoke a task which locks a root-local
|
||||||
|
TTY lock.
|
||||||
|
|
||||||
|
This function is used by any sub-actor to acquire mutex access to
|
||||||
|
pdb and the root's TTY for interactive debugging (see below inside
|
||||||
|
``_breakpoint()``). It can be used to ensure that an intermediate
|
||||||
|
nursery-owning actor does not clobber its children if they are in
|
||||||
|
debug (see below inside ``maybe_wait_for_debugger()``).
|
||||||
|
|
||||||
|
'''
|
||||||
|
global _debugger_request_cs
|
||||||
|
|
||||||
|
with trio.CancelScope(shield=True) as cs:
|
||||||
|
_debugger_request_cs = cs
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with get_root() as portal:
|
||||||
|
|
||||||
|
# this syncs to child's ``Context.started()`` call.
|
||||||
|
async with portal.open_context(
|
||||||
|
|
||||||
|
tractor._debug._hijack_stdin_for_child,
|
||||||
|
subactor_uid=actor.uid,
|
||||||
|
|
||||||
|
) as (ctx, val):
|
||||||
|
|
||||||
|
log.pdb('locked context')
|
||||||
|
assert val == 'Locked'
|
||||||
|
|
||||||
|
async with ctx.open_stream() as stream:
|
||||||
|
# unblock local caller
|
||||||
|
task_status.started(cs)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await _local_pdb_complete.wait()
|
||||||
|
|
||||||
|
finally:
|
||||||
|
# TODO: shielding currently can cause hangs...
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
await stream.send('pdb_unlock')
|
||||||
|
|
||||||
|
# sync with callee termination
|
||||||
|
assert await ctx.result() == "pdb_unlock_complete"
|
||||||
|
|
||||||
|
except tractor.ContextCancelled:
|
||||||
|
log.warning('Root actor cancelled debug lock')
|
||||||
|
|
||||||
|
finally:
|
||||||
|
log.debug(f"Exiting debugger for actor {actor.uid}")
|
||||||
|
global _local_task_in_debug
|
||||||
|
_local_task_in_debug = None
|
||||||
|
log.debug(f"Child {actor.uid} released parent stdio lock")
|
||||||
|
|
||||||
|
|
||||||
async def _breakpoint(
|
async def _breakpoint(
|
||||||
|
|
||||||
debug_func,
|
debug_func,
|
||||||
|
@ -304,54 +364,6 @@ async def _breakpoint(
|
||||||
|
|
||||||
await trio.lowlevel.checkpoint()
|
await trio.lowlevel.checkpoint()
|
||||||
|
|
||||||
async def wait_for_parent_stdin_hijack(
|
|
||||||
task_status=trio.TASK_STATUS_IGNORED
|
|
||||||
):
|
|
||||||
global _debugger_request_cs
|
|
||||||
|
|
||||||
with trio.CancelScope(shield=True) as cs:
|
|
||||||
_debugger_request_cs = cs
|
|
||||||
|
|
||||||
try:
|
|
||||||
async with get_root() as portal:
|
|
||||||
|
|
||||||
# this syncs to child's ``Context.started()`` call.
|
|
||||||
async with portal.open_context(
|
|
||||||
|
|
||||||
tractor._debug._hijack_stdin_for_child,
|
|
||||||
subactor_uid=actor.uid,
|
|
||||||
|
|
||||||
) as (ctx, val):
|
|
||||||
|
|
||||||
log.pdb('locked context')
|
|
||||||
assert val == 'Locked'
|
|
||||||
|
|
||||||
async with ctx.open_stream() as stream:
|
|
||||||
|
|
||||||
log.error('opened stream')
|
|
||||||
# unblock local caller
|
|
||||||
task_status.started()
|
|
||||||
|
|
||||||
try:
|
|
||||||
await _local_pdb_complete.wait()
|
|
||||||
|
|
||||||
finally:
|
|
||||||
# TODO: shielding currently can cause hangs...
|
|
||||||
with trio.CancelScope(shield=True):
|
|
||||||
await stream.send('pdb_unlock')
|
|
||||||
|
|
||||||
# sync with callee termination
|
|
||||||
assert await ctx.result() == "pdb_unlock_complete"
|
|
||||||
|
|
||||||
except tractor.ContextCancelled:
|
|
||||||
log.warning('Root actor cancelled debug lock')
|
|
||||||
|
|
||||||
finally:
|
|
||||||
log.debug(f"Exiting debugger for actor {actor}")
|
|
||||||
global _local_task_in_debug
|
|
||||||
_local_task_in_debug = None
|
|
||||||
log.debug(f"Child {actor} released parent stdio lock")
|
|
||||||
|
|
||||||
if not _local_pdb_complete or _local_pdb_complete.is_set():
|
if not _local_pdb_complete or _local_pdb_complete.is_set():
|
||||||
_local_pdb_complete = trio.Event()
|
_local_pdb_complete = trio.Event()
|
||||||
|
|
||||||
|
@ -388,7 +400,10 @@ async def _breakpoint(
|
||||||
# cancel on this task start? I *think* this works below?
|
# cancel on this task start? I *think* this works below?
|
||||||
# actor._service_n.cancel_scope.shield = shield
|
# actor._service_n.cancel_scope.shield = shield
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await actor._service_n.start(wait_for_parent_stdin_hijack)
|
await actor._service_n.start(
|
||||||
|
wait_for_parent_stdin_hijack,
|
||||||
|
actor,
|
||||||
|
)
|
||||||
|
|
||||||
elif is_root_process():
|
elif is_root_process():
|
||||||
|
|
||||||
|
@ -529,7 +544,7 @@ async def _maybe_enter_pm(err):
|
||||||
|
|
||||||
async def maybe_wait_for_debugger() -> None:
|
async def maybe_wait_for_debugger() -> None:
|
||||||
|
|
||||||
global _no_remote_has_tty, _global_actor_in_debug
|
global _no_remote_has_tty, _global_actor_in_debug, _wait_all_tasks_lock
|
||||||
|
|
||||||
# If we error in the root but the debugger is
|
# If we error in the root but the debugger is
|
||||||
# engaged we don't want to prematurely kill (and
|
# engaged we don't want to prematurely kill (and
|
||||||
|
@ -538,7 +553,7 @@ async def maybe_wait_for_debugger() -> None:
|
||||||
# Instead try to wait for pdb to be released before
|
# Instead try to wait for pdb to be released before
|
||||||
# tearing down.
|
# tearing down.
|
||||||
if (
|
if (
|
||||||
_state.debug_mode() and
|
_state.debug_mode() or
|
||||||
is_root_process()
|
is_root_process()
|
||||||
):
|
):
|
||||||
|
|
||||||
|
@ -549,21 +564,23 @@ async def maybe_wait_for_debugger() -> None:
|
||||||
# await trio.testing.wait_all_tasks_blocked()
|
# await trio.testing.wait_all_tasks_blocked()
|
||||||
|
|
||||||
sub_in_debug = None
|
sub_in_debug = None
|
||||||
if _global_actor_in_debug:
|
|
||||||
sub_in_debug = tuple(_global_actor_in_debug)
|
|
||||||
|
|
||||||
for _ in range(2):
|
for _ in range(2):
|
||||||
|
|
||||||
|
if _global_actor_in_debug:
|
||||||
|
sub_in_debug = tuple(_global_actor_in_debug)
|
||||||
|
|
||||||
|
log.warning(
|
||||||
|
'Root polling for debug')
|
||||||
|
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
|
await trio.sleep(0.1)
|
||||||
|
|
||||||
log.warning(
|
# TODO: could this make things more deterministic? wait
|
||||||
'Root polling for debug')
|
# to see if a sub-actor task will be scheduled and grab
|
||||||
await trio.sleep(0.01)
|
# the tty lock on the next tick?
|
||||||
|
# XXX: doesn't seem to work
|
||||||
# TODO: could this make things more deterministic?
|
# await trio.testing.wait_all_tasks_blocked(cushion=0)
|
||||||
# wait to see if a sub-actor task will be
|
|
||||||
# scheduled and grab the tty lock on the next
|
|
||||||
# tick?
|
|
||||||
# await trio.testing.wait_all_tasks_blocked()
|
|
||||||
|
|
||||||
debug_complete = _no_remote_has_tty
|
debug_complete = _no_remote_has_tty
|
||||||
if (
|
if (
|
||||||
|
@ -577,10 +594,10 @@ async def maybe_wait_for_debugger() -> None:
|
||||||
|
|
||||||
await debug_complete.wait()
|
await debug_complete.wait()
|
||||||
|
|
||||||
await trio.sleep(0.01)
|
await trio.sleep(0.1)
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
log.warning(
|
log.warning(
|
||||||
'Root acquired DEBUGGER'
|
'Root acquired TTY LOCK'
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in New Issue