Merge pull request #245 from goodboy/immediate_remote_cancels

Immediate remote cancels
goodboy 2021-10-17 08:16:50 -04:00 committed by GitHub
commit 828754dbb5
11 changed files with 591 additions and 398 deletions

View File

@ -1,3 +1,8 @@
'''
Test that a nested nursery will avoid clobbering
the debugger latched by a broken child.
'''
import trio
import tractor
@ -35,6 +40,7 @@ async def main():
"""
async with tractor.open_nursery(
debug_mode=True,
# loglevel='cancel',
) as n:
# spawn both actors

View File

@ -0,0 +1,13 @@
Change the core message loop to handle task and actor-runtime cancel
requests immediately instead of scheduling them as is done for rpc-task
requests.

In order to obtain more reliable teardown mechanics for (complex) actor
trees it's important that we treat cancel requests specially, giving
them higher priority. Previously, it was possible that task cancel
requests could themselves be cancelled if an "actor-runtime" cancel
request was received (which can happen during messy multi-actor crashes
that propagate). Instead, cancel requests now block the msg loop until
serviced and a response is relayed back to the requester. This also
allows for improved debugger support since we gain determinism
guarantees about which processes must wait before hard killing their
children.
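A minimal sketch of the priority split described here (illustrative only, not
the actual tractor message loop; handle_cancel(), handle_rpc() and the msg
shape are hypothetical):

import trio


async def handle_cancel(msg) -> None:
    # hypothetical: run the actual task/runtime cancellation
    print('servicing cancel request', msg)


async def handle_rpc(msg) -> None:
    # hypothetical: run a normal rpc task
    print('running rpc task', msg)


async def msg_loop(chan, nursery: trio.Nursery) -> None:
    async for msg in chan:
        if msg.get('func') == 'cancel':
            # cancel requests are serviced inline and shielded so they
            # can never themselves be cancelled; the loop blocks until
            # a response has been relayed back to the requester.
            with trio.CancelScope(shield=True):
                await handle_cancel(msg)
            break  # kill this msg loop and drop into runtime teardown
        else:
            # rpc-task requests are scheduled as before and may be
            # cancelled later via their stored cancel scope.
            nursery.start_soon(handle_rpc, msg)

The shielding is what gives the determinism guarantee mentioned above: even
while the runtime is tearing down, the requester still gets its response.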

View File

@ -4,5 +4,5 @@ now and use the default `fragment set`_.
.. _towncrier docs: https://github.com/twisted/towncrier#quick-start
.. _pluggy release readme: https://github.com/twisted/towncrier#quick-start
.. _pluggy release readme: https://github.com/pytest-dev/pluggy/blob/main/changelog/README.rst
.. _fragment set: https://github.com/twisted/towncrier#news-fragments

View File

@ -1,5 +1,6 @@
"""
Cancellation and error propagation
"""
import os
import signal
@ -365,7 +366,8 @@ async def test_nested_multierrors(loglevel, start_method):
# to happen before an actor is spawned
if isinstance(subexc, trio.Cancelled):
continue
else:
elif isinstance(subexc, tractor.RemoteActorError):
# on windows it seems we can't exactly be sure wtf
# will happen..
assert subexc.type in (
@ -373,6 +375,17 @@ async def test_nested_multierrors(loglevel, start_method):
trio.Cancelled,
trio.MultiError
)
elif isinstance(subexc, trio.MultiError):
for subsub in subexc.exceptions:
if subsub in (tractor.RemoteActorError,):
subsub = subsub.type
assert type(subsub) in (
trio.Cancelled,
trio.MultiError,
)
else:
assert isinstance(subexc, tractor.RemoteActorError)
@ -381,13 +394,14 @@ async def test_nested_multierrors(loglevel, start_method):
# on windows sometimes spawning is just too slow and
# we get back the (sent) cancel signal instead
if platform.system() == 'Windows':
assert (subexc.type is trio.MultiError) or (
subexc.type is tractor.RemoteActorError)
if isinstance(subexc, tractor.RemoteActorError):
assert subexc.type in (trio.MultiError, tractor.RemoteActorError)
else:
assert isinstance(subexc, trio.MultiError)
else:
assert subexc.type is trio.MultiError
else:
assert (subexc.type is tractor.RemoteActorError) or (
subexc.type is trio.Cancelled)
assert subexc.type in (tractor.RemoteActorError, trio.Cancelled)
@no_windows
@ -448,6 +462,7 @@ def test_cancel_via_SIGINT_other_task(
with pytest.raises(KeyboardInterrupt):
trio.run(main)
async def spin_for(period=3):
"Sync sleep."
time.sleep(period)

View File

@ -236,7 +236,8 @@ def test_subactor_breakpoint(spawn):
def test_multi_subactors(spawn):
"""Multiple subactors, both erroring and breakpointing as well as
"""
Multiple subactors, both erroring and breakpointing as well as
a nested subactor erroring.
"""
child = spawn(r'multi_subactors')
@ -259,6 +260,7 @@ def test_multi_subactors(spawn):
# first name_error failure
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error'" in before
assert "NameError" in before
# continue again
@ -267,6 +269,7 @@ def test_multi_subactors(spawn):
# 2nd name_error failure
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error_1'" in before
assert "NameError" in before
# breakpoint loop should re-engage
@ -275,6 +278,19 @@ def test_multi_subactors(spawn):
before = str(child.before.decode())
assert "Attaching pdb to actor: ('breakpoint_forever'" in before
# wait for spawn error to show up
while 'breakpoint_forever' in before:
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
# 2nd depth nursery should trigger
# child.sendline('c')
# child.expect(r"\(Pdb\+\+\)")
# before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('spawn_error'" in before
assert "RemoteActorError: ('name_error_1'" in before
# now run some "continues" to show re-entries
for _ in range(5):
child.sendline('c')
@ -284,16 +300,24 @@ def test_multi_subactors(spawn):
child.sendline('q')
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
# debugger attaches to root
assert "Attaching to pdb in crashed actor: ('root'" in before
# expect a multierror with exceptions for each sub-actor
assert "RemoteActorError: ('breakpoint_forever'" in before
assert "RemoteActorError: ('name_error'" in before
assert "RemoteActorError: ('spawn_error'" in before
assert "RemoteActorError: ('name_error_1'" in before
assert 'bdb.BdbQuit' in before
# process should exit
child.sendline('c')
child.expect(pexpect.EOF)
# repeat of previous multierror for final output
before = str(child.before.decode())
assert "RemoteActorError: ('breakpoint_forever'" in before
assert "RemoteActorError: ('name_error'" in before
assert "RemoteActorError: ('spawn_error'" in before
assert "RemoteActorError: ('name_error_1'" in before
assert 'bdb.BdbQuit' in before
@ -387,16 +411,29 @@ def test_multi_subactors_root_errors(spawn):
before = str(child.before.decode())
assert "NameError: name 'doggypants' is not defined" in before
# continue again
# continue again to catch 2nd name error from
# actor 'name_error_1' (which is 2nd depth).
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
# should now get attached in root with assert error
before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('name_error_1'" in before
assert "NameError" in before
# should have come just after prior prompt
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('spawn_error'" in before
# boxed error from previous step
assert "RemoteActorError: ('name_error_1'" in before
assert "NameError" in before
child.sendline('c')
child.expect(r"\(Pdb\+\+\)")
before = str(child.before.decode())
assert "Attaching to pdb in crashed actor: ('root'" in before
assert "AssertionError" in before
# boxed error from first level failure
assert "RemoteActorError: ('name_error'" in before
assert "NameError" in before
# warnings assert we probably don't need
# assert "Cancelling nursery in ('spawn_error'," in before
@ -406,6 +443,7 @@ def test_multi_subactors_root_errors(spawn):
child.expect(pexpect.EOF)
before = str(child.before.decode())
# error from root actor and root task that created top level nursery
assert "AssertionError" in before

View File

@ -180,6 +180,7 @@ def test_multi_actor_subs_arbiter_pub(
'streamer',
enable_modules=[__name__],
)
name = 'streamer'
even_portal = await n.run_in_actor(
subs,

View File

@ -49,6 +49,7 @@ async def _invoke(
chan: Channel,
func: typing.Callable,
kwargs: Dict[str, Any],
is_rpc: bool = True,
task_status: TaskStatus[
Union[trio.CancelScope, BaseException]
] = trio.TASK_STATUS_IGNORED,
@ -243,10 +244,11 @@ async def _invoke(
scope, func, is_complete = actor._rpc_tasks.pop((chan, cid))
is_complete.set()
except KeyError:
# If we're cancelled before the task returns then the
# cancel scope will not have been inserted yet
log.warning(
f"Task {func} likely errored or cancelled before it started")
if is_rpc:
# If we're cancelled before the task returns then the
# cancel scope will not have been inserted yet
log.warning(
f"Task {func} likely errored or cancelled before it started")
finally:
if not actor._rpc_tasks:
log.runtime("All RPC tasks have completed")
@ -503,8 +505,8 @@ class Actor:
log.runtime(f"Peers is {self._peers}")
if not self._peers: # no more channels connected
self._no_more_peers.set()
log.runtime("Signalling no more peer channels")
self._no_more_peers.set()
# # XXX: is this necessary (GC should do it?)
if chan.connected():
@ -671,16 +673,39 @@ class Actor:
f"{ns}.{funcname}({kwargs})")
if ns == 'self':
func = getattr(self, funcname)
if funcname == 'cancel':
# don't start entire actor runtime cancellation if this
# actor is in debug mode
pdb_complete = _debug._local_pdb_complete
if pdb_complete:
await pdb_complete.wait()
# we immediately start the runtime machinery shutdown
with trio.CancelScope(shield=True):
# self.cancel() was called so kill this msg loop
# and break out into ``_async_main()``
log.cancel(
f"Actor {self.uid} was remotely cancelled; "
"waiting on cancellation completion..")
await _invoke(self, cid, chan, func, kwargs, is_rpc=False)
# await self._cancel_complete.wait()
loop_cs.cancel()
break
if funcname == '_cancel_task':
# XXX: a special case is made here for
# remote calls since we don't want the
# remote actor have to know which channel
# the task is associated with and we can't
# pass non-primitive types between actors.
# This means you can use:
# Portal.run('self', '_cancel_task', cid=did)
# without passing the `chan` arg.
kwargs['chan'] = chan
# we immediately start the runtime machinery shutdown
with trio.CancelScope(shield=True):
# self.cancel() was called so kill this msg loop
# and break out into ``_async_main()``
kwargs['chan'] = chan
log.cancel(
f"Actor {self.uid} was remotely cancelled; "
"waiting on cancellation completion..")
await _invoke(self, cid, chan, func, kwargs, is_rpc=False)
continue
else:
# complain to client about restricted modules
try:
@ -699,44 +724,36 @@ class Actor:
partial(_invoke, self, cid, chan, func, kwargs),
name=funcname,
)
except RuntimeError:
except (RuntimeError, trio.MultiError):
# avoid reporting a benign race condition
# during actor runtime teardown.
nursery_cancelled_before_task = True
break
# never allow cancelling cancel requests (results in
# deadlock and other weird behaviour)
if func != self.cancel:
if isinstance(cs, Exception):
log.warning(
f"Task for RPC func {func} failed with"
f"{cs}")
else:
# mark that we have ongoing rpc tasks
self._ongoing_rpc_tasks = trio.Event()
log.runtime(f"RPC func is {func}")
# store cancel scope such that the rpc task can be
# cancelled gracefully if requested
self._rpc_tasks[(chan, cid)] = (
cs, func, trio.Event())
else:
# self.cancel() was called so kill this msg loop
# and break out into ``_async_main()``
# if func != self.cancel:
if isinstance(cs, Exception):
log.warning(
f"Actor {self.uid} was remotely cancelled; "
"waiting on cancellation completion..")
await self._cancel_complete.wait()
loop_cs.cancel()
break
f"Task for RPC func {func} failed with"
f"{cs}")
else:
# mark that we have ongoing rpc tasks
self._ongoing_rpc_tasks = trio.Event()
log.runtime(f"RPC func is {func}")
# store cancel scope such that the rpc task can be
# cancelled gracefully if requested
self._rpc_tasks[(chan, cid)] = (
cs, func, trio.Event())
log.runtime(
f"Waiting on next msg for {chan} from {chan.uid}")
else:
# channel disconnect
log.runtime(
f"{chan} for {chan.uid} disconnected, cancelling tasks"
)
await self.cancel_rpc_tasks(chan)
# end of async for, channel disconnect via ``trio.EndOfChannel``
log.runtime(
f"{chan} for {chan.uid} disconnected, cancelling tasks"
)
await self.cancel_rpc_tasks(chan)
except (
TransportClosed,
@ -947,6 +964,9 @@ class Actor:
# Blocks here as expected until the root nursery is
# killed (i.e. this actor is cancelled or signalled by the parent)
except Exception as err:
log.info("Closing all actor lifetime contexts")
_lifetime_stack.close()
if not registered_with_arbiter:
# TODO: I guess we could try to connect back
# to the parent through a channel and engage a debugger
@ -976,11 +996,21 @@ class Actor:
raise
finally:
log.runtime("Root nursery complete")
log.info("Runtime nursery complete")
# tear down all lifetime contexts if not in guest mode
# XXX: should this just be in the entrypoint?
log.cancel("Closing all actor lifetime contexts")
log.info("Closing all actor lifetime contexts")
# TODO: we can't actually do this bc the debugger
# uses the _service_n to spawn the lock task, BUT,
# in theory if we had the root nursery surround this finally
# block it might be actually possible to debug THIS
# machinery in the same way as user task code?
# if self.name == 'brokerd.ib':
# with trio.CancelScope(shield=True):
# await _debug.breakpoint()
_lifetime_stack.close()
# Unregister actor from the arbiter
@ -1065,7 +1095,7 @@ class Actor:
self._service_n.start_soon(self.cancel)
async def cancel(self) -> bool:
"""Cancel this actor.
"""Cancel this actor's runtime.
The "deterministic" teardown sequence in order is:
- cancel all ongoing rpc tasks by cancel scope
@ -1099,7 +1129,7 @@ class Actor:
if self._service_n:
self._service_n.cancel_scope.cancel()
log.cancel(f"{self.uid} was sucessfullly cancelled")
log.cancel(f"{self.uid} called `Actor.cancel()`")
self._cancel_complete.set()
return True
@ -1158,18 +1188,20 @@ class Actor:
registered for each.
"""
tasks = self._rpc_tasks
log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
for (chan, cid) in tasks.copy():
if only_chan is not None:
if only_chan != chan:
continue
if tasks:
log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
for (chan, cid), (scope, func, is_complete) in tasks.copy().items():
if only_chan is not None:
if only_chan != chan:
continue
# TODO: this should really be done in a nursery batch
await self._cancel_task(cid, chan)
# TODO: this should really be done in a nursery batch
if func != self._cancel_task:
await self._cancel_task(cid, chan)
log.cancel(
f"Waiting for remaining rpc tasks to complete {tasks}")
await self._ongoing_rpc_tasks.wait()
log.cancel(
f"Waiting for remaining rpc tasks to complete {tasks}")
await self._ongoing_rpc_tasks.wait()
def cancel_server(self) -> None:
"""Cancel the internal channel server nursery thereby

View File

@ -5,16 +5,23 @@ Multi-core debugging for da peeps!
import bdb
import sys
from functools import partial
from contextlib import asynccontextmanager
from typing import Tuple, Optional, Callable, AsyncIterator
from contextlib import asynccontextmanager as acm
from typing import (
Tuple,
Optional,
Callable,
AsyncIterator,
AsyncGenerator,
)
import tractor
import trio
from trio_typing import TaskStatus
from .log import get_logger
from . import _state
from ._discovery import get_root
from ._state import is_root_process
from ._state import is_root_process, debug_mode
from ._exceptions import is_multi_cancelled
try:
@ -122,7 +129,7 @@ class PdbwTeardown(pdbpp.Pdb):
# break
@asynccontextmanager
@acm
async def _acquire_debug_lock(
uid: Tuple[str, str]
@ -139,7 +146,7 @@ async def _acquire_debug_lock(
task_name = trio.lowlevel.current_task().name
log.pdb(
log.debug(
f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}"
)
@ -187,7 +194,7 @@ async def _acquire_debug_lock(
if (
not stats.owner
):
log.pdb(f"No more tasks waiting on tty lock! says {uid}")
log.debug(f"No more tasks waiting on tty lock! says {uid}")
_no_remote_has_tty.set()
_no_remote_has_tty = None
@ -219,7 +226,8 @@ async def _hijack_stdin_for_child(
subactor_uid: Tuple[str, str]
) -> str:
'''Hijack the tty in the root process of an actor tree such that
'''
Hijack the tty in the root process of an actor tree such that
the pdbpp debugger console can be allocated to a sub-actor for repl
bossing.
@ -254,6 +262,8 @@ async def _hijack_stdin_for_child(
# assert await stream.receive() == 'pdb_unlock'
except (
# BaseException,
trio.MultiError,
trio.BrokenResourceError,
trio.Cancelled, # by local cancellation
trio.ClosedResourceError, # by self._rx_chan
@ -268,12 +278,74 @@ async def _hijack_stdin_for_child(
if isinstance(err, trio.Cancelled):
raise
log.debug(f"TTY lock released, remote task: {task_name}:{subactor_uid}")
finally:
log.debug(
"TTY lock released, remote task:"
f"{task_name}:{subactor_uid}")
return "pdb_unlock_complete"
async def wait_for_parent_stdin_hijack(
actor_uid: Tuple[str, str],
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED
):
'''
Connect to the root actor via a ctx and invoke a task which locks
a root-local TTY lock.
This function is used by any sub-actor to acquire mutex access to
pdb and the root's TTY for interactive debugging (see below inside
``_breakpoint()``). It can be used to ensure that an intermediate
nursery-owning actor does not clobber its children if they are in
debug (see below inside ``maybe_wait_for_debugger()``).
'''
global _debugger_request_cs
with trio.CancelScope(shield=True) as cs:
_debugger_request_cs = cs
try:
async with get_root() as portal:
# this syncs to child's ``Context.started()`` call.
async with portal.open_context(
tractor._debug._hijack_stdin_for_child,
subactor_uid=actor_uid,
) as (ctx, val):
log.pdb('locked context')
assert val == 'Locked'
async with ctx.open_stream() as stream:
# unblock local caller
task_status.started(cs)
try:
assert _local_pdb_complete
await _local_pdb_complete.wait()
finally:
# TODO: shielding currently can cause hangs...
with trio.CancelScope(shield=True):
await stream.send('pdb_unlock')
# sync with callee termination
assert await ctx.result() == "pdb_unlock_complete"
except tractor.ContextCancelled:
log.warning('Root actor cancelled debug lock')
finally:
log.debug(f"Exiting debugger for actor {actor_uid}")
global _local_task_in_debug
_local_task_in_debug = None
log.debug(f"Child {actor_uid} released parent stdio lock")
async def _breakpoint(
debug_func,
@ -300,56 +372,6 @@ async def _breakpoint(
await trio.lowlevel.checkpoint()
async def wait_for_parent_stdin_hijack(
task_status=trio.TASK_STATUS_IGNORED
):
global _debugger_request_cs
with trio.CancelScope(shield=True) as cs:
_debugger_request_cs = cs
try:
async with get_root() as portal:
log.pdb('got portal')
# this syncs to child's ``Context.started()`` call.
async with portal.open_context(
tractor._debug._hijack_stdin_for_child,
subactor_uid=actor.uid,
) as (ctx, val):
log.pdb('locked context')
assert val == 'Locked'
async with ctx.open_stream() as stream:
log.error('opened stream')
# unblock local caller
task_status.started()
try:
await _local_pdb_complete.wait()
finally:
# TODO: shielding currently can cause hangs...
with trio.CancelScope(shield=True):
await stream.send('pdb_unlock')
# sync with callee termination
assert await ctx.result() == "pdb_unlock_complete"
except tractor.ContextCancelled:
log.warning('Root actor cancelled debug lock')
finally:
log.debug(f"Exiting debugger for actor {actor}")
global _local_task_in_debug
_local_task_in_debug = None
log.debug(f"Child {actor} released parent stdio lock")
if not _local_pdb_complete or _local_pdb_complete.is_set():
_local_pdb_complete = trio.Event()
@ -386,7 +408,10 @@ async def _breakpoint(
# cancel on this task start? I *think* this works below?
# actor._service_n.cancel_scope.shield = shield
with trio.CancelScope(shield=True):
await actor._service_n.start(wait_for_parent_stdin_hijack)
await actor._service_n.start(
wait_for_parent_stdin_hijack,
actor.uid,
)
elif is_root_process():
@ -407,11 +432,10 @@ async def _breakpoint(
'Root actor attempting to shield-acquire active tty lock'
f' owned by {_global_actor_in_debug}')
# must shield here to avoid hitting a ``Cancelled`` and
# a child getting stuck bc we clobbered the tty
with trio.CancelScope(shield=True):
# must shield here to avoid hitting a ``Cancelled`` and
# a child getting stuck bc we clobbered the tty
await _debug_lock.acquire()
else:
# may be cancelled
await _debug_lock.acquire()
@ -501,7 +525,7 @@ post_mortem = partial(
async def _maybe_enter_pm(err):
if (
_state.debug_mode()
debug_mode()
# NOTE: don't enter debug mode recursively after quitting pdb
# Iow, don't re-enter the repl if the `quit` command was issued
@ -524,3 +548,80 @@ async def _maybe_enter_pm(err):
else:
return False
@acm
async def acquire_debug_lock(
subactor_uid: Tuple[str, str],
) -> AsyncGenerator[None, tuple]:
'''
Grab root's debug lock on entry, release on exit.
'''
async with trio.open_nursery() as n:
cs = await n.start(
wait_for_parent_stdin_hijack,
subactor_uid,
)
yield None
cs.cancel()
async def maybe_wait_for_debugger(
poll_steps: int = 2,
poll_delay: float = 0.1,
) -> None:
if not debug_mode():
return
if (
is_root_process()
):
global _no_remote_has_tty, _global_actor_in_debug, _wait_all_tasks_lock
# If we error in the root but the debugger is
# engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty since it
# will make the pdb repl unusable.
# Instead try to wait for pdb to be released before
# tearing down.
sub_in_debug = None
for _ in range(poll_steps):
if _global_actor_in_debug:
sub_in_debug = tuple(_global_actor_in_debug)
log.warning(
'Root polling for debug')
with trio.CancelScope(shield=True):
await trio.sleep(poll_delay)
# TODO: could this make things more deterministic? wait
# to see if a sub-actor task will be scheduled and grab
# the tty lock on the next tick?
# XXX: doesn't seem to work
# await trio.testing.wait_all_tasks_blocked(cushion=0)
debug_complete = _no_remote_has_tty
if (
(debug_complete and
not debug_complete.is_set())
):
log.warning(
'Root has errored but pdb is in use by '
f'child {sub_in_debug}\n'
'Waiting on tty lock to release..')
await debug_complete.wait()
await trio.sleep(poll_delay)
continue
else:
log.warning(
'Root acquired TTY LOCK'
)
return
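Rough usage sketch of the two new helpers added above, mirroring how the
spawning machinery below calls them during child teardown (the proc and
child_uid names are illustrative and this assumes it runs inside a
supervising actor task):

import trio

from tractor._debug import acquire_debug_lock, maybe_wait_for_debugger
from tractor._state import is_root_process


async def reap_child(proc: trio.Process, child_uid) -> None:
    # illustrative teardown step for a supervising actor: don't clobber
    # a child that may still hold the root's tty/pdb lock.
    if is_root_process():
        # root: poll until no remote task holds the tty lock
        await maybe_wait_for_debugger()
    else:
        # non-root supervisor: hold the root's debug lock while softly
        # waiting on the child to exit.
        async with acquire_debug_lock(child_uid):
            with trio.move_on_after(0.5):
                await proc.wait()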

View File

@ -44,7 +44,7 @@ async def get_arbiter(
@asynccontextmanager
async def get_root(
**kwargs,
) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]:
) -> typing.AsyncGenerator[Portal, None]:
host, port = _runtime_vars['_root_mailbox']
assert host is not None

View File

@ -1,5 +1,6 @@
"""
Machinery for actor process spawning using multiple backends.
"""
import sys
import multiprocessing as mp
@ -8,7 +9,6 @@ from typing import Any, Dict, Optional
import trio
from trio_typing import TaskStatus
from async_generator import asynccontextmanager
try:
from multiprocessing import semaphore_tracker # type: ignore
@ -22,9 +22,15 @@ from multiprocessing import forkserver # type: ignore
from typing import Tuple
from . import _forkserver_override
from ._debug import (
maybe_wait_for_debugger,
acquire_debug_lock,
)
from ._state import (
current_actor,
is_main_process,
is_root_process,
debug_mode,
)
from .log import get_logger
@ -123,44 +129,43 @@ async def cancel_on_completion(
portal: Portal,
actor: Actor,
errors: Dict[Tuple[str, str], Exception],
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Cancel actor gracefully once it's "main" portal's
"""
Cancel actor gracefully once it's "main" portal's
result arrives.
Should only be called for actors spawned with `run_in_actor()`.
"""
with trio.CancelScope() as cs:
# if this call errors we store the exception for later
# in ``errors`` which will be reraised inside
# a MultiError and we still send out a cancel request
result = await exhaust_portal(portal, actor)
if isinstance(result, Exception):
errors[actor.uid] = result
log.warning(
f"Cancelling {portal.channel.uid} after error {result}"
)
task_status.started(cs)
else:
log.runtime(
f"Cancelling {portal.channel.uid} gracefully "
f"after result {result}")
# if this call errors we store the exception for later
# in ``errors`` which will be reraised inside
# a MultiError and we still send out a cancel request
result = await exhaust_portal(portal, actor)
if isinstance(result, Exception):
errors[actor.uid] = result
log.warning(
f"Cancelling {portal.channel.uid} after error {result}"
)
else:
log.runtime(
f"Cancelling {portal.channel.uid} gracefully "
f"after result {result}")
# cancel the process now that we have a final result
await portal.cancel_actor()
# cancel the process now that we have a final result
await portal.cancel_actor()
async def do_hard_kill(
proc: trio.Process,
terminate_after: int = 3,
) -> None:
# NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as
# a hard-kill time ultimatum.
with trio.move_on_after(3) as cs:
with trio.move_on_after(terminate_after) as cs:
# NOTE: This ``__aexit__()`` shields internally.
async with proc: # calls ``trio.Process.aclose()``
@ -174,108 +179,112 @@ async def do_hard_kill(
proc.kill()
@asynccontextmanager
async def spawn_subactor(
subactor: 'Actor',
parent_addr: Tuple[str, int],
):
spawn_cmd = [
sys.executable,
"-m",
# Hardcode this (instead of using ``_child.__name__`` to avoid a
# double import warning: https://stackoverflow.com/a/45070583
"tractor._child",
# We provide the child's unique identifier on this exec/spawn
# line for debugging purposes when viewing the process tree from
# the OS; it otherwise can be passed via the parent channel if
# we prefer in the future (for privacy).
"--uid",
str(subactor.uid),
# Address the child must connect to on startup
"--parent_addr",
str(parent_addr)
]
if subactor.loglevel:
spawn_cmd += [
"--loglevel",
subactor.loglevel
]
proc = await trio.open_process(spawn_cmd)
try:
yield proc
finally:
log.runtime(f"Attempting to kill {proc}")
# XXX: do this **after** cancellation/teardown
# to avoid killing the process too early
# since trio does this internally on ``__aexit__()``
await do_hard_kill(proc)
async def new_proc(
name: str,
actor_nursery: 'ActorNursery', # type: ignore # noqa
subactor: Actor,
errors: Dict[Tuple[str, str], Exception],
# passed through to actor main
bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
"""Create a new ``multiprocessing.Process`` using the
spawn method as configured using ``try_set_start_method()``.
"""
cancel_scope = None
) -> None:
"""
Create a new ``multiprocessing.Process`` using the
spawn method as configured using ``try_set_start_method()``.
"""
# mark the new actor with the global spawn method
subactor._spawn_method = _spawn_method
uid = subactor.uid
if _spawn_method == 'trio':
async with trio.open_nursery() as nursery:
async with spawn_subactor(
subactor,
parent_addr,
) as proc:
log.runtime(f"Started {proc}")
# wait for actor to spawn and connect back to us
# channel should have handshake completed by the
# local actor by the time we get a ref to it
spawn_cmd = [
sys.executable,
"-m",
# Hardcode this (instead of using ``_child.__name__`` to avoid a
# double import warning: https://stackoverflow.com/a/45070583
"tractor._child",
# We provide the child's unique identifier on this exec/spawn
# line for debugging purposes when viewing the process tree from
# the OS; it otherwise can be passed via the parent channel if
# we prefer in the future (for privacy).
"--uid",
str(subactor.uid),
# Address the child must connect to on startup
"--parent_addr",
str(parent_addr)
]
if subactor.loglevel:
spawn_cmd += [
"--loglevel",
subactor.loglevel
]
cancelled_during_spawn: bool = False
try:
proc = await trio.open_process(spawn_cmd)
log.runtime(f"Started {proc}")
# wait for actor to spawn and connect back to us
# channel should have handshake completed by the
# local actor by the time we get a ref to it
try:
event, chan = await actor_nursery._actor.wait_for_peer(
subactor.uid)
portal = Portal(chan)
actor_nursery._children[subactor.uid] = (
subactor, proc, portal)
except trio.Cancelled:
cancelled_during_spawn = True
# we may cancel before the child connects back in which
# case avoid clobbering the pdb tty.
if debug_mode():
with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb
if is_root_process():
await maybe_wait_for_debugger()
else:
async with acquire_debug_lock(uid):
# soft wait on the proc to terminate
with trio.move_on_after(0.5):
await proc.wait()
raise
# send additional init params
await chan.send({
"_parent_main_data": subactor._parent_main_data,
"enable_modules": subactor.enable_modules,
"_arb_addr": subactor._arb_addr,
"bind_host": bind_addr[0],
"bind_port": bind_addr[1],
"_runtime_vars": _runtime_vars,
})
portal = Portal(chan)
actor_nursery._children[subactor.uid] = (
subactor, proc, portal)
# track subactor in current nursery
curr_actor = current_actor()
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
# send additional init params
await chan.send({
"_parent_main_data": subactor._parent_main_data,
"enable_modules": subactor.enable_modules,
"_arb_addr": subactor._arb_addr,
"bind_host": bind_addr[0],
"bind_port": bind_addr[1],
"_runtime_vars": _runtime_vars,
})
# resume caller at next checkpoint now that child is up
task_status.started(portal)
# track subactor in current nursery
curr_actor = current_actor()
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
# wait for ActorNursery.wait() to be called
with trio.CancelScope(shield=True):
await actor_nursery._join_procs.wait()
# resume caller at next checkpoint now that child is up
task_status.started(portal)
# wait for ActorNursery.wait() to be called
with trio.CancelScope(shield=True):
await actor_nursery._join_procs.wait()
async with trio.open_nursery() as nursery:
if portal in actor_nursery._cancel_after_result_on_exit:
cancel_scope = await nursery.start(
nursery.start_soon(
cancel_on_completion,
portal,
subactor,
@ -285,32 +294,45 @@ async def new_proc(
# Wait for proc termination but **don't yet** call
# ``trio.Process.__aexit__()`` (it tears down stdio
# which will kill any waiting remote pdb trace).
# TODO: No idea how we can enforce zombie
# reaping more stringently without the shield
# we used to have below...
# with trio.CancelScope(shield=True):
# async with proc:
# Always "hard" join sub procs since no actor zombies
# are allowed!
# this is a "light" (cancellable) join, the hard join is
# in the enclosing scope (see above).
# This is a "soft" (cancellable) join/reap.
await proc.wait()
log.debug(f"Joined {proc}")
# pop child entry to indicate we no longer managing this subactor
subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
# cancel result waiter that may have been spawned in
# tandem if not done already
if cancel_scope:
# cancel result waiter that may have been spawned in
# tandem if not done already
log.warning(
"Cancelling existing result waiter task for "
f"{subactor.uid}")
cancel_scope.cancel()
nursery.cancel_scope.cancel()
finally:
# The "hard" reap since no actor zombies are allowed!
# XXX: do this **after** cancellation/teardown to avoid
# killing the process too early.
log.cancel(f'Hard reap sequence starting for {uid}')
with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb
if cancelled_during_spawn:
# Try again to avoid TTY clobbering.
async with acquire_debug_lock(uid):
with trio.move_on_after(0.5):
await proc.wait()
if is_root_process():
await maybe_wait_for_debugger()
if proc.poll() is None:
log.cancel(f"Attempting to hard kill {proc}")
await do_hard_kill(proc)
log.debug(f"Joined {proc}")
if not cancelled_during_spawn:
# pop child entry to indicate we no longer managing this
# subactor
actor_nursery._children.pop(subactor.uid)
else:
# `multiprocessing`
# async with trio.open_nursery() as nursery:
@ -341,141 +363,124 @@ async def mp_new_proc(
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
async with trio.open_nursery() as nursery:
assert _ctx
start_method = _ctx.get_start_method()
if start_method == 'forkserver':
# XXX do our hackery on the stdlib to avoid multiple
# forkservers (one at each subproc layer).
fs = forkserver._forkserver
curr_actor = current_actor()
if is_main_process() and not curr_actor._forkserver_info:
# if we're the "main" process start the forkserver
# only once and pass its ipc info to downstream
# children
# forkserver.set_forkserver_preload(enable_modules)
forkserver.ensure_running()
fs_info = (
fs._forkserver_address,
fs._forkserver_alive_fd,
getattr(fs, '_forkserver_pid', None),
getattr(
resource_tracker._resource_tracker, '_pid', None),
resource_tracker._resource_tracker._fd,
)
else:
assert curr_actor._forkserver_info
fs_info = (
fs._forkserver_address,
fs._forkserver_alive_fd,
fs._forkserver_pid,
resource_tracker._resource_tracker._pid,
resource_tracker._resource_tracker._fd,
) = curr_actor._forkserver_info
assert _ctx
start_method = _ctx.get_start_method()
if start_method == 'forkserver':
# XXX do our hackery on the stdlib to avoid multiple
# forkservers (one at each subproc layer).
fs = forkserver._forkserver
curr_actor = current_actor()
if is_main_process() and not curr_actor._forkserver_info:
# if we're the "main" process start the forkserver
# only once and pass its ipc info to downstream
# children
# forkserver.set_forkserver_preload(enable_modules)
forkserver.ensure_running()
fs_info = (
fs._forkserver_address,
fs._forkserver_alive_fd,
getattr(fs, '_forkserver_pid', None),
getattr(
resource_tracker._resource_tracker, '_pid', None),
resource_tracker._resource_tracker._fd,
)
else:
fs_info = (None, None, None, None, None)
assert curr_actor._forkserver_info
fs_info = (
fs._forkserver_address,
fs._forkserver_alive_fd,
fs._forkserver_pid,
resource_tracker._resource_tracker._pid,
resource_tracker._resource_tracker._fd,
) = curr_actor._forkserver_info
else:
fs_info = (None, None, None, None, None)
proc: mp.Process = _ctx.Process( # type: ignore
target=_mp_main,
args=(
subactor,
bind_addr,
fs_info,
start_method,
parent_addr,
),
# daemon=True,
name=name,
)
# `multiprocessing` only (since no async interface):
# register the process before start in case we get a cancel
# request before the actor has fully spawned - then we can wait
# for it to fully come up before sending a cancel request
actor_nursery._children[subactor.uid] = (subactor, proc, None)
proc: mp.Process = _ctx.Process( # type: ignore
target=_mp_main,
args=(
subactor,
bind_addr,
fs_info,
start_method,
parent_addr,
),
# daemon=True,
name=name,
)
# `multiprocessing` only (since no async interface):
# register the process before start in case we get a cancel
# request before the actor has fully spawned - then we can wait
# for it to fully come up before sending a cancel request
actor_nursery._children[subactor.uid] = (subactor, proc, None)
proc.start()
if not proc.is_alive():
raise ActorFailure("Couldn't start sub-actor?")
proc.start()
if not proc.is_alive():
raise ActorFailure("Couldn't start sub-actor?")
log.runtime(f"Started {proc}")
log.runtime(f"Started {proc}")
try:
# wait for actor to spawn and connect back to us
# channel should have handshake completed by the
# local actor by the time we get a ref to it
event, chan = await actor_nursery._actor.wait_for_peer(
subactor.uid)
portal = Portal(chan)
actor_nursery._children[subactor.uid] = (subactor, proc, portal)
try:
# wait for actor to spawn and connect back to us
# channel should have handshake completed by the
# local actor by the time we get a ref to it
event, chan = await actor_nursery._actor.wait_for_peer(
subactor.uid)
# except:
# TODO: in the case we were cancelled before the sub-proc
# registered itself back we must be sure to try and clean
# any process we may have started.
# unblock parent task
task_status.started(portal)
portal = Portal(chan)
actor_nursery._children[subactor.uid] = (subactor, proc, portal)
# wait for ``ActorNursery`` block to signal that
# subprocesses can be waited upon.
# This is required to ensure synchronization
# with user code that may want to manually await results
# from nursery spawned sub-actors. We don't want the
# containing nurseries here to collect results or error
# while user code is still doing its thing. Only after the
# nursery block closes do we allow subactor results to be
# awaited and reported upwards to the supervisor.
# unblock parent task
task_status.started(portal)
# wait for ``ActorNursery`` block to signal that
# subprocesses can be waited upon.
# This is required to ensure synchronization
# with user code that may want to manually await results
# from nursery spawned sub-actors. We don't want the
# containing nurseries here to collect results or error
# while user code is still doing its thing. Only after the
# nursery block closes do we allow subactor results to be
# awaited and reported upwards to the supervisor.
with trio.CancelScope(shield=True):
await actor_nursery._join_procs.wait()
finally:
# XXX: in the case we were cancelled before the sub-proc
# registered itself back we must be sure to try and clean
# any process we may have started.
reaping_cancelled: bool = False
cancel_scope: Optional[trio.CancelScope] = None
cancel_exc: Optional[trio.Cancelled] = None
async with trio.open_nursery() as nursery:
if portal in actor_nursery._cancel_after_result_on_exit:
try:
# async with trio.open_nursery() as n:
# n.cancel_scope.shield = True
cancel_scope = await nursery.start(
cancel_on_completion,
portal,
subactor,
errors
)
except trio.Cancelled as err:
cancel_exc = err
nursery.start_soon(
cancel_on_completion,
portal,
subactor,
errors
)
# if the reaping task was cancelled we may have hit
# a race where the subproc disconnected before we
# could send it a message to cancel (classic 2 generals)
# in that case, wait shortly then kill the process.
reaping_cancelled = True
if proc.is_alive():
with trio.move_on_after(0.1) as cs:
cs.shield = True
await proc_waiter(proc)
if cs.cancelled_caught:
proc.terminate()
if not reaping_cancelled and proc.is_alive():
await proc_waiter(proc)
# TODO: timeout block here?
proc.join()
log.debug(f"Joined {proc}")
# pop child entry to indicate we are no longer managing subactor
subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
await proc_waiter(proc)
# cancel result waiter that may have been spawned in
# tandem if not done already
if cancel_scope:
log.warning(
"Cancelling existing result waiter task for "
f"{subactor.uid}")
cancel_scope.cancel()
log.warning(
"Cancelling existing result waiter task for "
f"{subactor.uid}")
nursery.cancel_scope.cancel()
elif reaping_cancelled: # let the cancellation bubble up
assert cancel_exc
raise cancel_exc
finally:
# hard reap sequence
if proc.is_alive():
log.cancel(f"Attempting to hard kill {proc}")
with trio.move_on_after(0.1) as cs:
cs.shield = True
await proc_waiter(proc)
if cs.cancelled_caught:
proc.terminate()
proc.join()
log.debug(f"Joined {proc}")
# pop child entry to indicate we are no longer managing subactor
subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
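Condensed sketch of the soft-then-hard reap pattern the trio backend above
implements (not the actual new_proc() internals; it uses a bare
trio.Process and a terminate/kill escalation rather than the shielded
``async with proc:`` close used in do_hard_kill()):

import trio


async def reap(proc: trio.Process, terminate_after: float = 3) -> None:
    try:
        # "soft" (cancellable) join: give the child a chance to exit
        # on its own or via a graceful remote cancel.
        await proc.wait()
    finally:
        # "hard" reap: no zombies allowed, even if we were cancelled
        # during the soft join above.
        with trio.CancelScope(shield=True):
            if proc.poll() is None:
                with trio.move_on_after(terminate_after) as cs:
                    proc.terminate()
                    await proc.wait()
                if cs.cancelled_caught:
                    # the ultimatum expired; force kill and reap
                    proc.kill()
                    await proc.wait()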

View File

@ -12,6 +12,7 @@ import trio
from async_generator import asynccontextmanager
from . import _debug
from ._debug import maybe_wait_for_debugger
from ._state import current_actor, is_main_process, is_root_process
from .log import get_logger, get_loglevel
from ._actor import Actor
@ -280,26 +281,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# will make the pdb repl unusable.
# Instead try to wait for pdb to be released before
# tearing down.
if is_root_process():
# TODO: could this make things more deterministic?
# wait to see if a sub-actor task will be
# scheduled and grab the tty lock on the next
# tick?
# await trio.testing.wait_all_tasks_blocked()
debug_complete = _debug._no_remote_has_tty
if (
debug_complete and
not debug_complete.is_set()
):
log.warning(
'Root has errored but pdb is in use by '
f'child {_debug._global_actor_in_debug}\n'
'Waiting on tty lock to release..')
# with trio.CancelScope(shield=True):
await debug_complete.wait()
await maybe_wait_for_debugger()
# if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't