diff --git a/examples/debugging/multi_subactor_root_errors.py b/examples/debugging/multi_subactor_root_errors.py index 6c69618..640f222 100644 --- a/examples/debugging/multi_subactor_root_errors.py +++ b/examples/debugging/multi_subactor_root_errors.py @@ -1,3 +1,8 @@ +''' +Test that a nested nursery will avoid clobbering +the debugger latched by a broken child. + +''' import trio import tractor @@ -35,6 +40,7 @@ async def main(): """ async with tractor.open_nursery( debug_mode=True, + # loglevel='cancel', ) as n: # spawn both actors diff --git a/newsfragments/245.feature.rst b/newsfragments/245.feature.rst new file mode 100644 index 0000000..e2754c3 --- /dev/null +++ b/newsfragments/245.feature.rst @@ -0,0 +1,13 @@ +Change the core message loop to handle task and actor-runtime cancel +requests immediately instead of scheduling them as is done for rpc-task +requests. + +In order to obtain more reliable teardown mechanics for (complex) actor +trees it's important that we specially treat cancel requests as having +higher priority. Previously, it was possible that task cancel requests +could themselves be cancelled if an "actor-runtime" cancel +request was received (this can happen during messy multi-actor crashes that +propagate). Instead, cancel requests now block the msg loop until serviced and +a response is relayed back to the requester. This also allows for +improved debugger support since we have determinism guarantees about +which processes must wait before hard killing their children. diff --git a/newsfragments/HOWTO.rst b/newsfragments/HOWTO.rst index a0eccc6..f132f0c 100644 --- a/newsfragments/HOWTO.rst +++ b/newsfragments/HOWTO.rst @@ -4,5 +4,5 @@ now and use the default `fragment set`_. .. _towncrier docs: https://github.com/twisted/towncrier#quick-start -.. _pluggy release readme: https://github.com/twisted/towncrier#quick-start +.. _pluggy release readme: https://github.com/pytest-dev/pluggy/blob/main/changelog/README.rst .. _fragment set: https://github.com/twisted/towncrier#news-fragments diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 5da87ce..a589f81 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -1,5 +1,6 @@ """ Cancellation and error propagation + """ import os import signal @@ -365,7 +366,8 @@ async def test_nested_multierrors(loglevel, start_method): # to happen before an actor is spawned if isinstance(subexc, trio.Cancelled): continue - else: + + elif isinstance(subexc, tractor.RemoteActorError): # on windows it seems we can't exactly be sure wtf # will happen..
assert subexc.type in ( @@ -373,6 +375,17 @@ trio.Cancelled, trio.MultiError ) + + elif isinstance(subexc, trio.MultiError): + for subsub in subexc.exceptions: + + if isinstance(subsub, tractor.RemoteActorError): + subsub = subsub.type + else: + subsub = type(subsub) + + assert subsub in ( + trio.Cancelled, trio.MultiError) else: assert isinstance(subexc, tractor.RemoteActorError) @@ -381,13 +394,14 @@ # on windows sometimes spawning is just too slow and # we get back the (sent) cancel signal instead if platform.system() == 'Windows': - assert (subexc.type is trio.MultiError) or ( - subexc.type is tractor.RemoteActorError) + if isinstance(subexc, tractor.RemoteActorError): + assert subexc.type in (trio.MultiError, tractor.RemoteActorError) + else: + assert isinstance(subexc, trio.MultiError) else: assert subexc.type is trio.MultiError else: - assert (subexc.type is tractor.RemoteActorError) or ( - subexc.type is trio.Cancelled) + assert subexc.type in (tractor.RemoteActorError, trio.Cancelled) @no_windows @@ -448,6 +462,7 @@ def test_cancel_via_SIGINT_other_task( with pytest.raises(KeyboardInterrupt): trio.run(main) + async def spin_for(period=3): "Sync sleep." time.sleep(period) diff --git a/tests/test_debugger.py b/tests/test_debugger.py index 87bce1b..bd1cba6 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -236,7 +236,8 @@ def test_subactor_breakpoint(spawn): def test_multi_subactors(spawn): - """Multiple subactors, both erroring and breakpointing as well as + """ + Multiple subactors, both erroring and breakpointing as well as a nested subactor erroring. """ child = spawn(r'multi_subactors') @@ -259,6 +260,7 @@ def test_multi_subactors(spawn): # first name_error failure child.expect(r"\(Pdb\+\+\)") before = str(child.before.decode()) + assert "Attaching to pdb in crashed actor: ('name_error'" in before assert "NameError" in before # continue again @@ -267,6 +269,7 @@ # 2nd name_error failure child.expect(r"\(Pdb\+\+\)") before = str(child.before.decode()) + assert "Attaching to pdb in crashed actor: ('name_error_1'" in before assert "NameError" in before # breakpoint loop should re-engage @@ -275,6 +278,19 @@ before = str(child.before.decode()) assert "Attaching pdb to actor: ('breakpoint_forever'" in before + # wait for spawn error to show up + while 'breakpoint_forever' in before: + child.sendline('c') + child.expect(r"\(Pdb\+\+\)") + before = str(child.before.decode()) + + # 2nd depth nursery should trigger + # child.sendline('c') + # child.expect(r"\(Pdb\+\+\)") + # before = str(child.before.decode()) + assert "Attaching to pdb in crashed actor: ('spawn_error'" in before + assert "RemoteActorError: ('name_error_1'" in before + # now run some "continues" to show re-entries for _ in range(5): child.sendline('c') @@ -284,16 +300,24 @@ child.sendline('q') child.expect(r"\(Pdb\+\+\)") before = str(child.before.decode()) + # debugger attaches to root assert "Attaching to pdb in crashed actor: ('root'" in before + # expect a multierror with exceptions for each sub-actor assert "RemoteActorError: ('breakpoint_forever'" in before + assert "RemoteActorError: ('name_error'" in before + assert "RemoteActorError: ('spawn_error'" in before + assert "RemoteActorError: ('name_error_1'" in before assert 'bdb.BdbQuit' in before # process should exit child.sendline('c')
child.expect(pexpect.EOF) - + # repeat of previous multierror for final output before = str(child.before.decode()) assert "RemoteActorError: ('breakpoint_forever'" in before + assert "RemoteActorError: ('name_error'" in before + assert "RemoteActorError: ('spawn_error'" in before + assert "RemoteActorError: ('name_error_1'" in before assert 'bdb.BdbQuit' in before @@ -387,16 +411,29 @@ def test_multi_subactors_root_errors(spawn): before = str(child.before.decode()) assert "NameError: name 'doggypants' is not defined" in before - # continue again + # continue again to catch 2nd name error from + # actor 'name_error_1' (which is 2nd depth). child.sendline('c') child.expect(r"\(Pdb\+\+\)") - - # should now get attached in root with assert error before = str(child.before.decode()) + assert "Attaching to pdb in crashed actor: ('name_error_1'" in before + assert "NameError" in before - # should have come just after priot prompt + child.sendline('c') + child.expect(r"\(Pdb\+\+\)") + before = str(child.before.decode()) + assert "Attaching to pdb in crashed actor: ('spawn_error'" in before + # boxed error from previous step + assert "RemoteActorError: ('name_error_1'" in before + assert "NameError" in before + + child.sendline('c') + child.expect(r"\(Pdb\+\+\)") + before = str(child.before.decode()) assert "Attaching to pdb in crashed actor: ('root'" in before - assert "AssertionError" in before + # boxed error from first level failure + assert "RemoteActorError: ('name_error'" in before + assert "NameError" in before # warnings assert we probably don't need # assert "Cancelling nursery in ('spawn_error'," in before @@ -406,6 +443,7 @@ def test_multi_subactors_root_errors(spawn): child.expect(pexpect.EOF) before = str(child.before.decode()) + # error from root actor and root task that created top level nursery assert "AssertionError" in before diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index a548537..6243a4e 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -180,6 +180,7 @@ def test_multi_actor_subs_arbiter_pub( 'streamer', enable_modules=[__name__], ) + name = 'streamer' even_portal = await n.run_in_actor( subs, diff --git a/tractor/_actor.py b/tractor/_actor.py index 6aafa11..e85b16b 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -49,6 +49,7 @@ async def _invoke( chan: Channel, func: typing.Callable, kwargs: Dict[str, Any], + is_rpc: bool = True, task_status: TaskStatus[ Union[trio.CancelScope, BaseException] ] = trio.TASK_STATUS_IGNORED, @@ -243,10 +244,11 @@ async def _invoke( scope, func, is_complete = actor._rpc_tasks.pop((chan, cid)) is_complete.set() except KeyError: - # If we're cancelled before the task returns then the - # cancel scope will not have been inserted yet - log.warning( - f"Task {func} likely errored or cancelled before it started") + if is_rpc: + # If we're cancelled before the task returns then the + # cancel scope will not have been inserted yet + log.warning( + f"Task {func} likely errored or cancelled before it started") finally: if not actor._rpc_tasks: log.runtime("All RPC tasks have completed") @@ -503,8 +505,8 @@ class Actor: log.runtime(f"Peers is {self._peers}") if not self._peers: # no more channels connected - self._no_more_peers.set() log.runtime("Signalling no more peer channels") + self._no_more_peers.set() # # XXX: is this necessary (GC should do it?) 
if chan.connected(): @@ -671,16 +673,39 @@ class Actor: f"{ns}.{funcname}({kwargs})") if ns == 'self': func = getattr(self, funcname) + if funcname == 'cancel': + + # don't start entire actor runtime cancellation if this + # actor is in debug mode + pdb_complete = _debug._local_pdb_complete + if pdb_complete: + await pdb_complete.wait() + + # we immediately start the runtime machinery shutdown + with trio.CancelScope(shield=True): + # self.cancel() was called so kill this msg loop + # and break out into ``_async_main()`` + log.cancel( + f"Actor {self.uid} was remotely cancelled; " + "waiting on cancellation completion..") + await _invoke(self, cid, chan, func, kwargs, is_rpc=False) + # await self._cancel_complete.wait() + + loop_cs.cancel() + break + if funcname == '_cancel_task': - # XXX: a special case is made here for - # remote calls since we don't want the - # remote actor have to know which channel - # the task is associated with and we can't - # pass non-primitive types between actors. - # This means you can use: - # Portal.run('self', '_cancel_task, cid=did) - # without passing the `chan` arg. - kwargs['chan'] = chan + + # we immediately start the runtime machinery shutdown + with trio.CancelScope(shield=True): + # self.cancel() was called so kill this msg loop + # and break out into ``_async_main()`` + kwargs['chan'] = chan + log.cancel( + f"Actor {self.uid} was remotely cancelled; " + "waiting on cancellation completion..") + await _invoke(self, cid, chan, func, kwargs, is_rpc=False) + continue else: # complain to client about restricted modules try: @@ -699,44 +724,36 @@ class Actor: partial(_invoke, self, cid, chan, func, kwargs), name=funcname, ) - except RuntimeError: + except (RuntimeError, trio.MultiError): # avoid reporting a benign race condition # during actor runtime teardown. nursery_cancelled_before_task = True + break # never allow cancelling cancel requests (results in # deadlock and other weird behaviour) - if func != self.cancel: - if isinstance(cs, Exception): - log.warning( - f"Task for RPC func {func} failed with" - f"{cs}") - else: - # mark that we have ongoing rpc tasks - self._ongoing_rpc_tasks = trio.Event() - log.runtime(f"RPC func is {func}") - # store cancel scope such that the rpc task can be - # cancelled gracefully if requested - self._rpc_tasks[(chan, cid)] = ( - cs, func, trio.Event()) - else: - # self.cancel() was called so kill this msg loop - # and break out into ``_async_main()`` + # if func != self.cancel: + if isinstance(cs, Exception): log.warning( - f"Actor {self.uid} was remotely cancelled; " - "waiting on cancellation completion..") - await self._cancel_complete.wait() - loop_cs.cancel() - break + f"Task for RPC func {func} failed with" + f"{cs}") + else: + # mark that we have ongoing rpc tasks + self._ongoing_rpc_tasks = trio.Event() + log.runtime(f"RPC func is {func}") + # store cancel scope such that the rpc task can be + # cancelled gracefully if requested + self._rpc_tasks[(chan, cid)] = ( + cs, func, trio.Event()) log.runtime( f"Waiting on next msg for {chan} from {chan.uid}") - else: - # channel disconnect - log.runtime( - f"{chan} for {chan.uid} disconnected, cancelling tasks" - ) - await self.cancel_rpc_tasks(chan) + + # end of async for, channel disconnect vis ``trio.EndOfChannel`` + log.runtime( + f"{chan} for {chan.uid} disconnected, cancelling tasks" + ) + await self.cancel_rpc_tasks(chan) except ( TransportClosed, @@ -947,6 +964,9 @@ class Actor: # Blocks here as expected until the root nursery is # killed (i.e. 
this actor is cancelled or signalled by the parent) except Exception as err: + log.info("Closing all actor lifetime contexts") + _lifetime_stack.close() + if not registered_with_arbiter: # TODO: I guess we could try to connect back # to the parent through a channel and engage a debugger @@ -976,11 +996,21 @@ class Actor: raise finally: - log.runtime("Root nursery complete") + log.info("Runtime nursery complete") # tear down all lifetime contexts if not in guest mode # XXX: should this just be in the entrypoint? - log.cancel("Closing all actor lifetime contexts") + log.info("Closing all actor lifetime contexts") + + # TODO: we can't actually do this bc the debugger + # uses the _service_n to spawn the lock task, BUT, + # in theory if we had the root nursery surround this finally + # block it might be actually possible to debug THIS + # machinery in the same way as user task code? + # if self.name == 'brokerd.ib': + # with trio.CancelScope(shield=True): + # await _debug.breakpoint() + _lifetime_stack.close() # Unregister actor from the arbiter @@ -1065,7 +1095,7 @@ class Actor: self._service_n.start_soon(self.cancel) async def cancel(self) -> bool: - """Cancel this actor. + """Cancel this actor's runtime. The "deterministic" teardown sequence in order is: - cancel all ongoing rpc tasks by cancel scope @@ -1099,7 +1129,7 @@ class Actor: if self._service_n: self._service_n.cancel_scope.cancel() - log.cancel(f"{self.uid} was sucessfullly cancelled") + log.cancel(f"{self.uid} called `Actor.cancel()`") self._cancel_complete.set() return True @@ -1158,18 +1188,20 @@ class Actor: registered for each. """ tasks = self._rpc_tasks - log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") - for (chan, cid) in tasks.copy(): - if only_chan is not None: - if only_chan != chan: - continue + if tasks: + log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") + for (chan, cid), (scope, func, is_complete) in tasks.copy().items(): + if only_chan is not None: + if only_chan != chan: + continue - # TODO: this should really done in a nursery batch - await self._cancel_task(cid, chan) + # TODO: this should really done in a nursery batch + if func != self._cancel_task: + await self._cancel_task(cid, chan) - log.cancel( - f"Waiting for remaining rpc tasks to complete {tasks}") - await self._ongoing_rpc_tasks.wait() + log.cancel( + f"Waiting for remaining rpc tasks to complete {tasks}") + await self._ongoing_rpc_tasks.wait() def cancel_server(self) -> None: """Cancel the internal channel server nursery thereby diff --git a/tractor/_debug.py b/tractor/_debug.py index 6e1d7f0..67485af 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -5,16 +5,23 @@ Multi-core debugging for da peeps! import bdb import sys from functools import partial -from contextlib import asynccontextmanager -from typing import Tuple, Optional, Callable, AsyncIterator +from contextlib import asynccontextmanager as acm +from typing import ( + Tuple, + Optional, + Callable, + AsyncIterator, + AsyncGenerator, +) import tractor import trio +from trio_typing import TaskStatus from .log import get_logger from . 
import _state from ._discovery import get_root -from ._state import is_root_process +from ._state import is_root_process, debug_mode from ._exceptions import is_multi_cancelled try: @@ -122,7 +129,7 @@ class PdbwTeardown(pdbpp.Pdb): # break -@asynccontextmanager +@acm async def _acquire_debug_lock( uid: Tuple[str, str] @@ -139,7 +146,7 @@ async def _acquire_debug_lock( task_name = trio.lowlevel.current_task().name - log.pdb( + log.debug( f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}" ) @@ -187,7 +194,7 @@ async def _acquire_debug_lock( if ( not stats.owner ): - log.pdb(f"No more tasks waiting on tty lock! says {uid}") + log.debug(f"No more tasks waiting on tty lock! says {uid}") _no_remote_has_tty.set() _no_remote_has_tty = None @@ -219,7 +226,8 @@ async def _hijack_stdin_for_child( subactor_uid: Tuple[str, str] ) -> str: - '''Hijack the tty in the root process of an actor tree such that + ''' + Hijack the tty in the root process of an actor tree such that the pdbpp debugger console can be allocated to a sub-actor for repl bossing. @@ -254,6 +262,8 @@ async def _hijack_stdin_for_child( # assert await stream.receive() == 'pdb_unlock' except ( + # BaseException, + trio.MultiError, trio.BrokenResourceError, trio.Cancelled, # by local cancellation trio.ClosedResourceError, # by self._rx_chan @@ -268,12 +278,74 @@ async def _hijack_stdin_for_child( if isinstance(err, trio.Cancelled): raise - - log.debug(f"TTY lock released, remote task: {task_name}:{subactor_uid}") + finally: + log.debug( + "TTY lock released, remote task:" + f"{task_name}:{subactor_uid}") return "pdb_unlock_complete" +async def wait_for_parent_stdin_hijack( + actor_uid: Tuple[str, str], + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED +): + ''' + Connect to the root actor via a ctx and invoke a task which locks + a root-local TTY lock. + + This function is used by any sub-actor to acquire mutex access to + pdb and the root's TTY for interactive debugging (see below inside + ``_breakpoint()``). It can be used to ensure that an intermediate + nursery-owning actor does not clobber its children if they are in + debug (see below inside ``maybe_wait_for_debugger()``). + + ''' + global _debugger_request_cs + + with trio.CancelScope(shield=True) as cs: + _debugger_request_cs = cs + + try: + async with get_root() as portal: + + # this syncs to child's ``Context.started()`` call. + async with portal.open_context( + + tractor._debug._hijack_stdin_for_child, + subactor_uid=actor_uid, + + ) as (ctx, val): + + log.pdb('locked context') + assert val == 'Locked' + + async with ctx.open_stream() as stream: + # unblock local caller + task_status.started(cs) + + try: + assert _local_pdb_complete + await _local_pdb_complete.wait() + + finally: + # TODO: shielding currently can cause hangs... 
+ with trio.CancelScope(shield=True): + await stream.send('pdb_unlock') + + # sync with callee termination + assert await ctx.result() == "pdb_unlock_complete" + + except tractor.ContextCancelled: + log.warning('Root actor cancelled debug lock') + + finally: + log.debug(f"Exiting debugger for actor {actor_uid}") + global _local_task_in_debug + _local_task_in_debug = None + log.debug(f"Child {actor_uid} released parent stdio lock") + + async def _breakpoint( debug_func, @@ -300,56 +372,6 @@ async def _breakpoint( await trio.lowlevel.checkpoint() - async def wait_for_parent_stdin_hijack( - task_status=trio.TASK_STATUS_IGNORED - ): - global _debugger_request_cs - - with trio.CancelScope(shield=True) as cs: - _debugger_request_cs = cs - - try: - async with get_root() as portal: - - log.pdb('got portal') - - # this syncs to child's ``Context.started()`` call. - async with portal.open_context( - - tractor._debug._hijack_stdin_for_child, - subactor_uid=actor.uid, - - ) as (ctx, val): - - log.pdb('locked context') - assert val == 'Locked' - - async with ctx.open_stream() as stream: - - log.error('opened stream') - # unblock local caller - task_status.started() - - try: - await _local_pdb_complete.wait() - - finally: - # TODO: shielding currently can cause hangs... - with trio.CancelScope(shield=True): - await stream.send('pdb_unlock') - - # sync with callee termination - assert await ctx.result() == "pdb_unlock_complete" - - except tractor.ContextCancelled: - log.warning('Root actor cancelled debug lock') - - finally: - log.debug(f"Exiting debugger for actor {actor}") - global _local_task_in_debug - _local_task_in_debug = None - log.debug(f"Child {actor} released parent stdio lock") - if not _local_pdb_complete or _local_pdb_complete.is_set(): _local_pdb_complete = trio.Event() @@ -386,7 +408,10 @@ async def _breakpoint( # cancel on this task start? I *think* this works below? # actor._service_n.cancel_scope.shield = shield with trio.CancelScope(shield=True): - await actor._service_n.start(wait_for_parent_stdin_hijack) + await actor._service_n.start( + wait_for_parent_stdin_hijack, + actor.uid, + ) elif is_root_process(): @@ -407,11 +432,10 @@ async def _breakpoint( 'Root actor attempting to shield-acquire active tty lock' f' owned by {_global_actor_in_debug}') + # must shield here to avoid hitting a ``Cancelled`` and + # a child getting stuck bc we clobbered the tty with trio.CancelScope(shield=True): - # must shield here to avoid hitting a ``Cancelled`` and - # a child getting stuck bc we clobbered the tty await _debug_lock.acquire() - else: # may be cancelled await _debug_lock.acquire() @@ -501,7 +525,7 @@ post_mortem = partial( async def _maybe_enter_pm(err): if ( - _state.debug_mode() + debug_mode() # NOTE: don't enter debug mode recursively after quitting pdb # Iow, don't re-enter the repl if the `quit` command was issued @@ -524,3 +548,80 @@ async def _maybe_enter_pm(err): else: return False + + +@acm +async def acquire_debug_lock( + subactor_uid: Tuple[str, str], +) -> AsyncGenerator[None, tuple]: + ''' + Grab root's debug lock on entry, release on exit. 
+ + ''' + async with trio.open_nursery() as n: + cs = await n.start( + wait_for_parent_stdin_hijack, + subactor_uid, + ) + yield None + cs.cancel() + + +async def maybe_wait_for_debugger( + poll_steps: int = 2, + poll_delay: float = 0.1, +) -> None: + + if not debug_mode(): + return + + if ( + is_root_process() + ): + global _no_remote_has_tty, _global_actor_in_debug, _wait_all_tasks_lock + + # If we error in the root but the debugger is + # engaged we don't want to prematurely kill (and + # thus clobber access to) the local tty since it + # will make the pdb repl unusable. + # Instead try to wait for pdb to be released before + # tearing down. + + sub_in_debug = None + + for _ in range(poll_steps): + + if _global_actor_in_debug: + sub_in_debug = tuple(_global_actor_in_debug) + + log.warning( + 'Root polling for debug') + + with trio.CancelScope(shield=True): + await trio.sleep(poll_delay) + + # TODO: could this make things more deterministic? wait + # to see if a sub-actor task will be scheduled and grab + # the tty lock on the next tick? + # XXX: doesn't seem to work + # await trio.testing.wait_all_tasks_blocked(cushion=0) + + debug_complete = _no_remote_has_tty + if ( + (debug_complete and + not debug_complete.is_set()) + ): + log.warning( + 'Root has errored but pdb is in use by ' + f'child {sub_in_debug}\n' + 'Waiting on tty lock to release..') + + await debug_complete.wait() + + await trio.sleep(poll_delay) + continue + else: + log.warning( + 'Root acquired TTY LOCK' + ) + return diff --git a/tractor/_discovery.py b/tractor/_discovery.py index bcfcc84..bac4110 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -44,7 +44,7 @@ async def get_arbiter( @asynccontextmanager async def get_root( **kwargs, -) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: +) -> typing.AsyncGenerator[Portal, None]: host, port = _runtime_vars['_root_mailbox'] assert host is not None diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 0d43b1a..bca812d 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -1,5 +1,6 @@ """ Machinery for actor process spawning using multiple backends. + """ import sys import multiprocessing as mp @@ -8,7 +9,6 @@ from typing import Any, Dict, Optional import trio from trio_typing import TaskStatus -from async_generator import asynccontextmanager try: from multiprocessing import semaphore_tracker # type: ignore @@ -22,9 +22,15 @@ from multiprocessing import forkserver # type: ignore from typing import Tuple from . import _forkserver_override +from ._debug import ( + maybe_wait_for_debugger, + acquire_debug_lock, +) from ._state import ( current_actor, is_main_process, + is_root_process, + debug_mode, ) from .log import get_logger @@ -123,44 +129,43 @@ async def cancel_on_completion( portal: Portal, actor: Actor, errors: Dict[Tuple[str, str], Exception], - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + ) -> None: - """Cancel actor gracefully once it's "main" portal's + """ + Cancel actor gracefully once it's "main" portal's result arrives. Should only be called for actors spawned with `run_in_actor()`. 
+ """ - with trio.CancelScope() as cs: + # if this call errors we store the exception for later + # in ``errors`` which will be reraised inside + # a MultiError and we still send out a cancel request + result = await exhaust_portal(portal, actor) + if isinstance(result, Exception): + errors[actor.uid] = result + log.warning( + f"Cancelling {portal.channel.uid} after error {result}" + ) - task_status.started(cs) + else: + log.runtime( + f"Cancelling {portal.channel.uid} gracefully " + f"after result {result}") - # if this call errors we store the exception for later - # in ``errors`` which will be reraised inside - # a MultiError and we still send out a cancel request - result = await exhaust_portal(portal, actor) - if isinstance(result, Exception): - errors[actor.uid] = result - log.warning( - f"Cancelling {portal.channel.uid} after error {result}" - ) - - else: - log.runtime( - f"Cancelling {portal.channel.uid} gracefully " - f"after result {result}") - - # cancel the process now that we have a final result - await portal.cancel_actor() + # cancel the process now that we have a final result + await portal.cancel_actor() async def do_hard_kill( proc: trio.Process, + terminate_after: int = 3, ) -> None: # NOTE: this timeout used to do nothing since we were shielding # the ``.wait()`` inside ``new_proc()`` which will pretty much # never release until the process exits, now it acts as # a hard-kill time ultimatum. - with trio.move_on_after(3) as cs: + with trio.move_on_after(terminate_after) as cs: # NOTE: This ``__aexit__()`` shields internally. async with proc: # calls ``trio.Process.aclose()`` @@ -174,108 +179,112 @@ async def do_hard_kill( proc.kill() -@asynccontextmanager -async def spawn_subactor( - subactor: 'Actor', - parent_addr: Tuple[str, int], -): - spawn_cmd = [ - sys.executable, - "-m", - # Hardcode this (instead of using ``_child.__name__`` to avoid a - # double import warning: https://stackoverflow.com/a/45070583 - "tractor._child", - # We provide the child's unique identifier on this exec/spawn - # line for debugging purposes when viewing the process tree from - # the OS; it otherwise can be passed via the parent channel if - # we prefer in the future (for privacy). - "--uid", - str(subactor.uid), - # Address the child must connect to on startup - "--parent_addr", - str(parent_addr) - ] - - if subactor.loglevel: - spawn_cmd += [ - "--loglevel", - subactor.loglevel - ] - - proc = await trio.open_process(spawn_cmd) - try: - yield proc - - finally: - log.runtime(f"Attempting to kill {proc}") - - # XXX: do this **after** cancellation/tearfown - # to avoid killing the process too early - # since trio does this internally on ``__aexit__()`` - - await do_hard_kill(proc) - - async def new_proc( + name: str, actor_nursery: 'ActorNursery', # type: ignore # noqa subactor: Actor, errors: Dict[Tuple[str, str], Exception], + # passed through to actor main bind_addr: Tuple[str, int], parent_addr: Tuple[str, int], _runtime_vars: Dict[str, Any], # serialized and sent to _child + *, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED -) -> None: - """Create a new ``multiprocessing.Process`` using the - spawn method as configured using ``try_set_start_method()``. - """ - cancel_scope = None +) -> None: + """ + Create a new ``multiprocessing.Process`` using the + spawn method as configured using ``try_set_start_method()``. 
+ + """ # mark the new actor with the global spawn method subactor._spawn_method = _spawn_method + uid = subactor.uid if _spawn_method == 'trio': - async with trio.open_nursery() as nursery: - async with spawn_subactor( - subactor, - parent_addr, - ) as proc: - log.runtime(f"Started {proc}") - # wait for actor to spawn and connect back to us - # channel should have handshake completed by the - # local actor by the time we get a ref to it + spawn_cmd = [ + sys.executable, + "-m", + # Hardcode this (instead of using ``_child.__name__`` to avoid a + # double import warning: https://stackoverflow.com/a/45070583 + "tractor._child", + # We provide the child's unique identifier on this exec/spawn + # line for debugging purposes when viewing the process tree from + # the OS; it otherwise can be passed via the parent channel if + # we prefer in the future (for privacy). + "--uid", + str(subactor.uid), + # Address the child must connect to on startup + "--parent_addr", + str(parent_addr) + ] + + if subactor.loglevel: + spawn_cmd += [ + "--loglevel", + subactor.loglevel + ] + + cancelled_during_spawn: bool = False + try: + proc = await trio.open_process(spawn_cmd) + + log.runtime(f"Started {proc}") + + # wait for actor to spawn and connect back to us + # channel should have handshake completed by the + # local actor by the time we get a ref to it + try: event, chan = await actor_nursery._actor.wait_for_peer( subactor.uid) - portal = Portal(chan) - actor_nursery._children[subactor.uid] = ( - subactor, proc, portal) + except trio.Cancelled: + cancelled_during_spawn = True + # we may cancel before the child connects back in which + # case avoid clobbering the pdb tty. + if debug_mode(): + with trio.CancelScope(shield=True): + # don't clobber an ongoing pdb + if is_root_process(): + await maybe_wait_for_debugger() + else: + async with acquire_debug_lock(uid): + # soft wait on the proc to terminate + with trio.move_on_after(0.5): + await proc.wait() + raise - # send additional init params - await chan.send({ - "_parent_main_data": subactor._parent_main_data, - "enable_modules": subactor.enable_modules, - "_arb_addr": subactor._arb_addr, - "bind_host": bind_addr[0], - "bind_port": bind_addr[1], - "_runtime_vars": _runtime_vars, - }) + portal = Portal(chan) + actor_nursery._children[subactor.uid] = ( + subactor, proc, portal) - # track subactor in current nursery - curr_actor = current_actor() - curr_actor._actoruid2nursery[subactor.uid] = actor_nursery + # send additional init params + await chan.send({ + "_parent_main_data": subactor._parent_main_data, + "enable_modules": subactor.enable_modules, + "_arb_addr": subactor._arb_addr, + "bind_host": bind_addr[0], + "bind_port": bind_addr[1], + "_runtime_vars": _runtime_vars, + }) - # resume caller at next checkpoint now that child is up - task_status.started(portal) + # track subactor in current nursery + curr_actor = current_actor() + curr_actor._actoruid2nursery[subactor.uid] = actor_nursery - # wait for ActorNursery.wait() to be called - with trio.CancelScope(shield=True): - await actor_nursery._join_procs.wait() + # resume caller at next checkpoint now that child is up + task_status.started(portal) + # wait for ActorNursery.wait() to be called + with trio.CancelScope(shield=True): + await actor_nursery._join_procs.wait() + + async with trio.open_nursery() as nursery: if portal in actor_nursery._cancel_after_result_on_exit: - cancel_scope = await nursery.start( + nursery.start_soon( cancel_on_completion, portal, subactor, @@ -285,32 +294,45 @@ async def 
new_proc( # Wait for proc termination but **dont' yet** call # ``trio.Process.__aexit__()`` (it tears down stdio # which will kill any waiting remote pdb trace). - - # TODO: No idea how we can enforce zombie - # reaping more stringently without the shield - # we used to have below... - - # with trio.CancelScope(shield=True): - # async with proc: - - # Always "hard" join sub procs since no actor zombies - # are allowed! - - # this is a "light" (cancellable) join, the hard join is - # in the enclosing scope (see above). + # This is a "soft" (cancellable) join/reap. await proc.wait() - log.debug(f"Joined {proc}") - # pop child entry to indicate we no longer managing this subactor - subactor, proc, portal = actor_nursery._children.pop(subactor.uid) - - # cancel result waiter that may have been spawned in - # tandem if not done already - if cancel_scope: + # cancel result waiter that may have been spawned in + # tandem if not done already log.warning( "Cancelling existing result waiter task for " f"{subactor.uid}") - cancel_scope.cancel() + nursery.cancel_scope.cancel() + + finally: + # The "hard" reap since no actor zombies are allowed! + # XXX: do this **after** cancellation/tearfown to avoid + # killing the process too early. + log.cancel(f'Hard reap sequence starting for {uid}') + + with trio.CancelScope(shield=True): + + # don't clobber an ongoing pdb + if cancelled_during_spawn: + # Try again to avoid TTY clobbering. + async with acquire_debug_lock(uid): + with trio.move_on_after(0.5): + await proc.wait() + + if is_root_process(): + await maybe_wait_for_debugger() + + if proc.poll() is None: + log.cancel(f"Attempting to hard kill {proc}") + await do_hard_kill(proc) + + log.debug(f"Joined {proc}") + + if not cancelled_during_spawn: + # pop child entry to indicate we no longer managing this + # subactor + actor_nursery._children.pop(subactor.uid) + else: # `multiprocessing` # async with trio.open_nursery() as nursery: @@ -341,141 +363,124 @@ async def mp_new_proc( task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> None: - async with trio.open_nursery() as nursery: - assert _ctx - start_method = _ctx.get_start_method() - if start_method == 'forkserver': - # XXX do our hackery on the stdlib to avoid multiple - # forkservers (one at each subproc layer). - fs = forkserver._forkserver - curr_actor = current_actor() - if is_main_process() and not curr_actor._forkserver_info: - # if we're the "main" process start the forkserver - # only once and pass its ipc info to downstream - # children - # forkserver.set_forkserver_preload(enable_modules) - forkserver.ensure_running() - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - getattr(fs, '_forkserver_pid', None), - getattr( - resource_tracker._resource_tracker, '_pid', None), - resource_tracker._resource_tracker._fd, - ) - else: - assert curr_actor._forkserver_info - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - fs._forkserver_pid, - resource_tracker._resource_tracker._pid, - resource_tracker._resource_tracker._fd, - ) = curr_actor._forkserver_info + + assert _ctx + start_method = _ctx.get_start_method() + if start_method == 'forkserver': + # XXX do our hackery on the stdlib to avoid multiple + # forkservers (one at each subproc layer). 
+ fs = forkserver._forkserver + curr_actor = current_actor() + if is_main_process() and not curr_actor._forkserver_info: + # if we're the "main" process start the forkserver + # only once and pass its ipc info to downstream + # children + # forkserver.set_forkserver_preload(enable_modules) + forkserver.ensure_running() + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + getattr(fs, '_forkserver_pid', None), + getattr( + resource_tracker._resource_tracker, '_pid', None), + resource_tracker._resource_tracker._fd, + ) else: - fs_info = (None, None, None, None, None) + assert curr_actor._forkserver_info + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + fs._forkserver_pid, + resource_tracker._resource_tracker._pid, + resource_tracker._resource_tracker._fd, + ) = curr_actor._forkserver_info + else: + fs_info = (None, None, None, None, None) - proc: mp.Process = _ctx.Process( # type: ignore - target=_mp_main, - args=( - subactor, - bind_addr, - fs_info, - start_method, - parent_addr, - ), - # daemon=True, - name=name, - ) - # `multiprocessing` only (since no async interface): - # register the process before start in case we get a cancel - # request before the actor has fully spawned - then we can wait - # for it to fully come up before sending a cancel request - actor_nursery._children[subactor.uid] = (subactor, proc, None) + proc: mp.Process = _ctx.Process( # type: ignore + target=_mp_main, + args=( + subactor, + bind_addr, + fs_info, + start_method, + parent_addr, + ), + # daemon=True, + name=name, + ) + # `multiprocessing` only (since no async interface): + # register the process before start in case we get a cancel + # request before the actor has fully spawned - then we can wait + # for it to fully come up before sending a cancel request + actor_nursery._children[subactor.uid] = (subactor, proc, None) - proc.start() - if not proc.is_alive(): - raise ActorFailure("Couldn't start sub-actor?") + proc.start() + if not proc.is_alive(): + raise ActorFailure("Couldn't start sub-actor?") - log.runtime(f"Started {proc}") + log.runtime(f"Started {proc}") - try: - # wait for actor to spawn and connect back to us - # channel should have handshake completed by the - # local actor by the time we get a ref to it - event, chan = await actor_nursery._actor.wait_for_peer( - subactor.uid) - portal = Portal(chan) - actor_nursery._children[subactor.uid] = (subactor, proc, portal) + try: + # wait for actor to spawn and connect back to us + # channel should have handshake completed by the + # local actor by the time we get a ref to it + event, chan = await actor_nursery._actor.wait_for_peer( + subactor.uid) + # except: + # TODO: in the case we were cancelled before the sub-proc + # registered itself back we must be sure to try and clean + # any process we may have started. - # unblock parent task - task_status.started(portal) + portal = Portal(chan) + actor_nursery._children[subactor.uid] = (subactor, proc, portal) - # wait for ``ActorNursery`` block to signal that - # subprocesses can be waited upon. - # This is required to ensure synchronization - # with user code that may want to manually await results - # from nursery spawned sub-actors. We don't want the - # containing nurseries here to collect results or error - # while user code is still doing it's thing. Only after the - # nursery block closes do we allow subactor results to be - # awaited and reported upwards to the supervisor. 
+ # unblock parent task + task_status.started(portal) + + # wait for ``ActorNursery`` block to signal that + # subprocesses can be waited upon. + # This is required to ensure synchronization + # with user code that may want to manually await results + # from nursery spawned sub-actors. We don't want the + # containing nurseries here to collect results or error + # while user code is still doing it's thing. Only after the + # nursery block closes do we allow subactor results to be + # awaited and reported upwards to the supervisor. + with trio.CancelScope(shield=True): await actor_nursery._join_procs.wait() - finally: - # XXX: in the case we were cancelled before the sub-proc - # registered itself back we must be sure to try and clean - # any process we may have started. - - reaping_cancelled: bool = False - cancel_scope: Optional[trio.CancelScope] = None - cancel_exc: Optional[trio.Cancelled] = None - + async with trio.open_nursery() as nursery: if portal in actor_nursery._cancel_after_result_on_exit: - try: - # async with trio.open_nursery() as n: - # n.cancel_scope.shield = True - cancel_scope = await nursery.start( - cancel_on_completion, - portal, - subactor, - errors - ) - except trio.Cancelled as err: - cancel_exc = err + nursery.start_soon( + cancel_on_completion, + portal, + subactor, + errors + ) - # if the reaping task was cancelled we may have hit - # a race where the subproc disconnected before we - # could send it a message to cancel (classic 2 generals) - # in that case, wait shortly then kill the process. - reaping_cancelled = True - - if proc.is_alive(): - with trio.move_on_after(0.1) as cs: - cs.shield = True - await proc_waiter(proc) - - if cs.cancelled_caught: - proc.terminate() - - if not reaping_cancelled and proc.is_alive(): - await proc_waiter(proc) - - # TODO: timeout block here? - proc.join() - - log.debug(f"Joined {proc}") - # pop child entry to indicate we are no longer managing subactor - subactor, proc, portal = actor_nursery._children.pop(subactor.uid) + await proc_waiter(proc) # cancel result waiter that may have been spawned in # tandem if not done already - if cancel_scope: - log.warning( - "Cancelling existing result waiter task for " - f"{subactor.uid}") - cancel_scope.cancel() + log.warning( + "Cancelling existing result waiter task for " + f"{subactor.uid}") + nursery.cancel_scope.cancel() - elif reaping_cancelled: # let the cancellation bubble up - assert cancel_exc - raise cancel_exc + finally: + # hard reap sequence + if proc.is_alive(): + log.cancel(f"Attempting to hard kill {proc}") + with trio.move_on_after(0.1) as cs: + cs.shield = True + await proc_waiter(proc) + + if cs.cancelled_caught: + proc.terminate() + + proc.join() + log.debug(f"Joined {proc}") + + # pop child entry to indicate we are no longer managing subactor + subactor, proc, portal = actor_nursery._children.pop(subactor.uid) diff --git a/tractor/_trionics.py b/tractor/_trionics.py index e29bf5e..1259be6 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -12,6 +12,7 @@ import trio from async_generator import asynccontextmanager from . import _debug +from ._debug import maybe_wait_for_debugger from ._state import current_actor, is_main_process, is_root_process from .log import get_logger, get_loglevel from ._actor import Actor @@ -280,26 +281,7 @@ async def _open_and_supervise_one_cancels_all_nursery( # will make the pdb repl unusable. # Instead try to wait for pdb to be released before # tearing down. 
- if is_root_process(): - - # TODO: could this make things more deterministic? - # wait to see if a sub-actor task will be - # scheduled and grab the tty lock on the next - # tick? - # await trio.testing.wait_all_tasks_blocked() - - debug_complete = _debug._no_remote_has_tty - if ( - debug_complete and - not debug_complete.is_set() - ): - log.warning( - 'Root has errored but pdb is in use by ' - f'child {_debug._global_actor_in_debug}\n' - 'Waiting on tty lock to release..') - - # with trio.CancelScope(shield=True): - await debug_complete.wait() + await maybe_wait_for_debugger() # if the caller's scope errored then we activate our # one-cancels-all supervisor strategy (don't