Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet 7dd72e042d Show full KBI trace for help with CI hangs 2022-07-27 11:38:13 -04:00
Tyler Goodlet ee8ead4d7a Move pydantic-click hang example to new dir, skip in test suite 2022-07-27 11:38:13 -04:00
Tyler Goodlet 8a70a52ff9 Add spaces before values in log msg 2022-07-27 11:38:06 -04:00
Tyler Goodlet 70e4458fb0 Add runtime level msg around channel draining 2022-07-27 11:38:06 -04:00
Tyler Goodlet 70d1c98c10 Always undo SIGINT overrides , cancel detached children
Ensure that even when `pdb` resumption methods are called during a crash
where `trio`'s runtime has already terminated (eg. `Event.set()` will
raise) we always revert our sigint handler to the original. Further
inside the handler if we hit a case where a child is in debug and
(thinks it) has the global pdb lock, if it has no IPC connection to
a parent, simply presume tty sync-coordination is now lost and cancel
the child immediately.
2022-07-27 11:38:06 -04:00
Tyler Goodlet dade6a4b43 Readme formatting tweaks 2022-07-27 11:38:06 -04:00
Tyler Goodlet 7b4049198a Tolerate double `.remove()`s of stream on portal teardowns 2022-07-27 11:37:59 -04:00
Tyler Goodlet 931b20cf35 Always propagate SIGINT when no locking peer found
A hopefully significant fix here is to always avoid suppressing a SIGINT
when the root actor can not detect an active IPC connections (via
a connected channel) to the supposed debug lock holding actor. In that
case it is most likely that the actor has either terminated or has lost
its connection for debugger control and there is no way the root can
verify the lock is in use; thus we choose to allow KBI cancellation.

Drop the (by comment) `try`-`finally` block in
`_hijoack_stdin_for_child()` around the `_acquire_debug_lock()` call
since all that logic should now be handled internal to that locking
manager. Try to catch a weird error around the `.do_longlist()` method
call that seems to sometimes break on py3.10 and latest `pdbpp`.
2022-07-27 11:37:59 -04:00
Tyler Goodlet 11c1582c39 Always call pdb hook even if tty locking fails 2022-07-27 11:37:59 -04:00
Tyler Goodlet d1f347c21f Log cancels with appropriate level 2022-07-27 11:37:59 -04:00
9 changed files with 182 additions and 94 deletions

View File

@ -111,4 +111,4 @@ jobs:
run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
- name: Run tests - name: Run tests
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs --full-trace

View File

@ -3,13 +3,14 @@
|gh_actions| |gh_actions|
|docs| |docs|
``tractor`` is a `structured concurrent`_, multi-processing_ runtime built on trio_. ``tractor`` is a `structured concurrent`_, multi-processing_ runtime
built on trio_.
Fundamentally ``tractor`` gives you parallelism via ``trio``-"*actors*": Fundamentally ``tractor`` gives you parallelism via ``trio``-"*actors*":
our nurseries_ let you spawn new Python processes which each run a ``trio`` our nurseries_ let you spawn new Python processes which each run a ``trio``
scheduled runtime - a call to ``trio.run()``. scheduled runtime - a call to ``trio.run()``.
We believe the system adhere's to the `3 axioms`_ of an "`actor model`_" We believe the system adheres to the `3 axioms`_ of an "`actor model`_"
but likely *does not* look like what *you* probably think an "actor but likely *does not* look like what *you* probably think an "actor
model" looks like, and that's *intentional*. model" looks like, and that's *intentional*.
@ -577,13 +578,13 @@ say hi, please feel free to reach us in our `matrix channel`_. If
matrix seems too hip, we're also mostly all in the the `trio gitter matrix seems too hip, we're also mostly all in the the `trio gitter
channel`_! channel`_!
.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
.. _multi-processing: https://en.wikipedia.org/wiki/Multiprocessing
.. _trio: https://github.com/python-trio/trio
.. _nurseries: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/#nurseries-a-structured-replacement-for-go-statements .. _nurseries: https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/#nurseries-a-structured-replacement-for-go-statements
.. _actor model: https://en.wikipedia.org/wiki/Actor_model .. _actor model: https://en.wikipedia.org/wiki/Actor_model
.. _trio: https://github.com/python-trio/trio
.. _multi-processing: https://en.wikipedia.org/wiki/Multiprocessing
.. _trionic: https://trio.readthedocs.io/en/latest/design.html#high-level-design-principles .. _trionic: https://trio.readthedocs.io/en/latest/design.html#high-level-design-principles
.. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich .. _async sandwich: https://trio.readthedocs.io/en/latest/tutorial.html#async-sandwich
.. _structured concurrent: https://trio.discourse.group/t/concise-definition-of-structured-concurrency/228
.. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s .. _3 axioms: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=162s
.. .. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts .. .. _3 axioms: https://en.wikipedia.org/wiki/Actor_model#Fundamental_concepts
.. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s .. _adherance to: https://www.youtube.com/watch?v=7erJ1DV_Tlo&t=1821s

View File

@ -81,11 +81,14 @@ def run_example_in_subproc(loglevel, testdir, arb_addr):
'example_script', 'example_script',
# walk yields: (dirpath, dirnames, filenames) # walk yields: (dirpath, dirnames, filenames)
[(p[0], f) for p in os.walk(examples_dir()) for f in p[2] [
(p[0], f) for p in os.walk(examples_dir()) for f in p[2]
if '__' not in f if '__' not in f
and f[0] != '_' and f[0] != '_'
and 'debugging' not in p[0]], and 'debugging' not in p[0]
and 'integration' not in p[0]
],
ids=lambda t: t[1], ids=lambda t: t[1],
) )

View File

@ -611,7 +611,8 @@ class Actor:
entry = local_nursery._children.get(uid) entry = local_nursery._children.get(uid)
if entry: if entry:
_, proc, _ = entry _, proc, _ = entry
log.warning(f'Actor {uid}@{proc} IPC connection broke!?') log.warning(
f'Actor {uid}@{proc} IPC connection broke!?')
# if proc.poll() is not None: # if proc.poll() is not None:
# log.error('Actor {uid} proc died and IPC broke?') # log.error('Actor {uid} proc died and IPC broke?')
@ -630,6 +631,11 @@ class Actor:
# Attempt to wait for the far end to close the channel # Attempt to wait for the far end to close the channel
# and bail after timeout (2-generals on closure). # and bail after timeout (2-generals on closure).
assert chan.msgstream assert chan.msgstream
log.runtime(
f'Draining lingering msgs from stream {chan.msgstream}'
)
async for msg in chan.msgstream.drain(): async for msg in chan.msgstream.drain():
# try to deliver any lingering msgs # try to deliver any lingering msgs
# before we destroy the channel. # before we destroy the channel.

View File

@ -41,6 +41,7 @@ from .log import get_logger
from ._discovery import get_root from ._discovery import get_root
from ._state import is_root_process, debug_mode from ._state import is_root_process, debug_mode
from ._exceptions import is_multi_cancelled from ._exceptions import is_multi_cancelled
from ._ipc import Channel
try: try:
@ -178,12 +179,6 @@ async def _acquire_debug_lock(
we_acquired = False we_acquired = False
if _no_remote_has_tty is None:
# mark the tty lock as being in use so that the runtime
# can try to avoid clobbering any connection from a child
# that's currently relying on it.
_no_remote_has_tty = trio.Event()
try: try:
log.runtime( log.runtime(
f"entering lock checkpoint, remote task: {task_name}:{uid}" f"entering lock checkpoint, remote task: {task_name}:{uid}"
@ -191,6 +186,12 @@ async def _acquire_debug_lock(
we_acquired = True we_acquired = True
await _debug_lock.acquire() await _debug_lock.acquire()
if _no_remote_has_tty is None:
# mark the tty lock as being in use so that the runtime
# can try to avoid clobbering any connection from a child
# that's currently relying on it.
_no_remote_has_tty = trio.Event()
_global_actor_in_debug = uid _global_actor_in_debug = uid
log.runtime(f"TTY lock acquired, remote task: {task_name}:{uid}") log.runtime(f"TTY lock acquired, remote task: {task_name}:{uid}")
@ -208,6 +209,7 @@ async def _acquire_debug_lock(
finally: finally:
# if _global_actor_in_debug == uid: # if _global_actor_in_debug == uid:
if ( if (
we_acquired we_acquired
and _debug_lock.locked() and _debug_lock.locked()
@ -224,12 +226,15 @@ async def _acquire_debug_lock(
not stats.owner not stats.owner
): ):
log.runtime(f"No more tasks waiting on tty lock! says {uid}") log.runtime(f"No more tasks waiting on tty lock! says {uid}")
if _no_remote_has_tty is not None:
_no_remote_has_tty.set() _no_remote_has_tty.set()
_no_remote_has_tty = None _no_remote_has_tty = None
_global_actor_in_debug = None _global_actor_in_debug = None
log.runtime(f"TTY lock released, remote task: {task_name}:{uid}") log.runtime(
f"TTY lock released, remote task: {task_name}:{uid}"
)
@tractor.context @tractor.context
@ -244,6 +249,10 @@ async def _hijack_stdin_for_child(
the pdbpp debugger console can be allocated to a sub-actor for repl the pdbpp debugger console can be allocated to a sub-actor for repl
bossing. bossing.
NOTE: this task is invoked in the root actor-process of the actor
tree. It is meant to be invoked as an rpc-task which should be
highly reliable at cleaning out the tty-lock state when complete!
''' '''
task_name = trio.lowlevel.current_task().name task_name = trio.lowlevel.current_task().name
@ -265,9 +274,9 @@ async def _hijack_stdin_for_child(
with ( with (
trio.CancelScope(shield=True), trio.CancelScope(shield=True),
): ):
try: # try:
lock = None # lock = None
async with _acquire_debug_lock(subactor_uid) as lock: async with _acquire_debug_lock(subactor_uid): # as lock:
# indicate to child that we've locked stdio # indicate to child that we've locked stdio
await ctx.started('Locked') await ctx.started('Locked')
@ -279,32 +288,35 @@ async def _hijack_stdin_for_child(
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
assert await stream.receive() == 'pdb_unlock' assert await stream.receive() == 'pdb_unlock'
except ( # except (
BaseException, # BaseException,
# trio.MultiError, # # trio.MultiError,
# Exception, # # Exception,
# trio.BrokenResourceError, # # trio.BrokenResourceError,
# trio.Cancelled, # by local cancellation # # trio.Cancelled, # by local cancellation
# trio.ClosedResourceError, # by self._rx_chan # # trio.ClosedResourceError, # by self._rx_chan
# ContextCancelled, # # ContextCancelled,
# ConnectionResetError, # # ConnectionResetError,
): # ):
# XXX: there may be a race with the portal teardown # # XXX: there may be a race with the portal teardown
# with the calling actor which we can safely ignore. # # with the calling actor which we can safely ignore.
# The alternative would be sending an ack message # # The alternative would be sending an ack message
# and allowing the client to wait for us to teardown # # and allowing the client to wait for us to teardown
# first? # # first?
if lock and lock.locked(): # if lock and lock.locked():
lock.release() # try:
# lock.release()
# except RuntimeError:
# log.exception(f"we don't own the tty lock?")
# if isinstance(err, trio.Cancelled): # # if isinstance(err, trio.Cancelled):
raise # raise
finally: # finally:
log.runtime( # log.runtime(
"TTY lock released, remote task:" # "TTY lock released, remote task:"
f"{task_name}:{subactor_uid}" # f"{task_name}:{subactor_uid}"
) # )
return "pdb_unlock_complete" return "pdb_unlock_complete"
@ -385,6 +397,10 @@ def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
signal.SIGINT, signal.SIGINT,
partial(shield_sigint, pdb_obj=pdb), partial(shield_sigint, pdb_obj=pdb),
) )
# XXX: These are the important flags mentioned in
# https://github.com/python-trio/trio/issues/1155
# which resolve the traceback spews to console.
pdb.allow_kbdint = True pdb.allow_kbdint = True
pdb.nosigint = True pdb.nosigint = True
@ -453,11 +469,15 @@ async def _breakpoint(
_local_task_in_debug = task_name _local_task_in_debug = task_name
def child_release_hook(): def child_release_hook():
# _local_task_in_debug = None try:
# sometimes the ``trio`` might already be termianated in
# which case this call will raise.
_local_pdb_complete.set() _local_pdb_complete.set()
finally:
# restore original sigint handler # restore original sigint handler
undo_sigint() undo_sigint()
# should always be cleared in the hijack hook aboved right?
# _local_task_in_debug = None
# assign unlock callback for debugger teardown hooks # assign unlock callback for debugger teardown hooks
# _pdb_release_hook = _local_pdb_complete.set # _pdb_release_hook = _local_pdb_complete.set
@ -471,11 +491,15 @@ async def _breakpoint(
# we have to figure out how to avoid having the service nursery # we have to figure out how to avoid having the service nursery
# cancel on this task start? I *think* this works below? # cancel on this task start? I *think* this works below?
# actor._service_n.cancel_scope.shield = shield # actor._service_n.cancel_scope.shield = shield
try:
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await actor._service_n.start( await actor._service_n.start(
wait_for_parent_stdin_hijack, wait_for_parent_stdin_hijack,
actor.uid, actor.uid,
) )
except RuntimeError:
child_release_hook()
raise
elif is_root_process(): elif is_root_process():
@ -524,8 +548,12 @@ async def _breakpoint(
_global_actor_in_debug = None _global_actor_in_debug = None
_local_task_in_debug = None _local_task_in_debug = None
_local_pdb_complete.set()
try:
# sometimes the ``trio`` might already be termianated in
# which case this call will raise.
_local_pdb_complete.set()
finally:
# restore original sigint handler # restore original sigint handler
undo_sigint() undo_sigint()
@ -581,20 +609,58 @@ def shield_sigint(
__tracebackhide__ = True __tracebackhide__ = True
global _local_task_in_debug, _global_actor_in_debug global _local_task_in_debug, _global_actor_in_debug
in_debug = _global_actor_in_debug uid_in_debug = _global_actor_in_debug
actor = tractor.current_actor() actor = tractor.current_actor()
def do_cancel():
# If we haven't tried to cancel the runtime then do that instead
# of raising a KBI (which may non-gracefully destroy
# a ``trio.run()``).
if not actor._cancel_called:
actor.cancel_soon()
# If the runtime is already cancelled it likely means the user
# hit ctrl-c again because teardown didn't full take place in
# which case we do the "hard" raising of a local KBI.
else:
raise KeyboardInterrupt
any_connected = False
if uid_in_debug is not None:
# try to see if the supposed (sub)actor in debug still
# has an active connection to *this* actor, and if not
# it's likely they aren't using the TTY lock / debugger
# and we should propagate SIGINT normally.
chans = actor._peers.get(tuple(uid_in_debug))
if chans:
any_connected = any(chan.connected() for chan in chans)
if not any_connected:
log.warning(
'A global actor reported to be in debug '
'but no connection exists for this child:\n'
f'{uid_in_debug}\n'
'Allowing SIGINT propagation..'
)
return do_cancel()
# root actor branch that reports whether or not a child # root actor branch that reports whether or not a child
# has locked debugger. # has locked debugger.
if ( if (
is_root_process() is_root_process()
and in_debug and uid_in_debug is not None
# XXX: only if there is an existing connection to the
# (sub-)actor in debug do we ignore SIGINT in this
# parent! Otherwise we may hang waiting for an actor
# which has already terminated to unlock.
and any_connected
): ):
name = in_debug[0] name = uid_in_debug[0]
if name != 'root': if name != 'root':
log.pdb( log.pdb(
f"Ignoring SIGINT while child in debug mode: `{in_debug}`" f"Ignoring SIGINT while child in debug mode: `{uid_in_debug}`"
) )
else: else:
@ -606,6 +672,16 @@ def shield_sigint(
elif ( elif (
not is_root_process() not is_root_process()
): ):
chan: Channel = actor._parent_chan
if not chan or not chan.connected():
log.warning(
'A global actor reported to be in debug '
'but no connection exists for its parent:\n'
f'{uid_in_debug}\n'
'Allowing SIGINT propagation..'
)
return do_cancel()
task = _local_task_in_debug task = _local_task_in_debug
if task: if task:
log.pdb( log.pdb(
@ -621,20 +697,6 @@ def shield_sigint(
"Ignoring SIGINT since debug mode is enabled" "Ignoring SIGINT since debug mode is enabled"
) )
# noone has the debugger so raise KBI
else:
# If we haven't tried to cancel the runtime then do that instead
# of raising a KBI (which may non-gracefully destroy
# a ``trio.run()``).
if not actor._cancel_called:
actor.cancel_soon()
# If the runtime is already cancelled it likely means the user
# hit ctrl-c again because teardown didn't full take place in
# which case we do the "hard" raising of a local KBI.
else:
raise KeyboardInterrupt
# maybe redraw/print last REPL output to console # maybe redraw/print last REPL output to console
if pdb_obj: if pdb_obj:
@ -648,8 +710,12 @@ def shield_sigint(
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040 # https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py # https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
try:
pdb_obj.do_longlist(None) pdb_obj.do_longlist(None)
print(pdb_obj.prompt, end='', flush=True) print(pdb_obj.prompt, end='', flush=True)
except AttributeError:
log.exception('pdbpp longlist failed...')
raise KeyboardInterrupt
def _set_trace( def _set_trace(
@ -667,7 +733,7 @@ def _set_trace(
# start 2 levels up in user code # start 2 levels up in user code
frame: FrameType = sys._getframe() frame: FrameType = sys._getframe()
if frame: if frame:
frame = frame.f_back.f_back # type: ignore frame = frame.f_back # type: ignore
if pdb and actor is not None: if pdb and actor is not None:
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n") log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n")

View File

@ -511,8 +511,8 @@ class Portal:
if ctx.chan.connected(): if ctx.chan.connected():
log.info( log.info(
'Waiting on final context-task result for\n' 'Waiting on final context-task result for\n'
f'task:{cid}\n' f'task: {cid}\n'
f'actor:{uid}' f'actor: {uid}'
) )
result = await ctx.result() result = await ctx.result()

View File

@ -601,7 +601,14 @@ class Context:
finally: finally:
if self._portal: if self._portal:
self._portal._streams.remove(rchan) try:
self._portal._streams.remove(stream)
except KeyError:
log.warning(
f'Stream was already destroyed?\n'
f'actor: {self.chan.uid}\n'
f'ctx id: {self.cid}'
)
async def result(self) -> Any: async def result(self) -> Any:
''' '''

View File

@ -271,7 +271,12 @@ def _run_asyncio_task(
task.exception() task.exception()
except BaseException as terr: except BaseException as terr:
task_err = terr task_err = terr
if isinstance(terr, CancelledError):
log.cancel(f'`asyncio` task cancelled: {task.get_name()}')
else:
log.exception(f'`asyncio` task: {task.get_name()} errored') log.exception(f'`asyncio` task: {task.get_name()} errored')
assert type(terr) is type(aio_err), 'Asyncio task error mismatch?' assert type(terr) is type(aio_err), 'Asyncio task error mismatch?'
if aio_err is not None: if aio_err is not None: