forked from goodboy/tractor
commit
14379a0f46
|
@ -0,0 +1,54 @@
|
||||||
|
'''
|
||||||
|
Fast fail test with a context.
|
||||||
|
|
||||||
|
Ensure the partially initialized sub-actor process
|
||||||
|
doesn't cause a hang on error/cancel of the parent
|
||||||
|
nursery.
|
||||||
|
|
||||||
|
'''
|
||||||
|
import trio
|
||||||
|
import tractor
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def sleep(
|
||||||
|
ctx: tractor.Context,
|
||||||
|
):
|
||||||
|
await trio.sleep(0.5)
|
||||||
|
await ctx.started()
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
async def open_ctx(
|
||||||
|
n: tractor._trionics.ActorNursery
|
||||||
|
):
|
||||||
|
|
||||||
|
# spawn both actors
|
||||||
|
portal = await n.start_actor(
|
||||||
|
name='sleeper',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
)
|
||||||
|
|
||||||
|
async with portal.open_context(
|
||||||
|
sleep,
|
||||||
|
) as (ctx, first):
|
||||||
|
assert first is None
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
|
||||||
|
async with tractor.open_nursery(
|
||||||
|
debug_mode=True,
|
||||||
|
loglevel='runtime',
|
||||||
|
) as an:
|
||||||
|
|
||||||
|
async with trio.open_nursery() as n:
|
||||||
|
n.start_soon(open_ctx, an)
|
||||||
|
|
||||||
|
await trio.sleep(0.2)
|
||||||
|
await trio.sleep(0.1)
|
||||||
|
assert 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
trio.run(main)
|
|
@ -0,0 +1,31 @@
|
||||||
|
|
||||||
|
import trio
|
||||||
|
import tractor
|
||||||
|
|
||||||
|
|
||||||
|
async def key_error():
|
||||||
|
"Raise a ``NameError``"
|
||||||
|
return {}['doggy']
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
"""Root dies
|
||||||
|
|
||||||
|
"""
|
||||||
|
async with tractor.open_nursery(
|
||||||
|
debug_mode=True,
|
||||||
|
loglevel='debug'
|
||||||
|
) as n:
|
||||||
|
|
||||||
|
# spawn both actors
|
||||||
|
portal = await n.run_in_actor(key_error)
|
||||||
|
|
||||||
|
# XXX: originally a bug caused by this is where root would enter
|
||||||
|
# the debugger and clobber the tty used by the repl even though
|
||||||
|
# child should have it locked.
|
||||||
|
with trio.fail_after(1):
|
||||||
|
await trio.Event().wait()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
trio.run(main)
|
|
@ -317,32 +317,58 @@ def test_multi_daemon_subactors(spawn, loglevel):
|
||||||
next_msg = name_error_msg
|
next_msg = name_error_msg
|
||||||
|
|
||||||
elif name_error_msg in before:
|
elif name_error_msg in before:
|
||||||
next_msg = None
|
next_msg = bp_forever_msg
|
||||||
|
|
||||||
else:
|
else:
|
||||||
raise ValueError("Neither log msg was found !?")
|
raise ValueError("Neither log msg was found !?")
|
||||||
|
|
||||||
child.sendline('c')
|
# NOTE: previously since we did not have clobber prevention
|
||||||
|
# in the root actor this final resume could result in the debugger
|
||||||
|
# tearing down since both child actors would be cancelled and it was
|
||||||
|
# unlikely that `bp_forever` would re-acquire the tty lock again.
|
||||||
|
# Now, we should have a final resumption in the root plus a possible
|
||||||
|
# second entry by `bp_forever`.
|
||||||
|
|
||||||
# first name_error failure
|
child.sendline('c')
|
||||||
child.expect(r"\(Pdb\+\+\)")
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
|
|
||||||
if next_msg:
|
assert next_msg in before
|
||||||
assert next_msg in before
|
|
||||||
|
|
||||||
child.sendline('c')
|
# XXX: hooray the root clobbering the child here was fixed!
|
||||||
|
# IMO, this demonstrates the true power of SC system design.
|
||||||
|
|
||||||
child.expect(r"\(Pdb\+\+\)")
|
# now the root actor won't clobber the bp_forever child
|
||||||
before = str(child.before.decode())
|
# during it's first access to the debug lock, but will instead
|
||||||
assert "tractor._exceptions.RemoteActorError: ('name_error'" in before
|
# wait for the lock to release, by the edge triggered
|
||||||
|
# ``_debug._no_remote_has_tty`` event before sending cancel messages
|
||||||
|
# (via portals) to its underlings B)
|
||||||
|
|
||||||
|
# at some point here there should have been some warning msg from
|
||||||
|
# the root announcing it avoided a clobber of the child's lock, but
|
||||||
|
# it seems unreliable in testing here to gnab it:
|
||||||
|
# assert "in use by child ('bp_forever'," in before
|
||||||
|
|
||||||
|
# wait for final error in root
|
||||||
|
while True:
|
||||||
|
|
||||||
|
child.sendline('c')
|
||||||
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
before = str(child.before.decode())
|
||||||
|
try:
|
||||||
|
|
||||||
|
# root error should be packed as remote error
|
||||||
|
assert "_exceptions.RemoteActorError: ('name_error'" in before
|
||||||
|
break
|
||||||
|
|
||||||
|
except AssertionError:
|
||||||
|
assert bp_forever_msg in before
|
||||||
|
|
||||||
try:
|
try:
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
child.expect(pexpect.EOF)
|
child.expect(pexpect.EOF)
|
||||||
|
|
||||||
except pexpect.exceptions.TIMEOUT:
|
except pexpect.exceptions.TIMEOUT:
|
||||||
|
|
||||||
# Failed to exit using continue..?
|
# Failed to exit using continue..?
|
||||||
child.sendline('q')
|
child.sendline('q')
|
||||||
child.expect(pexpect.EOF)
|
child.expect(pexpect.EOF)
|
||||||
|
@ -386,9 +412,9 @@ def test_multi_subactors_root_errors(spawn):
|
||||||
def test_multi_nested_subactors_error_through_nurseries(spawn):
|
def test_multi_nested_subactors_error_through_nurseries(spawn):
|
||||||
"""Verify deeply nested actors that error trigger debugger entries
|
"""Verify deeply nested actors that error trigger debugger entries
|
||||||
at each actor nurserly (level) all the way up the tree.
|
at each actor nurserly (level) all the way up the tree.
|
||||||
"""
|
|
||||||
|
|
||||||
# NOTE: previously, inside this script was a a bug where if the
|
"""
|
||||||
|
# NOTE: previously, inside this script was a bug where if the
|
||||||
# parent errors before a 2-levels-lower actor has released the lock,
|
# parent errors before a 2-levels-lower actor has released the lock,
|
||||||
# the parent tries to cancel it but it's stuck in the debugger?
|
# the parent tries to cancel it but it's stuck in the debugger?
|
||||||
# A test (below) has now been added to explicitly verify this is
|
# A test (below) has now been added to explicitly verify this is
|
||||||
|
@ -396,9 +422,6 @@ def test_multi_nested_subactors_error_through_nurseries(spawn):
|
||||||
|
|
||||||
child = spawn('multi_nested_subactors_error_up_through_nurseries')
|
child = spawn('multi_nested_subactors_error_up_through_nurseries')
|
||||||
|
|
||||||
# startup time can be iffy
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
for i in range(12):
|
for i in range(12):
|
||||||
try:
|
try:
|
||||||
child.expect(r"\(Pdb\+\+\)")
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
@ -471,7 +494,12 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
|
||||||
|
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
|
|
||||||
child.expect(pexpect.EOF)
|
while True:
|
||||||
|
try:
|
||||||
|
child.expect(pexpect.EOF)
|
||||||
|
break
|
||||||
|
except pexpect.exceptions.TIMEOUT:
|
||||||
|
print('child was able to grab tty lock again?')
|
||||||
|
|
||||||
if not timed_out_early:
|
if not timed_out_early:
|
||||||
|
|
||||||
|
@ -479,3 +507,21 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(
|
||||||
assert "tractor._exceptions.RemoteActorError: ('spawner0'" in before
|
assert "tractor._exceptions.RemoteActorError: ('spawner0'" in before
|
||||||
assert "tractor._exceptions.RemoteActorError: ('name_error'" in before
|
assert "tractor._exceptions.RemoteActorError: ('name_error'" in before
|
||||||
assert "NameError: name 'doggypants' is not defined" in before
|
assert "NameError: name 'doggypants' is not defined" in before
|
||||||
|
|
||||||
|
|
||||||
|
def test_root_cancels_child_context_during_startup(
|
||||||
|
spawn,
|
||||||
|
):
|
||||||
|
'''Verify a fast fail in the root doesn't lock up the child reaping
|
||||||
|
and all while using the new context api.
|
||||||
|
|
||||||
|
'''
|
||||||
|
child = spawn('fast_error_in_root_after_spawn')
|
||||||
|
|
||||||
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
||||||
|
before = str(child.before.decode())
|
||||||
|
assert "AssertionError" in before
|
||||||
|
|
||||||
|
child.sendline('c')
|
||||||
|
child.expect(pexpect.EOF)
|
||||||
|
|
|
@ -41,10 +41,6 @@ from . import _mp_fixup_main
|
||||||
log = get_logger('tractor')
|
log = get_logger('tractor')
|
||||||
|
|
||||||
|
|
||||||
class ActorFailure(Exception):
|
|
||||||
"General actor failure"
|
|
||||||
|
|
||||||
|
|
||||||
async def _invoke(
|
async def _invoke(
|
||||||
|
|
||||||
actor: 'Actor',
|
actor: 'Actor',
|
||||||
|
@ -56,8 +52,10 @@ async def _invoke(
|
||||||
Union[trio.CancelScope, BaseException]
|
Union[trio.CancelScope, BaseException]
|
||||||
] = trio.TASK_STATUS_IGNORED,
|
] = trio.TASK_STATUS_IGNORED,
|
||||||
):
|
):
|
||||||
"""Invoke local func and deliver result(s) over provided channel.
|
'''Invoke local func and deliver result(s) over provided channel.
|
||||||
"""
|
|
||||||
|
'''
|
||||||
|
__tracebackhide__ = True
|
||||||
treat_as_gen = False
|
treat_as_gen = False
|
||||||
|
|
||||||
# possible a traceback (not sure what typing is for this..)
|
# possible a traceback (not sure what typing is for this..)
|
||||||
|
|
|
@ -1,13 +1,13 @@
|
||||||
"""
|
"""
|
||||||
Multi-core debugging for da peeps!
|
Multi-core debugging for da peeps!
|
||||||
|
|
||||||
"""
|
"""
|
||||||
import bdb
|
import bdb
|
||||||
import sys
|
import sys
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from typing import Awaitable, Tuple, Optional, Callable, AsyncIterator
|
from typing import Tuple, Optional, Callable, AsyncIterator
|
||||||
|
|
||||||
from async_generator import aclosing
|
|
||||||
import tractor
|
import tractor
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
|
@ -31,14 +31,22 @@ log = get_logger(__name__)
|
||||||
|
|
||||||
__all__ = ['breakpoint', 'post_mortem']
|
__all__ = ['breakpoint', 'post_mortem']
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: wrap all these in a static global class: ``DebugLock`` maybe?
|
||||||
|
|
||||||
# placeholder for function to set a ``trio.Event`` on debugger exit
|
# placeholder for function to set a ``trio.Event`` on debugger exit
|
||||||
_pdb_release_hook: Optional[Callable] = None
|
_pdb_release_hook: Optional[Callable] = None
|
||||||
|
|
||||||
# actor-wide variable pointing to current task name using debugger
|
# actor-wide variable pointing to current task name using debugger
|
||||||
_in_debug = False
|
_local_task_in_debug: Optional[str] = None
|
||||||
|
|
||||||
|
# actor tree-wide actor uid that supposedly has the tty lock
|
||||||
|
_global_actor_in_debug: Optional[Tuple[str, str]] = None
|
||||||
|
|
||||||
# lock in root actor preventing multi-access to local tty
|
# lock in root actor preventing multi-access to local tty
|
||||||
_debug_lock = trio.StrictFIFOLock()
|
_debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
|
||||||
|
_local_pdb_complete: Optional[trio.Event] = None
|
||||||
|
_no_remote_has_tty: Optional[trio.Event] = None
|
||||||
|
|
||||||
# XXX: set by the current task waiting on the root tty lock
|
# XXX: set by the current task waiting on the root tty lock
|
||||||
# and must be cancelled if this actor is cancelled via message
|
# and must be cancelled if this actor is cancelled via message
|
||||||
|
@ -58,22 +66,22 @@ class PdbwTeardown(pdbpp.Pdb):
|
||||||
# override the pdbpp config with our coolio one
|
# override the pdbpp config with our coolio one
|
||||||
DefaultConfig = TractorConfig
|
DefaultConfig = TractorConfig
|
||||||
|
|
||||||
# TODO: figure out how to dissallow recursive .set_trace() entry
|
# TODO: figure out how to disallow recursive .set_trace() entry
|
||||||
# since that'll cause deadlock for us.
|
# since that'll cause deadlock for us.
|
||||||
def set_continue(self):
|
def set_continue(self):
|
||||||
global _in_debug
|
|
||||||
try:
|
try:
|
||||||
super().set_continue()
|
super().set_continue()
|
||||||
finally:
|
finally:
|
||||||
_in_debug = False
|
global _local_task_in_debug
|
||||||
|
_local_task_in_debug = None
|
||||||
_pdb_release_hook()
|
_pdb_release_hook()
|
||||||
|
|
||||||
def set_quit(self):
|
def set_quit(self):
|
||||||
global _in_debug
|
|
||||||
try:
|
try:
|
||||||
super().set_quit()
|
super().set_quit()
|
||||||
finally:
|
finally:
|
||||||
_in_debug = False
|
global _local_task_in_debug
|
||||||
|
_local_task_in_debug = None
|
||||||
_pdb_release_hook()
|
_pdb_release_hook()
|
||||||
|
|
||||||
|
|
||||||
|
@ -115,147 +123,283 @@ class PdbwTeardown(pdbpp.Pdb):
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
|
async def _acquire_debug_lock(
|
||||||
"""Acquire a actor local FIFO lock meant to mutex entry to a local
|
uid: Tuple[str, str]
|
||||||
debugger entry point to avoid tty clobbering by multiple processes.
|
|
||||||
"""
|
) -> AsyncIterator[trio.StrictFIFOLock]:
|
||||||
|
'''Acquire a root-actor local FIFO lock which tracks mutex access of
|
||||||
|
the process tree's global debugger breakpoint.
|
||||||
|
|
||||||
|
This lock avoids tty clobbering (by preventing multiple processes
|
||||||
|
reading from stdstreams) and ensures multi-actor, sequential access
|
||||||
|
to the ``pdb`` repl.
|
||||||
|
|
||||||
|
'''
|
||||||
|
global _debug_lock, _global_actor_in_debug, _no_remote_has_tty
|
||||||
|
|
||||||
task_name = trio.lowlevel.current_task().name
|
task_name = trio.lowlevel.current_task().name
|
||||||
|
|
||||||
|
log.pdb(
|
||||||
|
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}"
|
||||||
|
)
|
||||||
|
|
||||||
|
we_acquired = False
|
||||||
|
|
||||||
|
if _no_remote_has_tty is None:
|
||||||
|
# mark the tty lock as being in use so that the runtime
|
||||||
|
# can try to avoid clobbering any connection from a child
|
||||||
|
# that's currently relying on it.
|
||||||
|
_no_remote_has_tty = trio.Event()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
log.debug(
|
log.debug(
|
||||||
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}")
|
f"entering lock checkpoint, remote task: {task_name}:{uid}"
|
||||||
|
)
|
||||||
|
we_acquired = True
|
||||||
await _debug_lock.acquire()
|
await _debug_lock.acquire()
|
||||||
|
|
||||||
|
_global_actor_in_debug = uid
|
||||||
log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}")
|
log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}")
|
||||||
yield
|
|
||||||
|
# NOTE: critical section: this yield is unshielded!
|
||||||
|
|
||||||
|
# IF we received a cancel during the shielded lock entry of some
|
||||||
|
# next-in-queue requesting task, then the resumption here will
|
||||||
|
# result in that ``trio.Cancelled`` being raised to our caller
|
||||||
|
# (likely from ``_hijack_stdin_for_child()`` below)! In
|
||||||
|
# this case the ``finally:`` below should trigger and the
|
||||||
|
# surrounding caller side context should cancel normally
|
||||||
|
# relaying back to the caller.
|
||||||
|
|
||||||
|
yield _debug_lock
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
_debug_lock.release()
|
# if _global_actor_in_debug == uid:
|
||||||
|
if we_acquired and _debug_lock.locked():
|
||||||
|
_debug_lock.release()
|
||||||
|
|
||||||
|
# IFF there are no more requesting tasks queued up fire, the
|
||||||
|
# "tty-unlocked" event thereby alerting any monitors of the lock that
|
||||||
|
# we are now back in the "tty unlocked" state. This is basically
|
||||||
|
# and edge triggered signal around an empty queue of sub-actor
|
||||||
|
# tasks that may have tried to acquire the lock.
|
||||||
|
stats = _debug_lock.statistics()
|
||||||
|
if (
|
||||||
|
not stats.owner
|
||||||
|
):
|
||||||
|
log.pdb(f"No more tasks waiting on tty lock! says {uid}")
|
||||||
|
_no_remote_has_tty.set()
|
||||||
|
_no_remote_has_tty = None
|
||||||
|
|
||||||
|
_global_actor_in_debug = None
|
||||||
|
|
||||||
log.debug(f"TTY lock released, remote task: {task_name}:{uid}")
|
log.debug(f"TTY lock released, remote task: {task_name}:{uid}")
|
||||||
|
|
||||||
|
|
||||||
# @contextmanager
|
@tractor.context
|
||||||
# def _disable_sigint():
|
async def _hijack_stdin_for_child(
|
||||||
# try:
|
|
||||||
# # disable sigint handling while in debug
|
|
||||||
# prior_handler = signal.signal(signal.SIGINT, handler)
|
|
||||||
# yield
|
|
||||||
# finally:
|
|
||||||
# # restore SIGINT handling
|
|
||||||
# signal.signal(signal.SIGINT, prior_handler)
|
|
||||||
|
|
||||||
|
ctx: tractor.Context,
|
||||||
async def _hijack_stdin_relay_to_child(
|
|
||||||
subactor_uid: Tuple[str, str]
|
subactor_uid: Tuple[str, str]
|
||||||
) -> AsyncIterator[str]:
|
|
||||||
|
) -> str:
|
||||||
|
'''Hijack the tty in the root process of an actor tree such that
|
||||||
|
the pdbpp debugger console can be allocated to a sub-actor for repl
|
||||||
|
bossing.
|
||||||
|
|
||||||
|
'''
|
||||||
|
task_name = trio.lowlevel.current_task().name
|
||||||
|
|
||||||
# TODO: when we get to true remote debugging
|
# TODO: when we get to true remote debugging
|
||||||
# this will deliver stdin data
|
# this will deliver stdin data?
|
||||||
log.warning(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
|
|
||||||
async with _acquire_debug_lock(subactor_uid):
|
|
||||||
log.warning(f"Actor {subactor_uid} ACQUIRED stdin hijack lock")
|
|
||||||
|
|
||||||
# with _disable_sigint():
|
log.debug(
|
||||||
|
"Attempting to acquire TTY lock\n"
|
||||||
|
f"remote task: {task_name}:{subactor_uid}"
|
||||||
|
)
|
||||||
|
|
||||||
# indicate to child that we've locked stdio
|
log.debug(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
|
||||||
yield 'Locked'
|
|
||||||
|
|
||||||
# wait for cancellation of stream by child
|
with trio.CancelScope(shield=True):
|
||||||
# indicating debugger is dis-engaged
|
|
||||||
await trio.sleep_forever()
|
|
||||||
|
|
||||||
log.debug(f"Actor {subactor_uid} RELEASED stdin hijack lock")
|
async with _acquire_debug_lock(subactor_uid):
|
||||||
|
|
||||||
|
# indicate to child that we've locked stdio
|
||||||
|
await ctx.started('Locked')
|
||||||
|
log.pdb(f"Actor {subactor_uid} ACQUIRED stdin hijack lock")
|
||||||
|
|
||||||
|
# wait for unlock pdb by child
|
||||||
|
async with ctx.open_stream() as stream:
|
||||||
|
try:
|
||||||
|
assert await stream.receive() == 'pdb_unlock'
|
||||||
|
|
||||||
|
except trio.BrokenResourceError:
|
||||||
|
# XXX: there may be a race with the portal teardown
|
||||||
|
# with the calling actor which we can safely ignore.
|
||||||
|
# The alternative would be sending an ack message
|
||||||
|
# and allowing the client to wait for us to teardown
|
||||||
|
# first?
|
||||||
|
pass
|
||||||
|
|
||||||
|
log.debug(f"TTY lock released, remote task: {task_name}:{subactor_uid}")
|
||||||
|
|
||||||
|
return "pdb_unlock_complete"
|
||||||
|
|
||||||
|
|
||||||
# XXX: We only make this sync in case someone wants to
|
async def _breakpoint(
|
||||||
# overload the ``breakpoint()`` built-in.
|
|
||||||
def _breakpoint(debug_func) -> Awaitable[None]:
|
debug_func,
|
||||||
"""``tractor`` breakpoint entry for engaging pdb machinery
|
|
||||||
in subactors.
|
# TODO:
|
||||||
"""
|
# shield: bool = False
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
'''``tractor`` breakpoint entry for engaging pdb machinery
|
||||||
|
in the root or a subactor.
|
||||||
|
|
||||||
|
'''
|
||||||
|
# TODO: is it possible to debug a trio.Cancelled except block?
|
||||||
|
# right now it seems like we can kinda do with by shielding
|
||||||
|
# around ``tractor.breakpoint()`` but not if we move the shielded
|
||||||
|
# scope here???
|
||||||
|
# with trio.CancelScope(shield=shield):
|
||||||
|
|
||||||
actor = tractor.current_actor()
|
actor = tractor.current_actor()
|
||||||
do_unlock = trio.Event()
|
task_name = trio.lowlevel.current_task().name
|
||||||
|
|
||||||
|
global _local_pdb_complete, _pdb_release_hook
|
||||||
|
global _local_task_in_debug, _global_actor_in_debug
|
||||||
|
|
||||||
|
await trio.lowlevel.checkpoint()
|
||||||
|
|
||||||
async def wait_for_parent_stdin_hijack(
|
async def wait_for_parent_stdin_hijack(
|
||||||
task_status=trio.TASK_STATUS_IGNORED
|
task_status=trio.TASK_STATUS_IGNORED
|
||||||
):
|
):
|
||||||
global _debugger_request_cs
|
global _debugger_request_cs
|
||||||
with trio.CancelScope() as cs:
|
|
||||||
|
with trio.CancelScope(shield=True) as cs:
|
||||||
_debugger_request_cs = cs
|
_debugger_request_cs = cs
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with get_root() as portal:
|
async with get_root() as portal:
|
||||||
async with portal.open_stream_from(
|
|
||||||
tractor._debug._hijack_stdin_relay_to_child,
|
|
||||||
subactor_uid=actor.uid,
|
|
||||||
) as stream:
|
|
||||||
|
|
||||||
# block until first yield above
|
log.error('got portal')
|
||||||
async for val in stream:
|
|
||||||
|
|
||||||
assert val == 'Locked'
|
# this syncs to child's ``Context.started()`` call.
|
||||||
task_status.started()
|
async with portal.open_context(
|
||||||
|
|
||||||
# with trio.CancelScope(shield=True):
|
tractor._debug._hijack_stdin_for_child,
|
||||||
await do_unlock.wait()
|
subactor_uid=actor.uid,
|
||||||
|
|
||||||
|
) as (ctx, val):
|
||||||
|
|
||||||
|
log.error('locked context')
|
||||||
|
assert val == 'Locked'
|
||||||
|
|
||||||
|
async with ctx.open_stream() as stream:
|
||||||
|
|
||||||
|
log.error('opened stream')
|
||||||
|
# unblock local caller
|
||||||
|
task_status.started()
|
||||||
|
|
||||||
|
try:
|
||||||
|
await _local_pdb_complete.wait()
|
||||||
|
|
||||||
|
finally:
|
||||||
|
# TODO: shielding currently can cause hangs...
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
await stream.send('pdb_unlock')
|
||||||
|
|
||||||
|
# sync with callee termination
|
||||||
|
assert await ctx.result() == "pdb_unlock_complete"
|
||||||
|
|
||||||
|
except tractor.ContextCancelled:
|
||||||
|
log.warning('Root actor cancelled debug lock')
|
||||||
|
|
||||||
# trigger cancellation of remote stream
|
|
||||||
break
|
|
||||||
finally:
|
finally:
|
||||||
log.debug(f"Exiting debugger for actor {actor}")
|
log.debug(f"Exiting debugger for actor {actor}")
|
||||||
global _in_debug
|
global _local_task_in_debug
|
||||||
_in_debug = False
|
_local_task_in_debug = None
|
||||||
log.debug(f"Child {actor} released parent stdio lock")
|
log.debug(f"Child {actor} released parent stdio lock")
|
||||||
|
|
||||||
async def _bp():
|
if not _local_pdb_complete or _local_pdb_complete.is_set():
|
||||||
"""Async breakpoint which schedules a parent stdio lock, and once complete
|
_local_pdb_complete = trio.Event()
|
||||||
enters the ``pdbpp`` debugging console.
|
|
||||||
"""
|
|
||||||
task_name = trio.lowlevel.current_task().name
|
|
||||||
|
|
||||||
global _in_debug
|
# TODO: need a more robust check for the "root" actor
|
||||||
|
if actor._parent_chan and not is_root_process():
|
||||||
|
|
||||||
# TODO: need a more robust check for the "root" actor
|
if _local_task_in_debug:
|
||||||
if actor._parent_chan and not is_root_process():
|
if _local_task_in_debug == task_name:
|
||||||
if _in_debug:
|
# this task already has the lock and is
|
||||||
if _in_debug == task_name:
|
# likely recurrently entering a breakpoint
|
||||||
# this task already has the lock and is
|
return
|
||||||
# likely recurrently entering a breakpoint
|
|
||||||
return
|
|
||||||
|
|
||||||
# if **this** actor is already in debug mode block here
|
# if **this** actor is already in debug mode block here
|
||||||
# waiting for the control to be released - this allows
|
# waiting for the control to be released - this allows
|
||||||
# support for recursive entries to `tractor.breakpoint()`
|
# support for recursive entries to `tractor.breakpoint()`
|
||||||
log.warning(
|
log.warning(f"{actor.uid} already has a debug lock, waiting...")
|
||||||
f"Actor {actor.uid} already has a debug lock, waiting...")
|
|
||||||
await do_unlock.wait()
|
|
||||||
await trio.sleep(0.1)
|
|
||||||
|
|
||||||
# assign unlock callback for debugger teardown hooks
|
await _local_pdb_complete.wait()
|
||||||
global _pdb_release_hook
|
await trio.sleep(0.1)
|
||||||
_pdb_release_hook = do_unlock.set
|
|
||||||
|
|
||||||
# mark local actor as "in debug mode" to avoid recurrent
|
# mark local actor as "in debug mode" to avoid recurrent
|
||||||
# entries/requests to the root process
|
# entries/requests to the root process
|
||||||
_in_debug = task_name
|
_local_task_in_debug = task_name
|
||||||
|
|
||||||
# this **must** be awaited by the caller and is done using the
|
# assign unlock callback for debugger teardown hooks
|
||||||
# root nursery so that the debugger can continue to run without
|
_pdb_release_hook = _local_pdb_complete.set
|
||||||
# being restricted by the scope of a new task nursery.
|
|
||||||
|
# this **must** be awaited by the caller and is done using the
|
||||||
|
# root nursery so that the debugger can continue to run without
|
||||||
|
# being restricted by the scope of a new task nursery.
|
||||||
|
|
||||||
|
# NOTE: if we want to debug a trio.Cancelled triggered exception
|
||||||
|
# we have to figure out how to avoid having the service nursery
|
||||||
|
# cancel on this task start? I *think* this works below?
|
||||||
|
# actor._service_n.cancel_scope.shield = shield
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
await actor._service_n.start(wait_for_parent_stdin_hijack)
|
await actor._service_n.start(wait_for_parent_stdin_hijack)
|
||||||
|
|
||||||
elif is_root_process():
|
elif is_root_process():
|
||||||
# we also wait in the root-parent for any child that
|
|
||||||
# may have the tty locked prior
|
|
||||||
if _debug_lock.locked(): # root process already has it; ignore
|
|
||||||
return
|
|
||||||
await _debug_lock.acquire()
|
|
||||||
_pdb_release_hook = _debug_lock.release
|
|
||||||
|
|
||||||
# block here one (at the appropriate frame *up* where
|
# we also wait in the root-parent for any child that
|
||||||
# ``breakpoint()`` was awaited and begin handling stdio
|
# may have the tty locked prior
|
||||||
log.debug("Entering the synchronous world of pdb")
|
global _debug_lock
|
||||||
debug_func(actor)
|
|
||||||
|
|
||||||
# user code **must** await this!
|
# TODO: wait, what about multiple root tasks acquiring it though?
|
||||||
return _bp()
|
# root process (us) already has it; ignore
|
||||||
|
if _global_actor_in_debug == actor.uid:
|
||||||
|
return
|
||||||
|
|
||||||
|
# XXX: since we need to enter pdb synchronously below,
|
||||||
|
# we have to release the lock manually from pdb completion
|
||||||
|
# callbacks. Can't think of a nicer way then this atm.
|
||||||
|
if _debug_lock.locked():
|
||||||
|
log.warning(
|
||||||
|
'Root actor attempting to acquire active tty lock'
|
||||||
|
f' owned by {_global_actor_in_debug}')
|
||||||
|
|
||||||
|
await _debug_lock.acquire()
|
||||||
|
|
||||||
|
_global_actor_in_debug = actor.uid
|
||||||
|
_local_task_in_debug = task_name
|
||||||
|
|
||||||
|
# the lock must be released on pdb completion
|
||||||
|
def teardown():
|
||||||
|
global _local_pdb_complete, _debug_lock
|
||||||
|
global _global_actor_in_debug, _local_task_in_debug
|
||||||
|
|
||||||
|
_debug_lock.release()
|
||||||
|
_global_actor_in_debug = None
|
||||||
|
_local_task_in_debug = None
|
||||||
|
_local_pdb_complete.set()
|
||||||
|
|
||||||
|
_pdb_release_hook = teardown
|
||||||
|
|
||||||
|
# block here one (at the appropriate frame *up*) where
|
||||||
|
# ``breakpoint()`` was awaited and begin handling stdio.
|
||||||
|
log.debug("Entering the synchronous world of pdb")
|
||||||
|
debug_func(actor)
|
||||||
|
|
||||||
|
|
||||||
def _mk_pdb():
|
def _mk_pdb():
|
||||||
|
@ -285,8 +429,8 @@ def _set_trace(actor=None):
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# we entered the global ``breakpoint()`` built-in from sync code
|
# we entered the global ``breakpoint()`` built-in from sync code
|
||||||
global _in_debug, _pdb_release_hook
|
global _local_task_in_debug, _pdb_release_hook
|
||||||
_in_debug = 'sync'
|
_local_task_in_debug = 'sync'
|
||||||
|
|
||||||
def nuttin():
|
def nuttin():
|
||||||
pass
|
pass
|
||||||
|
|
|
@ -12,6 +12,10 @@ import trio
|
||||||
_this_mod = importlib.import_module(__name__)
|
_this_mod = importlib.import_module(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class ActorFailure(Exception):
|
||||||
|
"General actor failure"
|
||||||
|
|
||||||
|
|
||||||
class RemoteActorError(Exception):
|
class RemoteActorError(Exception):
|
||||||
# TODO: local recontruction of remote exception deats
|
# TODO: local recontruction of remote exception deats
|
||||||
"Remote actor exception bundled locally"
|
"Remote actor exception bundled locally"
|
||||||
|
@ -40,6 +44,7 @@ class InternalActorError(RemoteActorError):
|
||||||
class TransportClosed(trio.ClosedResourceError):
|
class TransportClosed(trio.ClosedResourceError):
|
||||||
"Underlying channel transport was closed prior to use"
|
"Underlying channel transport was closed prior to use"
|
||||||
|
|
||||||
|
|
||||||
class ContextCancelled(RemoteActorError):
|
class ContextCancelled(RemoteActorError):
|
||||||
"Inter-actor task context cancelled itself on the callee side."
|
"Inter-actor task context cancelled itself on the callee side."
|
||||||
|
|
||||||
|
@ -58,7 +63,7 @@ class NoRuntime(RuntimeError):
|
||||||
|
|
||||||
def pack_error(
|
def pack_error(
|
||||||
exc: BaseException,
|
exc: BaseException,
|
||||||
tb = None,
|
tb=None,
|
||||||
|
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
"""Create an "error message" for tranmission over
|
"""Create an "error message" for tranmission over
|
||||||
|
|
|
@ -174,8 +174,11 @@ async def open_root_actor(
|
||||||
yield actor
|
yield actor
|
||||||
|
|
||||||
except (Exception, trio.MultiError) as err:
|
except (Exception, trio.MultiError) as err:
|
||||||
logger.exception("Actor crashed:")
|
|
||||||
await _debug._maybe_enter_pm(err)
|
entered = await _debug._maybe_enter_pm(err)
|
||||||
|
|
||||||
|
if not entered:
|
||||||
|
logger.exception("Root actor crashed:")
|
||||||
|
|
||||||
# always re-raise
|
# always re-raise
|
||||||
raise
|
raise
|
||||||
|
|
|
@ -26,10 +26,12 @@ from ._state import (
|
||||||
current_actor,
|
current_actor,
|
||||||
is_main_process,
|
is_main_process,
|
||||||
)
|
)
|
||||||
|
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
from ._actor import Actor, ActorFailure
|
from ._actor import Actor
|
||||||
from ._entry import _mp_main
|
from ._entry import _mp_main
|
||||||
|
from ._exceptions import ActorFailure
|
||||||
|
|
||||||
|
|
||||||
log = get_logger('tractor')
|
log = get_logger('tractor')
|
||||||
|
@ -144,7 +146,7 @@ async def cancel_on_completion(
|
||||||
)
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
log.info(
|
log.runtime(
|
||||||
f"Cancelling {portal.channel.uid} gracefully "
|
f"Cancelling {portal.channel.uid} gracefully "
|
||||||
f"after result {result}")
|
f"after result {result}")
|
||||||
|
|
||||||
|
@ -206,12 +208,12 @@ async def spawn_subactor(
|
||||||
yield proc
|
yield proc
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
log.runtime(f"Attempting to kill {proc}")
|
||||||
|
|
||||||
# XXX: do this **after** cancellation/tearfown
|
# XXX: do this **after** cancellation/tearfown
|
||||||
# to avoid killing the process too early
|
# to avoid killing the process too early
|
||||||
# since trio does this internally on ``__aexit__()``
|
# since trio does this internally on ``__aexit__()``
|
||||||
|
|
||||||
log.debug(f"Attempting to kill {proc}")
|
|
||||||
await do_hard_kill(proc)
|
await do_hard_kill(proc)
|
||||||
|
|
||||||
|
|
||||||
|
@ -241,7 +243,7 @@ async def new_proc(
|
||||||
subactor,
|
subactor,
|
||||||
parent_addr,
|
parent_addr,
|
||||||
) as proc:
|
) as proc:
|
||||||
log.info(f"Started {proc}")
|
log.runtime(f"Started {proc}")
|
||||||
|
|
||||||
# wait for actor to spawn and connect back to us
|
# wait for actor to spawn and connect back to us
|
||||||
# channel should have handshake completed by the
|
# channel should have handshake completed by the
|
||||||
|
@ -396,7 +398,7 @@ async def mp_new_proc(
|
||||||
if not proc.is_alive():
|
if not proc.is_alive():
|
||||||
raise ActorFailure("Couldn't start sub-actor?")
|
raise ActorFailure("Couldn't start sub-actor?")
|
||||||
|
|
||||||
log.info(f"Started {proc}")
|
log.runtime(f"Started {proc}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# wait for actor to spawn and connect back to us
|
# wait for actor to spawn and connect back to us
|
||||||
|
|
|
@ -11,7 +11,8 @@ import warnings
|
||||||
import trio
|
import trio
|
||||||
from async_generator import asynccontextmanager
|
from async_generator import asynccontextmanager
|
||||||
|
|
||||||
from ._state import current_actor, is_main_process
|
from . import _debug
|
||||||
|
from ._state import current_actor, is_main_process, is_root_process
|
||||||
from .log import get_logger, get_loglevel
|
from .log import get_logger, get_loglevel
|
||||||
from ._actor import Actor
|
from ._actor import Actor
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
|
@ -169,16 +170,25 @@ class ActorNursery:
|
||||||
|
|
||||||
log.warning(f"Cancelling nursery in {self._actor.uid}")
|
log.warning(f"Cancelling nursery in {self._actor.uid}")
|
||||||
with trio.move_on_after(3) as cs:
|
with trio.move_on_after(3) as cs:
|
||||||
|
|
||||||
async with trio.open_nursery() as nursery:
|
async with trio.open_nursery() as nursery:
|
||||||
|
|
||||||
for subactor, proc, portal in self._children.values():
|
for subactor, proc, portal in self._children.values():
|
||||||
|
|
||||||
|
# TODO: are we ever even going to use this or
|
||||||
|
# is the spawning backend responsible for such
|
||||||
|
# things? I'm thinking latter.
|
||||||
if hard_kill:
|
if hard_kill:
|
||||||
proc.terminate()
|
proc.terminate()
|
||||||
|
|
||||||
else:
|
else:
|
||||||
if portal is None: # actor hasn't fully spawned yet
|
if portal is None: # actor hasn't fully spawned yet
|
||||||
event = self._actor._peer_connected[subactor.uid]
|
event = self._actor._peer_connected[subactor.uid]
|
||||||
log.warning(
|
log.warning(
|
||||||
f"{subactor.uid} wasn't finished spawning?")
|
f"{subactor.uid} wasn't finished spawning?")
|
||||||
|
|
||||||
await event.wait()
|
await event.wait()
|
||||||
|
|
||||||
# channel/portal should now be up
|
# channel/portal should now be up
|
||||||
_, _, portal = self._children[subactor.uid]
|
_, _, portal = self._children[subactor.uid]
|
||||||
|
|
||||||
|
@ -238,6 +248,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
# As such if the strategy propagates any error(s) upwards
|
# As such if the strategy propagates any error(s) upwards
|
||||||
# the above "daemon actor" nursery will be notified.
|
# the above "daemon actor" nursery will be notified.
|
||||||
async with trio.open_nursery() as ria_nursery:
|
async with trio.open_nursery() as ria_nursery:
|
||||||
|
|
||||||
anursery = ActorNursery(
|
anursery = ActorNursery(
|
||||||
actor,
|
actor,
|
||||||
ria_nursery,
|
ria_nursery,
|
||||||
|
@ -248,21 +259,54 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
# spawning of actors happens in the caller's scope
|
# spawning of actors happens in the caller's scope
|
||||||
# after we yield upwards
|
# after we yield upwards
|
||||||
yield anursery
|
yield anursery
|
||||||
log.debug(
|
|
||||||
|
log.runtime(
|
||||||
f"Waiting on subactors {anursery._children} "
|
f"Waiting on subactors {anursery._children} "
|
||||||
"to complete"
|
"to complete"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Last bit before first nursery block ends in the case
|
# Last bit before first nursery block ends in the case
|
||||||
# where we didn't error in the caller's scope
|
# where we didn't error in the caller's scope
|
||||||
log.debug("Waiting on all subactors to complete")
|
|
||||||
|
# signal all process monitor tasks to conduct
|
||||||
|
# hard join phase.
|
||||||
anursery._join_procs.set()
|
anursery._join_procs.set()
|
||||||
|
|
||||||
except BaseException as err:
|
except BaseException as err:
|
||||||
|
|
||||||
|
# If we error in the root but the debugger is
|
||||||
|
# engaged we don't want to prematurely kill (and
|
||||||
|
# thus clobber access to) the local tty since it
|
||||||
|
# will make the pdb repl unusable.
|
||||||
|
# Instead try to wait for pdb to be released before
|
||||||
|
# tearing down.
|
||||||
|
if is_root_process():
|
||||||
|
log.exception(f"we're root with {err}")
|
||||||
|
|
||||||
|
# TODO: could this make things more deterministic?
|
||||||
|
# wait to see if a sub-actor task will be
|
||||||
|
# scheduled and grab the tty lock on the next
|
||||||
|
# tick?
|
||||||
|
# await trio.testing.wait_all_tasks_blocked()
|
||||||
|
|
||||||
|
debug_complete = _debug._no_remote_has_tty
|
||||||
|
if (
|
||||||
|
debug_complete and
|
||||||
|
not debug_complete.is_set()
|
||||||
|
):
|
||||||
|
log.warning(
|
||||||
|
'Root has errored but pdb is in use by '
|
||||||
|
f'child {_debug._global_actor_in_debug}\n'
|
||||||
|
'Waiting on tty lock to release..')
|
||||||
|
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
await debug_complete.wait()
|
||||||
|
|
||||||
# if the caller's scope errored then we activate our
|
# if the caller's scope errored then we activate our
|
||||||
# one-cancels-all supervisor strategy (don't
|
# one-cancels-all supervisor strategy (don't
|
||||||
# worry more are coming).
|
# worry more are coming).
|
||||||
anursery._join_procs.set()
|
anursery._join_procs.set()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# XXX: hypothetically an error could be
|
# XXX: hypothetically an error could be
|
||||||
# raised and then a cancel signal shows up
|
# raised and then a cancel signal shows up
|
||||||
|
@ -301,7 +345,15 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
# ria_nursery scope end
|
# ria_nursery scope end
|
||||||
|
|
||||||
# XXX: do we need a `trio.Cancelled` catch here as well?
|
# XXX: do we need a `trio.Cancelled` catch here as well?
|
||||||
except (Exception, trio.MultiError, trio.Cancelled) as err:
|
# this is the catch around the ``.run_in_actor()`` nursery
|
||||||
|
except (
|
||||||
|
|
||||||
|
Exception,
|
||||||
|
trio.MultiError,
|
||||||
|
trio.Cancelled
|
||||||
|
|
||||||
|
) as err:
|
||||||
|
|
||||||
# If actor-local error was raised while waiting on
|
# If actor-local error was raised while waiting on
|
||||||
# ".run_in_actor()" actors then we also want to cancel all
|
# ".run_in_actor()" actors then we also want to cancel all
|
||||||
# remaining sub-actors (due to our lone strategy:
|
# remaining sub-actors (due to our lone strategy:
|
||||||
|
@ -368,6 +420,7 @@ async def open_nursery(
|
||||||
async with open_root_actor(**kwargs) as actor:
|
async with open_root_actor(**kwargs) as actor:
|
||||||
assert actor is current_actor()
|
assert actor is current_actor()
|
||||||
|
|
||||||
|
# try:
|
||||||
async with _open_and_supervise_one_cancels_all_nursery(
|
async with _open_and_supervise_one_cancels_all_nursery(
|
||||||
actor
|
actor
|
||||||
) as anursery:
|
) as anursery:
|
||||||
|
|
Loading…
Reference in New Issue