Compare commits

...

23 Commits

Author SHA1 Message Date
Tyler Goodlet bce2de9e20 Add example that triggers bug #302 2022-02-17 13:20:05 -05:00
Tyler Goodlet 16e4da6958 Add back in async gen loop 2022-02-17 13:13:18 -05:00
Tyler Goodlet c79363f4a1 Pre-declare disconnected flag 2022-02-17 13:13:18 -05:00
Tyler Goodlet 013e766a16 Avoid attr error XD 2022-02-17 13:13:18 -05:00
Tyler Goodlet 61cc393a1a Type annot updates 2022-02-17 13:13:18 -05:00
Tyler Goodlet ee3dc5123f Drop uneeded backframe traceback hide annotation 2022-02-17 13:13:18 -05:00
Tyler Goodlet f8494de478 Run first soft wait inside a task
Theoretically we can actually concurrently soft wait the process and the
nursery exit event, the only problem at the moment is some pubsub tests
puking. You're right in guessing this isn't really changing anything but
it is meant to be a reminder. If we can add this the spawn task can
report when a process dies earlier then expected and in the longer term
once we remove the `ActorNursery.run_in_actor()` API, we can probably do
away with both the nursery exit event and the portal result fetching.
2022-02-17 13:13:16 -05:00
Tyler Goodlet fc90e1f171 Make `Actor._process_messages()` report disconnects
The method now returns a `bool` which flags whether the transport died
to the caller and allows for reporting a disconnect in the
channel-transport handler task. This is something a user will normally
want to know about on the caller side especially after seeing
a traceback from the peer (if in tree) on console.
2022-02-17 13:12:42 -05:00
Tyler Goodlet 02210d8f8e Only cancel/get-result from a ctx if transport is up
There's no point in sending a cancel message to the remote linked task
and especially no reason to block waiting on a result from that task if
the transport layer is detected to be disconnected. We expect that the
transport shouldn't go down at the layer of the message loop
(reconnection logic should be handled in the transport layer itself) so
if we detect the channel is not connected we don't bother requesting
cancels nor waiting on a final result message.

Why?

- if the connection goes down in error the caller side won't have a way
  to know "how long" it should block to wait for a cancel ack or result
  and causes a potential hang that may require an additional ctrl-c from
  the user especially if using the debugger or if the traceback is not
  seen on console.
- obviously there's no point in waiting for messages when there's no
  transport to deliver them XD

Further, add some more detailed cancel logging detailing the task and
actor ids.
2022-02-17 13:12:42 -05:00
Tyler Goodlet 194c0e120d Drop high log level in ctx example 2022-02-17 13:12:42 -05:00
Tyler Goodlet a4396e78ee Typing fixes, simplify `_set_trace()` 2022-02-17 13:12:42 -05:00
Tyler Goodlet 2cdf6bd8ce Add notes around py3.10 stdlib bug from `pdb++`
There's a bug that's triggered in the stdlib without latest `pdb++`
installed; add a note for that.

Further inside `wait_for_parent_stdin_hijack()` don't `.started()` until
the interactor stream has been opened to avoid races when debugging this
`._debug.py` module (at the least) since we usually don't want the
spawning (parent) task to resume until we know for sure the tty lock has
been acquired. Also, drop the random checkpoint we had inside
`_breakpoint()`, not sure it was actually adding anything useful since
we're (mostly) carefully shielded throughout this func.
2022-02-17 13:12:42 -05:00
Tyler Goodlet 206a001362 Add and use a pdb instance factory 2022-02-17 13:12:42 -05:00
Tyler Goodlet ed9c359a1d A `.open_context()` example that causes a hang!
Finally! I think this may be the root issue we've been seeing in
production in a client project.

No idea yet why this is happening but the fault-causing sequence seems
to be:
- `.open_context()` in a child actor
- enter the debugger via `tractor.breakpoint()`
- continue from that entry via `c` command in REPL
- raise an error just after inside the context task's body

Looking at logging it appears as though the child thinks it has the tty
but no input is accepted on the REPL and a further `ctrl-c` results in
some teardown but also a further hang where both parent and child become
unresponsive..
2022-02-17 13:12:42 -05:00
Tyler Goodlet 5322a6604e Drop all the `@cm.__exit__()` override attempts..
None of it worked (you still will see `.__exit__()` frames on debugger
entry - you'd think this would have been solved by now but, shrug) so
instead wrap the debugger entry-point in a `try:` and put the SIGINT
handler restoration inside `MultiActorPdb` teardown hooks.

This seems to restore the UX as it was prior but with also giving the
desired SIGINT override handler behaviour.
2022-02-17 13:12:42 -05:00
Tyler Goodlet 1142b802d3 Try overriding `_GeneratorContextManager.__exit__()`; didn't work..
Using either of `@pdb.hideframe` or `__tracebackhide__` on stdlib
methods doesn't seem to work either.. This all seems to have something
to do with async generator usage I think ?
2022-02-17 13:12:42 -05:00
Tyler Goodlet 1524c94b23 Fix example name typo 2022-02-17 13:12:42 -05:00
Tyler Goodlet c6d04028fe Handle a context cancel? Might be a noop 2022-02-17 13:12:42 -05:00
Tyler Goodlet 7cee5cfa57 Add a pre-started breakpoint example 2022-02-17 13:12:42 -05:00
Tyler Goodlet b7b03889d6 Make `mypy` happy 2022-02-17 13:12:42 -05:00
Tyler Goodlet 125ce3c2f4 Refine the handler for child vs. root cases
This gets very close to avoiding any possible hangs to do with tty
locking and SIGINT handling minus a special case that will be detailed
below.

Summary of implementation changes:

- convert `_mk_pdb()` -> `with _open_pdb() as pdb:` which implicitly
  handles the `bdb.BdbQuit` case such that debugger teardown hooks are
  always called.
- rename the handler to `shield_sigint()` and handle a variety of new
  cases:
  * the root is in debug but hasn't been cancelled -> call
    `Actor.cancel_soon()`
  * the root is in debug but *has* been called (`Actor.cancel_soon()`
    already called) -> raise KBI
  * a child is in debug *and* has a task locking the debugger -> ignore
    SIGINT in child *and* the root actor.
- if the debugger instance is provided to the handler at acquire time,
  on SIGINT handling completion re-print the last pdb++ REPL output so
  that the user realizes they are still actively in debug.
- ignore the unlock case where a race condition of "no task" holding the
  lock causes the `RuntimeError` normally associated with the "wrong
  task" doing so (not sure if this is a `trio` bug?).
- change debug logs to runtime level.

Unhandled case(s):

- a child is maybe in debug mode but does not itself have any task using
  the debugger.
    * ToDo: we need a way to decide what to do with
      "intermediate" child actors who themselves either are not in
      `debug_mode=True` but have children who *are* such that a SIGINT
      won't cause cancellation of that child-as-parent-of-another-child
      **iff** any of their children are in in debug mode.
2022-02-17 13:12:42 -05:00
Tyler Goodlet f830ffdabc (facepalm) Reraise `BdbQuit` and discard ownerless lock releases 2022-02-17 13:12:42 -05:00
Tyler Goodlet 119cd551c3 Add WIP while-debugger-active SIGINT ignore handler 2022-02-17 13:12:42 -05:00
7 changed files with 577 additions and 197 deletions

View File

@ -0,0 +1,50 @@
import tractor
import trio
async def gen():
yield 'yo'
await tractor.breakpoint()
yield 'yo'
await tractor.breakpoint()
@tractor.context
async def just_bp(
ctx: tractor.Context,
) -> None:
await ctx.started()
await tractor.breakpoint()
# TODO: bps and errors in this call..
async for val in gen():
print(val)
# await trio.sleep(0.5)
# prematurely destroy the connection
await ctx.chan.aclose()
# THIS CAUSES AN UNRECOVERABLE HANG
# without latest ``pdbpp``:
assert 0
async def main():
async with tractor.open_nursery(
debug_mode=True,
) as n:
p = await n.start_actor(
'bp_boi',
enable_modules=[__name__],
)
async with p.open_context(
just_bp,
) as (ctx, first):
await trio.sleep_forever()
if __name__ == '__main__':
trio.run(main)

View File

@ -0,0 +1,49 @@
import trio
import click
import tractor
import pydantic
# from multiprocessing import shared_memory
@tractor.context
async def just_sleep(
ctx: tractor.Context,
**kwargs,
) -> None:
'''
Test a small ping-pong 2-way streaming server.
'''
await ctx.started()
await trio.sleep_forever()
async def main() -> None:
proc = await trio.open_process( (
'python',
'-c',
'import trio; trio.run(trio.sleep_forever)',
))
await proc.wait()
# await trio.sleep_forever()
# async with tractor.open_nursery() as n:
# portal = await n.start_actor(
# 'rpc_server',
# enable_modules=[__name__],
# )
# async with portal.open_context(
# just_sleep, # taken from pytest parameterization
# ) as (ctx, sent):
# await trio.sleep_forever()
if __name__ == '__main__':
import time
# time.sleep(999)
trio.run(main)

View File

@ -26,8 +26,11 @@ import importlib
import importlib.util
import inspect
import uuid
import typing
from typing import Any, Optional, Union
from typing import (
Any, Optional,
Union, TYPE_CHECKING,
Callable,
)
from types import ModuleType
import sys
import os
@ -57,6 +60,10 @@ from . import _state
from . import _mp_fixup_main
if TYPE_CHECKING:
from ._supervise import ActorNursery
log = get_logger('tractor')
@ -65,7 +72,7 @@ async def _invoke(
actor: 'Actor',
cid: str,
chan: Channel,
func: typing.Callable,
func: Callable,
kwargs: dict[str, Any],
is_rpc: bool = True,
@ -200,7 +207,7 @@ async def _invoke(
ctx = actor._contexts.pop((chan.uid, cid))
if ctx:
log.runtime(
f'Context entrypoint for {func} was terminated:\n{ctx}'
f'Context entrypoint {func} was terminated:\n{ctx}'
)
assert cs
@ -316,7 +323,9 @@ async def try_ship_error_to_parent(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
log.error(
# in SC terms this is one of the worst things that can
# happen and creates the 2-general's dilemma.
log.critical(
f"Failed to ship error to parent "
f"{channel.uid}, channel was closed"
)
@ -424,7 +433,7 @@ class Actor:
# (chan, cid) -> (cancel_scope, func)
self._rpc_tasks: dict[
tuple[Channel, str],
tuple[trio.CancelScope, typing.Callable, trio.Event]
tuple[trio.CancelScope, Callable, trio.Event]
] = {}
# map {actor uids -> Context}
@ -513,6 +522,7 @@ class Actor:
self._no_more_peers = trio.Event() # unset
chan = Channel.from_stream(stream)
uid: Optional[tuple[str, str]] = chan.uid
log.runtime(f"New connection to us {chan}")
# send/receive initial handshake response
@ -560,33 +570,51 @@ class Actor:
# append new channel
self._peers[uid].append(chan)
local_nursery: Optional[ActorNursery] = None # noqa
disconnected: bool = False
# Begin channel management - respond to remote requests and
# process received reponses.
try:
await self._process_messages(chan)
disconnected = await self._process_messages(chan)
except trio.Cancelled:
except (
trio.Cancelled,
):
log.cancel(f"Msg loop was cancelled for {chan}")
raise
finally:
local_nursery = self._actoruid2nursery.get(uid, local_nursery)
# This is set in ``Portal.cancel_actor()``. So if
# the peer was cancelled we try to wait for them
# to tear down their side of the connection before
# moving on with closing our own side.
local_nursery = self._actoruid2nursery.get(chan.uid)
if (
local_nursery
):
if disconnected:
# if the transport died and this actor is still
# registered within a local nursery, we report that the
# IPC layer may have failed unexpectedly since it may be
# the cause of other downstream errors.
entry = local_nursery._children.get(uid)
if entry:
_, proc, _ = entry
log.error(f'Actor {uid}@{proc} IPC connection broke!?')
# if proc.poll() is not None:
# log.error('Actor {uid} proc died and IPC broke?')
log.cancel(f"Waiting on cancel request to peer {chan.uid}")
# XXX: this is a soft wait on the channel (and its
# underlying transport protocol) to close from the remote
# peer side since we presume that any channel which
# is mapped to a sub-actor (i.e. it's managed by
# one of our local nurseries)
# message is sent to the peer likely by this actor which is
# now in a cancelled condition) when the local runtime here
# is now cancelled while (presumably) in the middle of msg
# underlying transport protocol) to close from the
# remote peer side since we presume that any channel
# which is mapped to a sub-actor (i.e. it's managed by
# one of our local nurseries) has a message is sent to
# the peer likely by this actor (which is now in
# a cancelled condition) when the local runtime here is
# now cancelled while (presumably) in the middle of msg
# loop processing.
with trio.move_on_after(0.5) as cs:
cs.shield = True
@ -609,6 +637,8 @@ class Actor:
await local_nursery.exited.wait()
# if local_nursery._children
# ``Channel`` teardown and closure sequence
# Drop ref to channel so it can be gc-ed and disconnected
@ -618,7 +648,7 @@ class Actor:
if not chans:
log.runtime(f"No more channels for {chan.uid}")
self._peers.pop(chan.uid, None)
self._peers.pop(uid, None)
# for (uid, cid) in self._contexts.copy():
# if chan.uid == uid:
@ -626,11 +656,13 @@ class Actor:
log.runtime(f"Peers is {self._peers}")
if not self._peers: # no more channels connected
# No more channels to other actors (at all) registered
# as connected.
if not self._peers:
log.runtime("Signalling no more peer channels")
self._no_more_peers.set()
# # XXX: is this necessary (GC should do it?)
# XXX: is this necessary (GC should do it)?
if chan.connected():
# if the channel is still connected it may mean the far
# end has not closed and we may have gotten here due to
@ -665,8 +697,8 @@ class Actor:
ctx = self._contexts[(uid, cid)]
except KeyError:
log.warning(
f'Ignoring msg from [no-longer/un]known context with {uid}:'
f'\n{msg}')
f'Ignoring msg from [no-longer/un]known context {uid}:'
f'\n{msg}')
return
send_chan = ctx._send_chan
@ -813,7 +845,7 @@ class Actor:
shield: bool = False,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
) -> bool:
'''
Process messages for the channel async-RPC style.
@ -839,7 +871,7 @@ class Actor:
if msg is None: # loop terminate sentinel
log.cancel(
f"Channerl to {chan.uid} terminated?\n"
f"Channel to {chan.uid} terminated?\n"
"Cancelling all associated tasks..")
for (channel, cid) in self._rpc_tasks.copy():
@ -986,6 +1018,9 @@ class Actor:
# up.
log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}')
# transport **was** disconnected
return True
except (Exception, trio.MultiError) as err:
if nursery_cancelled_before_task:
sn = self._service_n
@ -1010,6 +1045,9 @@ class Actor:
f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}")
# transport **was not** disconnected
return False
async def _from_parent(
self,
parent_addr: Optional[tuple[str, int]],

View File

@ -18,8 +18,10 @@
Multi-core debugging for da peeps!
"""
from __future__ import annotations
import bdb
import sys
import signal
from functools import partial
from contextlib import asynccontextmanager as acm
from typing import (
@ -29,24 +31,26 @@ from typing import (
AsyncIterator,
AsyncGenerator,
)
from types import FrameType
import tractor
import trio
from trio_typing import TaskStatus
from .log import get_logger
from . import _state
from ._discovery import get_root
from ._state import is_root_process, debug_mode
from ._exceptions import is_multi_cancelled
try:
# wtf: only exported when installed in dev mode?
import pdbpp
except ImportError:
# pdbpp is installed in regular mode...it monkey patches stuff
import pdb
assert pdb.xpm, "pdbpp is not installed?" # type: ignore
xpm = getattr(pdb, 'xpm', None)
assert xpm, "pdbpp is not installed?" # type: ignore
pdbpp = pdb
log = get_logger(__name__)
@ -81,11 +85,14 @@ class TractorConfig(pdbpp.DefaultConfig):
"""Custom ``pdbpp`` goodness.
"""
# sticky_by_default = True
enable_hidden_frames = False
class PdbwTeardown(pdbpp.Pdb):
"""Add teardown hooks to the regular ``pdbpp.Pdb``.
"""
class MultiActorPdb(pdbpp.Pdb):
'''
Add teardown hooks to the regular ``pdbpp.Pdb``.
'''
# override the pdbpp config with our coolio one
DefaultConfig = TractorConfig
@ -95,17 +102,19 @@ class PdbwTeardown(pdbpp.Pdb):
try:
super().set_continue()
finally:
global _local_task_in_debug
global _local_task_in_debug, _pdb_release_hook
_local_task_in_debug = None
_pdb_release_hook()
if _pdb_release_hook:
_pdb_release_hook()
def set_quit(self):
try:
super().set_quit()
finally:
global _local_task_in_debug
global _local_task_in_debug, _pdb_release_hook
_local_task_in_debug = None
_pdb_release_hook()
if _pdb_release_hook:
_pdb_release_hook()
# TODO: will be needed whenever we get to true remote debugging.
@ -150,7 +159,8 @@ async def _acquire_debug_lock(
uid: Tuple[str, str]
) -> AsyncIterator[trio.StrictFIFOLock]:
'''Acquire a root-actor local FIFO lock which tracks mutex access of
'''
Acquire a root-actor local FIFO lock which tracks mutex access of
the process tree's global debugger breakpoint.
This lock avoids tty clobbering (by preventing multiple processes
@ -162,7 +172,7 @@ async def _acquire_debug_lock(
task_name = trio.lowlevel.current_task().name
log.debug(
log.runtime(
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}"
)
@ -175,14 +185,14 @@ async def _acquire_debug_lock(
_no_remote_has_tty = trio.Event()
try:
log.debug(
log.runtime(
f"entering lock checkpoint, remote task: {task_name}:{uid}"
)
we_acquired = True
await _debug_lock.acquire()
_global_actor_in_debug = uid
log.debug(f"TTY lock acquired, remote task: {task_name}:{uid}")
log.runtime(f"TTY lock acquired, remote task: {task_name}:{uid}")
# NOTE: critical section: this yield is unshielded!
@ -198,7 +208,10 @@ async def _acquire_debug_lock(
finally:
# if _global_actor_in_debug == uid:
if we_acquired and _debug_lock.locked():
if (
we_acquired
and _debug_lock.locked()
):
_debug_lock.release()
# IFF there are no more requesting tasks queued up fire, the
@ -210,29 +223,13 @@ async def _acquire_debug_lock(
if (
not stats.owner
):
log.debug(f"No more tasks waiting on tty lock! says {uid}")
log.runtime(f"No more tasks waiting on tty lock! says {uid}")
_no_remote_has_tty.set()
_no_remote_has_tty = None
_global_actor_in_debug = None
log.debug(f"TTY lock released, remote task: {task_name}:{uid}")
def handler(signum, frame, *args):
"""Specialized debugger compatible SIGINT handler.
In childred we always ignore to avoid deadlocks since cancellation
should always be managed by the parent supervising actor. The root
is always cancelled on ctrl-c.
"""
if is_root_process():
tractor.current_actor().cancel_soon()
else:
print(
"tractor ignores SIGINT while in debug mode\n"
"If you have a special need for it please open an issue.\n"
)
log.runtime(f"TTY lock released, remote task: {task_name}:{uid}")
@tractor.context
@ -260,46 +257,62 @@ async def _hijack_stdin_for_child(
log.debug(f"Actor {subactor_uid} is WAITING on stdin hijack lock")
with trio.CancelScope(shield=True):
orig_handler = signal.signal(
signal.SIGINT,
shield_sigint,
)
try:
with (
trio.CancelScope(shield=True),
):
try:
lock = None
async with _acquire_debug_lock(subactor_uid) as lock:
try:
lock = None
async with _acquire_debug_lock(subactor_uid) as lock:
# indicate to child that we've locked stdio
await ctx.started('Locked')
log.debug(
f"Actor {subactor_uid} acquired stdin hijack lock"
)
# indicate to child that we've locked stdio
await ctx.started('Locked')
log.debug(f"Actor {subactor_uid} acquired stdin hijack lock")
# wait for unlock pdb by child
async with ctx.open_stream() as stream:
assert await stream.receive() == 'pdb_unlock'
# wait for unlock pdb by child
async with ctx.open_stream() as stream:
assert await stream.receive() == 'pdb_unlock'
except (
BaseException,
# trio.MultiError,
# Exception,
# trio.BrokenResourceError,
# trio.Cancelled, # by local cancellation
# trio.ClosedResourceError, # by self._rx_chan
# ContextCancelled,
# ConnectionResetError,
):
# XXX: there may be a race with the portal teardown
# with the calling actor which we can safely ignore.
# The alternative would be sending an ack message
# and allowing the client to wait for us to teardown
# first?
if lock and lock.locked():
lock.release()
# try:
# assert await stream.receive() == 'pdb_unlock'
except (
# BaseException,
trio.MultiError,
trio.BrokenResourceError,
trio.Cancelled, # by local cancellation
trio.ClosedResourceError, # by self._rx_chan
) as err:
# XXX: there may be a race with the portal teardown
# with the calling actor which we can safely ignore.
# The alternative would be sending an ack message
# and allowing the client to wait for us to teardown
# first?
if lock and lock.locked():
lock.release()
if isinstance(err, trio.Cancelled):
# if isinstance(err, trio.Cancelled):
raise
finally:
log.debug(
"TTY lock released, remote task:"
f"{task_name}:{subactor_uid}")
return "pdb_unlock_complete"
finally:
log.runtime(
"TTY lock released, remote task:"
f"{task_name}:{subactor_uid}"
)
return "pdb_unlock_complete"
finally:
signal.signal(
signal.SIGINT,
orig_handler
)
async def wait_for_parent_stdin_hijack(
@ -338,20 +351,22 @@ async def wait_for_parent_stdin_hijack(
async with ctx.open_stream() as stream:
# unblock local caller
task_status.started(cs)
try:
assert _local_pdb_complete
task_status.started(cs)
await _local_pdb_complete.wait()
finally:
# TODO: shielding currently can cause hangs...
with trio.CancelScope(shield=True):
await stream.send('pdb_unlock')
# with trio.CancelScope(shield=True):
await stream.send('pdb_unlock')
# sync with callee termination
assert await ctx.result() == "pdb_unlock_complete"
log.pdb('unlocked context')
except tractor.ContextCancelled:
log.warning('Root actor cancelled debug lock')
@ -362,6 +377,28 @@ async def wait_for_parent_stdin_hijack(
log.debug(f"Child {actor_uid} released parent stdio lock")
def mk_mpdb() -> tuple[MultiActorPdb, Callable]:
pdb = MultiActorPdb()
signal.signal = pdbpp.hideframe(signal.signal)
orig_handler = signal.signal(
signal.SIGINT,
partial(shield_sigint, pdb_obj=pdb),
)
pdb.allow_kbdint = True
pdb.nosigint = True
# TODO: add this as method on our pdb obj?
def undo_sigint():
# restore original sigint handler
signal.signal(
signal.SIGINT,
orig_handler
)
return pdb, undo_sigint
async def _breakpoint(
debug_func,
@ -370,23 +407,26 @@ async def _breakpoint(
# shield: bool = False
) -> None:
'''``tractor`` breakpoint entry for engaging pdb machinery
in the root or a subactor.
'''
breakpoint entry for engaging pdb machinery in the root or
a subactor.
'''
# TODO: is it possible to debug a trio.Cancelled except block?
# right now it seems like we can kinda do with by shielding
# around ``tractor.breakpoint()`` but not if we move the shielded
# scope here???
# with trio.CancelScope(shield=shield):
__tracebackhide__ = True
pdb, undo_sigint = mk_mpdb()
actor = tractor.current_actor()
task_name = trio.lowlevel.current_task().name
global _local_pdb_complete, _pdb_release_hook
global _local_task_in_debug, _global_actor_in_debug
await trio.lowlevel.checkpoint()
# TODO: is it possible to debug a trio.Cancelled except block?
# right now it seems like we can kinda do with by shielding
# around ``tractor.breakpoint()`` but not if we move the shielded
# scope here???
# with trio.CancelScope(shield=shield):
# await trio.lowlevel.checkpoint()
if not _local_pdb_complete or _local_pdb_complete.is_set():
_local_pdb_complete = trio.Event()
@ -412,8 +452,16 @@ async def _breakpoint(
# entries/requests to the root process
_local_task_in_debug = task_name
def child_release_hook():
# _local_task_in_debug = None
_local_pdb_complete.set()
# restore original sigint handler
undo_sigint()
# assign unlock callback for debugger teardown hooks
_pdb_release_hook = _local_pdb_complete.set
# _pdb_release_hook = _local_pdb_complete.set
_pdb_release_hook = child_release_hook
# this **must** be awaited by the caller and is done using the
# root nursery so that the debugger can continue to run without
@ -464,59 +512,174 @@ async def _breakpoint(
global _local_pdb_complete, _debug_lock
global _global_actor_in_debug, _local_task_in_debug
_debug_lock.release()
try:
_debug_lock.release()
except RuntimeError:
# uhhh makes no sense but been seeing the non-owner
# release error even though this is definitely the task
# that locked?
owner = _debug_lock.statistics().owner
if owner:
raise
_global_actor_in_debug = None
_local_task_in_debug = None
_local_pdb_complete.set()
# restore original sigint handler
undo_sigint()
_pdb_release_hook = teardown
# block here one (at the appropriate frame *up*) where
# ``breakpoint()`` was awaited and begin handling stdio.
log.debug("Entering the synchronous world of pdb")
debug_func(actor)
# frame = sys._getframe()
# last_f = frame.f_back
# last_f.f_globals['__tracebackhide__'] = True
try:
# block here one (at the appropriate frame *up*) where
# ``breakpoint()`` was awaited and begin handling stdio.
log.debug("Entering the synchronous world of pdb")
debug_func(actor, pdb)
except bdb.BdbQuit:
if _pdb_release_hook:
_pdb_release_hook()
raise
# XXX: apparently we can't do this without showing this frame
# in the backtrace on first entry to the REPL? Seems like an odd
# behaviour that should have been fixed by now. This is also why
# we scrapped all the @cm approaches that were tried previously.
# finally:
# __tracebackhide__ = True
# # frame = sys._getframe()
# # last_f = frame.f_back
# # last_f.f_globals['__tracebackhide__'] = True
# # signal.signal = pdbpp.hideframe(signal.signal)
# signal.signal(
# signal.SIGINT,
# orig_handler
# )
def _mk_pdb() -> PdbwTeardown:
def shield_sigint(
signum: int,
frame: 'frame', # type: ignore # noqa
pdb_obj: Optional[MultiActorPdb] = None,
*args,
# XXX: setting these flags on the pdb instance are absolutely
# critical to having ctrl-c work in the ``trio`` standard way! The
# stdlib's pdb supports entering the current sync frame on a SIGINT,
# with ``trio`` we pretty much never want this and if we did we can
# handle it in the ``tractor`` task runtime.
) -> None:
'''
Specialized debugger compatible SIGINT handler.
pdb = PdbwTeardown()
pdb.allow_kbdint = True
pdb.nosigint = True
In childred we always ignore to avoid deadlocks since cancellation
should always be managed by the parent supervising actor. The root
is always cancelled on ctrl-c.
return pdb
'''
__tracebackhide__ = True
global _local_task_in_debug, _global_actor_in_debug
in_debug = _global_actor_in_debug
actor = tractor.current_actor()
# root actor branch that reports whether or not a child
# has locked debugger.
if (
is_root_process()
and in_debug
):
name = in_debug[0]
if name != 'root':
log.pdb(
f"Ignoring SIGINT while child in debug mode: `{in_debug}`"
)
else:
log.pdb(
"Ignoring SIGINT while in debug mode"
)
# child actor that has locked the debugger
elif (
not is_root_process()
):
task = _local_task_in_debug
if task:
log.pdb(
f"Ignoring SIGINT while task in debug mode: `{task}`"
)
# TODO: how to handle the case of an intermediary-child actor
# that **is not** marked in debug mode?
# elif debug_mode():
else:
log.pdb(
"Ignoring SIGINT since debug mode is enabled"
)
# noone has the debugger so raise KBI
else:
# If we haven't tried to cancel the runtime then do that instead
# of raising a KBI (which may non-gracefully destroy
# a ``trio.run()``).
if not actor._cancel_called:
actor.cancel_soon()
# If the runtime is already cancelled it likely means the user
# hit ctrl-c again because teardown didn't full take place in
# which case we do the "hard" raising of a local KBI.
else:
raise KeyboardInterrupt
# maybe redraw/print last REPL output to console
if pdb_obj:
# TODO: make this work like sticky mode where if there is output
# detected as written to the tty we redraw this part underneath
# and erase the past draw of this same bit above?
# pdb_obj.sticky = True
# pdb_obj._print_if_sticky()
# also see these links for an approach from ``ptk``:
# https://github.com/goodboy/tractor/issues/130#issuecomment-663752040
# https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py
pdb_obj.do_longlist(None)
print(pdb_obj.prompt, end='', flush=True)
def _set_trace(actor=None):
pdb = _mk_pdb()
def _set_trace(
actor: Optional[tractor._actor.Actor] = None,
pdb: Optional[MultiActorPdb] = None,
):
__tracebackhide__ = True
actor = actor or tractor.current_actor()
if actor is not None:
# XXX: on latest ``pdbpp`` i guess we don't need this?
# frame = sys._getframe()
# last_f = frame.f_back
# last_f.f_globals['__tracebackhide__'] = True
# start 2 levels up in user code
frame: FrameType = sys._getframe()
if frame:
frame = frame.f_back.f_back # type: ignore
if pdb and actor is not None:
log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n")
pdb.set_trace(
# start 2 levels up in user code
frame=sys._getframe().f_back.f_back,
)
else:
# we entered the global ``breakpoint()`` built-in from sync code
pdb, undo_sigint = mk_mpdb()
# we entered the global ``breakpoint()`` built-in from sync code?
global _local_task_in_debug, _pdb_release_hook
_local_task_in_debug = 'sync'
def nuttin():
pass
_pdb_release_hook = nuttin
pdb.set_trace(
# start 2 levels up in user code
frame=sys._getframe().f_back,
)
pdb.set_trace(frame=frame)
breakpoint = partial(
@ -525,11 +688,40 @@ breakpoint = partial(
)
def _post_mortem(actor):
log.pdb(f"\nAttaching to pdb in crashed actor: {actor.uid}\n")
pdb = _mk_pdb()
def _post_mortem(
actor: tractor._actor.Actor,
pdb: MultiActorPdb,
) -> None:
'''
Enter the ``pdbpp`` port mortem entrypoint using our custom
debugger instance.
'''
log.pdb(f"\nAttaching to pdb in crashed actor: {actor.uid}\n")
# XXX: on py3.10 if you don't have latest ``pdbpp`` installed.
# The exception looks something like:
# Traceback (most recent call last):
# File ".../tractor/_debug.py", line 729, in _post_mortem
# for _ in range(100):
# File "../site-packages/pdb.py", line 1227, in xpm
# post_mortem(info[2], Pdb)
# File "../site-packages/pdb.py", line 1175, in post_mortem
# p.interaction(None, t)
# File "../site-packages/pdb.py", line 216, in interaction
# ret = self.setup(frame, traceback)
# File "../site-packages/pdb.py", line 259, in setup
# ret = super(Pdb, self).setup(frame, tb)
# File "/usr/lib/python3.10/pdb.py", line 217, in setup
# self.curframe = self.stack[self.curindex][0]
# IndexError: list index out of range
# NOTE: you need ``pdbpp`` master (at least this commit
# https://github.com/pdbpp/pdbpp/commit/b757794857f98d53e3ebbe70879663d7d843a6c2)
# to fix this and avoid the hang it causes XD.
# see also: https://github.com/pdbpp/pdbpp/issues/480
# custom Pdb post-mortem entry
pdbpp.xpm(Pdb=lambda: pdb)

View File

@ -24,7 +24,8 @@ import importlib
import inspect
from typing import (
Any, Optional,
Callable, AsyncGenerator
Callable, AsyncGenerator,
Type,
)
from functools import partial
from dataclasses import dataclass
@ -442,6 +443,10 @@ class Portal:
_err: Optional[BaseException] = None
ctx._portal = self
uid = self.channel.uid
cid = ctx.cid
etype: Optional[Type[BaseException]] = None
# deliver context instance and .started() msg value in open tuple.
try:
async with trio.open_nursery() as scope_nursery:
@ -477,13 +482,24 @@ class Portal:
# KeyboardInterrupt,
) as err:
_err = err
etype = type(err)
# the context cancels itself on any cancel
# causing error.
log.cancel(
f'Context to {self.channel.uid} sending cancel request..')
await ctx.cancel()
if ctx.chan.connected():
log.cancel(
'Context cancelled for task, sending cancel request..\n'
f'task:{cid}\n'
f'actor:{uid}'
)
await ctx.cancel()
else:
log.warning(
'IPC connection for context is broken?\n'
f'task:{cid}\n'
f'actor:{uid}'
)
raise
finally:
@ -492,7 +508,13 @@ class Portal:
# sure we get the error the underlying feeder mem chan.
# if it's not raised here it *should* be raised from the
# msg loop nursery right?
result = await ctx.result()
if ctx.chan.connected():
log.info(
'Waiting on final context-task result for\n'
f'task:{cid}\n'
f'actor:{uid}'
)
result = await ctx.result()
# though it should be impossible for any tasks
# operating *in* this scope to have survived
@ -502,14 +524,17 @@ class Portal:
# should we encapsulate this in the context api?
await ctx._recv_chan.aclose()
if _err:
if etype:
if ctx._cancel_called:
log.cancel(
f'Context {fn_name} cancelled by caller with\n{_err}'
f'Context {fn_name} cancelled by caller with\n{etype}'
)
elif _err is not None:
log.cancel(
f'Context {fn_name} cancelled by callee with\n{_err}'
f'Context for task cancelled by callee with {etype}\n'
f'target: `{fn_name}`\n'
f'task:{cid}\n'
f'actor:{uid}'
)
else:
log.runtime(

View File

@ -18,17 +18,30 @@
Machinery for actor process spawning using multiple backends.
"""
from __future__ import annotations
import sys
import multiprocessing as mp
import platform
from typing import (
Any, Optional, Callable, TypeVar, TYPE_CHECKING
Any, Dict, Optional, Callable,
TypeVar,
)
from collections.abc import Awaitable
import trio
from trio_typing import TaskStatus
try:
from multiprocessing import semaphore_tracker # type: ignore
resource_tracker = semaphore_tracker
resource_tracker._resource_tracker = resource_tracker._semaphore_tracker
except ImportError:
# 3.8 introduces a more general version that also tracks shared mems
from multiprocessing import resource_tracker # type: ignore
from multiprocessing import forkserver # type: ignore
from typing import Tuple
from . import _forkserver_override
from ._debug import (
maybe_wait_for_debugger,
acquire_debug_lock,
@ -47,11 +60,8 @@ from ._entry import _mp_main
from ._exceptions import ActorFailure
if TYPE_CHECKING:
import multiprocessing as mp
ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
log = get_logger('tractor')
ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
# placeholder for an mp start context if so using that backend
_ctx: Optional[mp.context.BaseContext] = None
@ -60,7 +70,6 @@ _spawn_method: str = "trio"
if platform.system() == 'Windows':
import multiprocessing as mp
_ctx = mp.get_context("spawn")
async def proc_waiter(proc: mp.Process) -> None:
@ -83,7 +92,6 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
``subprocess.Popen``.
'''
import multiprocessing as mp
global _ctx
global _spawn_method
@ -100,7 +108,6 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
f"Spawn method `{name}` is invalid please choose one of {methods}"
)
elif name == 'forkserver':
from . import _forkserver_override
_forkserver_override.override_stdlib()
_ctx = mp.get_context(name)
elif name == 'trio':
@ -148,7 +155,7 @@ async def cancel_on_completion(
portal: Portal,
actor: Actor,
errors: dict[tuple[str, str], Exception],
errors: Dict[Tuple[str, str], Exception],
) -> None:
'''
@ -251,12 +258,12 @@ async def new_proc(
name: str,
actor_nursery: 'ActorNursery', # type: ignore # noqa
subactor: Actor,
errors: dict[tuple[str, str], Exception],
errors: Dict[Tuple[str, str], Exception],
# passed through to actor main
bind_addr: tuple[str, int],
parent_addr: tuple[str, int],
_runtime_vars: dict[str, Any], # serialized and sent to _child
bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
@ -288,7 +295,7 @@ async def new_proc(
# the OS; it otherwise can be passed via the parent channel if
# we prefer in the future (for privacy).
"--uid",
str(subactor.uid),
str(uid),
# Address the child must connect to on startup
"--parent_addr",
str(parent_addr)
@ -314,8 +321,7 @@ async def new_proc(
# wait for actor to spawn and connect back to us
# channel should have handshake completed by the
# local actor by the time we get a ref to it
event, chan = await actor_nursery._actor.wait_for_peer(
subactor.uid)
event, chan = await actor_nursery._actor.wait_for_peer(uid)
except trio.Cancelled:
cancelled_during_spawn = True
@ -356,10 +362,54 @@ async def new_proc(
task_status.started(portal)
# wait for ActorNursery.wait() to be called
n_exited = actor_nursery._join_procs
with trio.CancelScope(shield=True):
await actor_nursery._join_procs.wait()
await n_exited.wait()
async with trio.open_nursery() as nursery:
async def soft_wait_and_maybe_cancel_ria_task():
# This is a "soft" (cancellable) join/reap which
# will remote cancel the actor on a ``trio.Cancelled``
# condition.
await soft_wait(
proc,
trio.Process.wait,
portal
)
if n_exited.is_set():
# cancel result waiter that may have been spawned in
# tandem if not done already
log.warning(
"Cancelling existing result waiter task for "
f"{subactor.uid}"
)
nursery.cancel_scope.cancel()
else:
log.warning(
f'Process for actor {uid} terminated before'
'nursery exit. ' 'This may mean an IPC'
'connection failed!'
)
nursery.start_soon(soft_wait_and_maybe_cancel_ria_task)
# TODO: when we finally remove the `.run_in_actor()` api
# we should be able to entirely drop these 2 blocking calls:
# - we don't need to wait on nursery exit to capture
# process-spawn-machinery level errors (and propagate them).
# - we don't need to wait on final results from ria portals
# since this will be done in some higher level wrapper API.
# XXX: interestingly we can't put this here bc it causes
# the pub-sub tests to fail? wth.. should probably drop
# those XD
# wait for ActorNursery.wait() to be called
# with trio.CancelScope(shield=True):
# await n_exited.wait()
if portal in actor_nursery._cancel_after_result_on_exit:
nursery.start_soon(
cancel_on_completion,
@ -368,22 +418,6 @@ async def new_proc(
errors
)
# This is a "soft" (cancellable) join/reap which
# will remote cancel the actor on a ``trio.Cancelled``
# condition.
await soft_wait(
proc,
trio.Process.wait,
portal
)
# cancel result waiter that may have been spawned in
# tandem if not done already
log.warning(
"Cancelling existing result waiter task for "
f"{subactor.uid}")
nursery.cancel_scope.cancel()
finally:
# The "hard" reap since no actor zombies are allowed!
# XXX: do this **after** cancellation/tearfown to avoid
@ -400,9 +434,10 @@ async def new_proc(
await proc.wait()
if is_root_process():
await maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get(
'_debug_mode', False),
'_debug_mode', False)
)
if proc.poll() is None:
@ -441,30 +476,20 @@ async def mp_new_proc(
name: str,
actor_nursery: 'ActorNursery', # type: ignore # noqa
subactor: Actor,
errors: dict[tuple[str, str], Exception],
errors: Dict[Tuple[str, str], Exception],
# passed through to actor main
bind_addr: tuple[str, int],
parent_addr: tuple[str, int],
_runtime_vars: dict[str, Any], # serialized and sent to _child
bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
# uggh zone
try:
from multiprocessing import semaphore_tracker # type: ignore
resource_tracker = semaphore_tracker
resource_tracker._resource_tracker = resource_tracker._semaphore_tracker # noqa
except ImportError:
# 3.8 introduces a more general version that also tracks shared mems
from multiprocessing import resource_tracker # type: ignore
assert _ctx
start_method = _ctx.get_start_method()
if start_method == 'forkserver':
from multiprocessing import forkserver # type: ignore
# XXX do our hackery on the stdlib to avoid multiple
# forkservers (one at each subproc layer).
fs = forkserver._forkserver

View File

@ -604,7 +604,8 @@ class Context:
self._portal._streams.remove(rchan)
async def result(self) -> Any:
'''From a caller side, wait for and return the final result from
'''
From a caller side, wait for and return the final result from
the callee side task.
'''