forked from goodboy/tractor
Merge pull request #170 from goodboy/pdb_madness
End the `pdb` SIGINT handling madnessstream_channel_shield
commit
a510eb0b2b
|
@ -39,7 +39,8 @@ async def main():
|
||||||
portal = await n.run_in_actor('spawner0', spawn_until, depth=0)
|
portal = await n.run_in_actor('spawner0', spawn_until, depth=0)
|
||||||
portal1 = await n.run_in_actor('spawner1', spawn_until, depth=1)
|
portal1 = await n.run_in_actor('spawner1', spawn_until, depth=1)
|
||||||
|
|
||||||
# nursery cancellation should be triggered due to propagated error
|
# nursery cancellation should be triggered due to propagated
|
||||||
|
# error from child.
|
||||||
await portal.result()
|
await portal.result()
|
||||||
await portal1.result()
|
await portal1.result()
|
||||||
|
|
||||||
|
|
|
@ -365,7 +365,7 @@ def test_multi_nested_subactors_error_through_nurseries(spawn):
|
||||||
assert "NameError" in before
|
assert "NameError" in before
|
||||||
|
|
||||||
|
|
||||||
def test_root_nursery_cancels_before_child_releases_tty_lock(spawn):
|
def test_root_nursery_cancels_before_child_releases_tty_lock(spawn, start_method):
|
||||||
"""Test that when the root sends a cancel message before a nested
|
"""Test that when the root sends a cancel message before a nested
|
||||||
child has unblocked (which can happen when it has the tty lock and
|
child has unblocked (which can happen when it has the tty lock and
|
||||||
is engaged in pdb) it is indeed cancelled after exiting the debugger.
|
is engaged in pdb) it is indeed cancelled after exiting the debugger.
|
||||||
|
@ -380,7 +380,15 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(spawn):
|
||||||
child.sendline('c')
|
child.sendline('c')
|
||||||
|
|
||||||
for _ in range(4):
|
for _ in range(4):
|
||||||
|
try:
|
||||||
child.expect(r"\(Pdb\+\+\)")
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
except TimeoutError:
|
||||||
|
if start_method == 'mp':
|
||||||
|
# appears to be some little races that might result in the
|
||||||
|
# last couple acts tearing down early
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
before = str(child.before.decode())
|
before = str(child.before.decode())
|
||||||
assert "NameError: name 'doggypants' is not defined" in before
|
assert "NameError: name 'doggypants' is not defined" in before
|
||||||
|
|
|
@ -117,9 +117,13 @@ async def _main(
|
||||||
# Note that if the current actor is the arbiter it is desirable
|
# Note that if the current actor is the arbiter it is desirable
|
||||||
# for it to stay up indefinitely until a re-election process has
|
# for it to stay up indefinitely until a re-election process has
|
||||||
# taken place - which is not implemented yet FYI).
|
# taken place - which is not implemented yet FYI).
|
||||||
|
|
||||||
|
try:
|
||||||
return await _start_actor(
|
return await _start_actor(
|
||||||
actor, main, host, port, arbiter_addr=arbiter_addr
|
actor, main, host, port, arbiter_addr=arbiter_addr
|
||||||
)
|
)
|
||||||
|
finally:
|
||||||
|
logger.info("Root actor terminated")
|
||||||
|
|
||||||
|
|
||||||
def run(
|
def run(
|
||||||
|
|
|
@ -767,8 +767,8 @@ class Actor:
|
||||||
finally:
|
finally:
|
||||||
log.info("Root nursery complete")
|
log.info("Root nursery complete")
|
||||||
|
|
||||||
# tear down all lifetime contexts
|
# tear down all lifetime contexts if not in guest mode
|
||||||
# api idea: ``tractor.open_context()``
|
# XXX: should this just be in the entrypoint?
|
||||||
log.warning("Closing all actor lifetime contexts")
|
log.warning("Closing all actor lifetime contexts")
|
||||||
self._lifetime_stack.close()
|
self._lifetime_stack.close()
|
||||||
|
|
||||||
|
@ -821,7 +821,7 @@ class Actor:
|
||||||
self._server_down = trio.Event()
|
self._server_down = trio.Event()
|
||||||
try:
|
try:
|
||||||
async with trio.open_nursery() as server_n:
|
async with trio.open_nursery() as server_n:
|
||||||
listeners: List[trio.abc.Listener] = await server_n.start(
|
l: List[trio.abc.Listener] = await server_n.start(
|
||||||
partial(
|
partial(
|
||||||
trio.serve_tcp,
|
trio.serve_tcp,
|
||||||
self._stream_handler,
|
self._stream_handler,
|
||||||
|
@ -832,9 +832,10 @@ class Actor:
|
||||||
host=accept_host,
|
host=accept_host,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
log.debug("Started tcp server(s) on" # type: ignore
|
log.debug(
|
||||||
f" {[l.socket for l in listeners]}")
|
"Started tcp server(s) on"
|
||||||
self._listeners.extend(listeners)
|
f" {[getattr(l, 'socket', 'unknown socket') for l in l]}")
|
||||||
|
self._listeners.extend(l)
|
||||||
task_status.started(server_n)
|
task_status.started(server_n)
|
||||||
finally:
|
finally:
|
||||||
# signal the server is down since nursery above terminated
|
# signal the server is down since nursery above terminated
|
||||||
|
|
|
@ -6,12 +6,10 @@ import sys
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from typing import Awaitable, Tuple, Optional, Callable, AsyncIterator
|
from typing import Awaitable, Tuple, Optional, Callable, AsyncIterator
|
||||||
# import signal
|
|
||||||
|
|
||||||
from async_generator import aclosing
|
from async_generator import aclosing
|
||||||
import tractor
|
import tractor
|
||||||
import trio
|
import trio
|
||||||
from trio.testing import wait_all_tasks_blocked
|
|
||||||
|
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
from . import _state
|
from . import _state
|
||||||
|
@ -132,19 +130,6 @@ async def _acquire_debug_lock(uid: Tuple[str, str]) -> AsyncIterator[None]:
|
||||||
log.error(f"TTY lock released by {task_name}:{uid}")
|
log.error(f"TTY lock released by {task_name}:{uid}")
|
||||||
|
|
||||||
|
|
||||||
def handler(signum, frame):
|
|
||||||
"""Block SIGINT while in debug to avoid deadlocks with cancellation.
|
|
||||||
"""
|
|
||||||
print(
|
|
||||||
"tractor ignores SIGINT while in debug mode\n"
|
|
||||||
"If you have a special need for it please open an issue.\n"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# don't allow those stdlib mofos to mess with sigint handler
|
|
||||||
pdbpp.pdb.Pdb.sigint_handler = handler
|
|
||||||
|
|
||||||
|
|
||||||
# @contextmanager
|
# @contextmanager
|
||||||
# def _disable_sigint():
|
# def _disable_sigint():
|
||||||
# try:
|
# try:
|
||||||
|
@ -269,14 +254,29 @@ def _breakpoint(debug_func) -> Awaitable[None]:
|
||||||
log.debug("Entering the synchronous world of pdb")
|
log.debug("Entering the synchronous world of pdb")
|
||||||
debug_func(actor)
|
debug_func(actor)
|
||||||
|
|
||||||
|
|
||||||
# user code **must** await this!
|
# user code **must** await this!
|
||||||
return _bp()
|
return _bp()
|
||||||
|
|
||||||
|
|
||||||
|
def _mk_pdb():
|
||||||
|
# XXX: setting these flags on the pdb instance are absolutely
|
||||||
|
# critical to having ctrl-c work in the ``trio`` standard way!
|
||||||
|
# The stdlib's pdb supports entering the current sync frame
|
||||||
|
# on a SIGINT, with ``trio`` we pretty much never want this
|
||||||
|
# and we did we can handle it in the ``tractor`` task runtime.
|
||||||
|
|
||||||
|
pdb = PdbwTeardown()
|
||||||
|
pdb.allow_kbdint = True
|
||||||
|
pdb.nosigint = True
|
||||||
|
|
||||||
|
return pdb
|
||||||
|
|
||||||
|
|
||||||
def _set_trace(actor):
|
def _set_trace(actor):
|
||||||
log.critical(f"\nAttaching pdb to actor: {actor.uid}\n")
|
log.critical(f"\nAttaching pdb to actor: {actor.uid}\n")
|
||||||
PdbwTeardown().set_trace(
|
|
||||||
|
pdb = _mk_pdb()
|
||||||
|
pdb.set_trace(
|
||||||
# start 2 levels up in user code
|
# start 2 levels up in user code
|
||||||
frame=sys._getframe().f_back.f_back,
|
frame=sys._getframe().f_back.f_back,
|
||||||
)
|
)
|
||||||
|
@ -290,8 +290,10 @@ breakpoint = partial(
|
||||||
|
|
||||||
def _post_mortem(actor):
|
def _post_mortem(actor):
|
||||||
log.critical(f"\nAttaching to pdb in crashed actor: {actor.uid}\n")
|
log.critical(f"\nAttaching to pdb in crashed actor: {actor.uid}\n")
|
||||||
|
pdb = _mk_pdb()
|
||||||
|
|
||||||
# custom Pdb post-mortem entry
|
# custom Pdb post-mortem entry
|
||||||
pdbpp.xpm(Pdb=PdbwTeardown)
|
pdbpp.xpm(Pdb=lambda: pdb)
|
||||||
|
|
||||||
|
|
||||||
post_mortem = partial(
|
post_mortem = partial(
|
||||||
|
|
|
@ -7,7 +7,6 @@ import signal
|
||||||
|
|
||||||
import trio # type: ignore
|
import trio # type: ignore
|
||||||
|
|
||||||
from ._actor import Actor
|
|
||||||
from .log import get_console_log, get_logger
|
from .log import get_console_log, get_logger
|
||||||
from . import _state
|
from . import _state
|
||||||
|
|
||||||
|
@ -16,7 +15,7 @@ log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def _mp_main(
|
def _mp_main(
|
||||||
actor: 'Actor',
|
actor: 'Actor', # type: ignore
|
||||||
accept_addr: Tuple[str, int],
|
accept_addr: Tuple[str, int],
|
||||||
forkserver_info: Tuple[Any, Any, Any, Any, Any],
|
forkserver_info: Tuple[Any, Any, Any, Any, Any],
|
||||||
start_method: str,
|
start_method: str,
|
||||||
|
@ -49,12 +48,15 @@ def _mp_main(
|
||||||
trio.run(trio_main)
|
trio.run(trio_main)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
pass # handle it the same way trio does?
|
pass # handle it the same way trio does?
|
||||||
|
|
||||||
|
finally:
|
||||||
log.info(f"Actor {actor.uid} terminated")
|
log.info(f"Actor {actor.uid} terminated")
|
||||||
|
|
||||||
|
|
||||||
def _trio_main(
|
def _trio_main(
|
||||||
actor: 'Actor',
|
actor: 'Actor', # type: ignore
|
||||||
parent_addr: Tuple[str, int] = None
|
*,
|
||||||
|
parent_addr: Tuple[str, int] = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Entry point for a `trio_run_in_process` subactor.
|
"""Entry point for a `trio_run_in_process` subactor.
|
||||||
"""
|
"""
|
||||||
|
@ -86,4 +88,5 @@ def _trio_main(
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
log.warning(f"Actor {actor.uid} received KBI")
|
log.warning(f"Actor {actor.uid} received KBI")
|
||||||
|
|
||||||
|
finally:
|
||||||
log.info(f"Actor {actor.uid} terminated")
|
log.info(f"Actor {actor.uid} terminated")
|
||||||
|
|
Loading…
Reference in New Issue