From 7643bbf183aecc13c68146cadc48c74065e36f14 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 7 Oct 2021 23:13:47 -0400 Subject: [PATCH 01/25] Make actor runtime cancellation immediate --- tractor/_actor.py | 90 ++++++++++++++++++++++++++++++----------------- 1 file changed, 58 insertions(+), 32 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 6aafa11..a19623b 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -503,8 +503,8 @@ class Actor: log.runtime(f"Peers is {self._peers}") if not self._peers: # no more channels connected - self._no_more_peers.set() log.runtime("Signalling no more peer channels") + self._no_more_peers.set() # # XXX: is this necessary (GC should do it?) if chan.connected(): @@ -671,7 +671,28 @@ class Actor: f"{ns}.{funcname}({kwargs})") if ns == 'self': func = getattr(self, funcname) - if funcname == '_cancel_task': + if funcname == 'cancel': + + # don't start entire actor runtime cancellation if this + # actor is in debug mode + pdb_complete = _debug._local_pdb_complete + if pdb_complete: + await pdb_complete.wait() + + # we immediately start the runtime machinery shutdown + await _invoke(self, cid, chan, func, kwargs) + + # self.cancel() was called so kill this msg loop + # and break out into ``_async_main()`` + log.cancel( + f"Actor {self.uid} was remotely cancelled; " + "waiting on cancellation completion..") + await self._cancel_complete.wait() + loop_cs.cancel() + break + + elif funcname == '_cancel_task': + # XXX: a special case is made here for # remote calls since we don't want the # remote actor have to know which channel @@ -681,6 +702,7 @@ class Actor: # Portal.run('self', '_cancel_task, cid=did) # without passing the `chan` arg. kwargs['chan'] = chan + else: # complain to client about restricted modules try: @@ -706,37 +728,28 @@ class Actor: # never allow cancelling cancel requests (results in # deadlock and other weird behaviour) - if func != self.cancel: - if isinstance(cs, Exception): - log.warning( - f"Task for RPC func {func} failed with" - f"{cs}") - else: - # mark that we have ongoing rpc tasks - self._ongoing_rpc_tasks = trio.Event() - log.runtime(f"RPC func is {func}") - # store cancel scope such that the rpc task can be - # cancelled gracefully if requested - self._rpc_tasks[(chan, cid)] = ( - cs, func, trio.Event()) - else: - # self.cancel() was called so kill this msg loop - # and break out into ``_async_main()`` + # if func != self.cancel: + if isinstance(cs, Exception): log.warning( - f"Actor {self.uid} was remotely cancelled; " - "waiting on cancellation completion..") - await self._cancel_complete.wait() - loop_cs.cancel() - break + f"Task for RPC func {func} failed with" + f"{cs}") + else: + # mark that we have ongoing rpc tasks + self._ongoing_rpc_tasks = trio.Event() + log.runtime(f"RPC func is {func}") + # store cancel scope such that the rpc task can be + # cancelled gracefully if requested + self._rpc_tasks[(chan, cid)] = ( + cs, func, trio.Event()) log.runtime( f"Waiting on next msg for {chan} from {chan.uid}") - else: - # channel disconnect - log.runtime( - f"{chan} for {chan.uid} disconnected, cancelling tasks" - ) - await self.cancel_rpc_tasks(chan) + + # end of async for, channel disconnect vis ``trio.EndOfChannel`` + log.runtime( + f"{chan} for {chan.uid} disconnected, cancelling tasks" + ) + await self.cancel_rpc_tasks(chan) except ( TransportClosed, @@ -947,6 +960,9 @@ class Actor: # Blocks here as expected until the root nursery is # killed (i.e. this actor is cancelled or signalled by the parent) except Exception as err: + log.info("Closing all actor lifetime contexts") + _lifetime_stack.close() + if not registered_with_arbiter: # TODO: I guess we could try to connect back # to the parent through a channel and engage a debugger @@ -976,11 +992,21 @@ class Actor: raise finally: - log.runtime("Root nursery complete") + log.info("Root nursery complete") # tear down all lifetime contexts if not in guest mode # XXX: should this just be in the entrypoint? - log.cancel("Closing all actor lifetime contexts") + log.info("Closing all actor lifetime contexts") + + # TODO: we can't actually do this bc the debugger + # uses the _service_n to spawn the lock task, BUT, + # in theory if we had the root nursery surround this finally + # block it might be actually possible to debug THIS + # machinery in the same way as user task code? + # if self.name == 'brokerd.ib': + # with trio.CancelScope(shield=True): + # await _debug.breakpoint() + _lifetime_stack.close() # Unregister actor from the arbiter @@ -1099,7 +1125,7 @@ class Actor: if self._service_n: self._service_n.cancel_scope.cancel() - log.cancel(f"{self.uid} was sucessfullly cancelled") + log.cancel(f"{self.uid} called `Actor.cancel()`") self._cancel_complete.set() return True From 41f0992445da003ab54ce68e7526549bd28bc548 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 8 Oct 2021 18:20:08 -0400 Subject: [PATCH 02/25] Don't whine about ; it ain't rpc --- tractor/_actor.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index a19623b..9416260 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -49,6 +49,7 @@ async def _invoke( chan: Channel, func: typing.Callable, kwargs: Dict[str, Any], + is_rpc: bool = True, task_status: TaskStatus[ Union[trio.CancelScope, BaseException] ] = trio.TASK_STATUS_IGNORED, @@ -243,10 +244,11 @@ async def _invoke( scope, func, is_complete = actor._rpc_tasks.pop((chan, cid)) is_complete.set() except KeyError: - # If we're cancelled before the task returns then the - # cancel scope will not have been inserted yet - log.warning( - f"Task {func} likely errored or cancelled before it started") + if is_rpc: + # If we're cancelled before the task returns then the + # cancel scope will not have been inserted yet + log.warning( + f"Task {func} likely errored or cancelled before it started") finally: if not actor._rpc_tasks: log.runtime("All RPC tasks have completed") @@ -680,7 +682,7 @@ class Actor: await pdb_complete.wait() # we immediately start the runtime machinery shutdown - await _invoke(self, cid, chan, func, kwargs) + await _invoke(self, cid, chan, func, kwargs, is_rpc=False) # self.cancel() was called so kill this msg loop # and break out into ``_async_main()`` @@ -721,10 +723,11 @@ class Actor: partial(_invoke, self, cid, chan, func, kwargs), name=funcname, ) - except RuntimeError: + except (RuntimeError, trio.MultiError): # avoid reporting a benign race condition # during actor runtime teardown. nursery_cancelled_before_task = True + break # never allow cancelling cancel requests (results in # deadlock and other weird behaviour) From bb9d9c74b1b9f9ae053f75831dd4716ae2addb84 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 10 Oct 2021 11:42:32 -0400 Subject: [PATCH 03/25] Do immediate remote task cancels As for `Actor.cancel()` requests, do the same for `Actor._cancel_task()` but use `_invoke()` to ensure correct msg transactions with caller. Don't cancel task cancels on a cancel-all-tasks operation in attempt at more determinism. --- tractor/_actor.py | 63 +++++++++++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 30 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 9416260..e85b16b 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -682,29 +682,30 @@ class Actor: await pdb_complete.wait() # we immediately start the runtime machinery shutdown - await _invoke(self, cid, chan, func, kwargs, is_rpc=False) + with trio.CancelScope(shield=True): + # self.cancel() was called so kill this msg loop + # and break out into ``_async_main()`` + log.cancel( + f"Actor {self.uid} was remotely cancelled; " + "waiting on cancellation completion..") + await _invoke(self, cid, chan, func, kwargs, is_rpc=False) + # await self._cancel_complete.wait() - # self.cancel() was called so kill this msg loop - # and break out into ``_async_main()`` - log.cancel( - f"Actor {self.uid} was remotely cancelled; " - "waiting on cancellation completion..") - await self._cancel_complete.wait() loop_cs.cancel() break - elif funcname == '_cancel_task': - - # XXX: a special case is made here for - # remote calls since we don't want the - # remote actor have to know which channel - # the task is associated with and we can't - # pass non-primitive types between actors. - # This means you can use: - # Portal.run('self', '_cancel_task, cid=did) - # without passing the `chan` arg. - kwargs['chan'] = chan + if funcname == '_cancel_task': + # we immediately start the runtime machinery shutdown + with trio.CancelScope(shield=True): + # self.cancel() was called so kill this msg loop + # and break out into ``_async_main()`` + kwargs['chan'] = chan + log.cancel( + f"Actor {self.uid} was remotely cancelled; " + "waiting on cancellation completion..") + await _invoke(self, cid, chan, func, kwargs, is_rpc=False) + continue else: # complain to client about restricted modules try: @@ -995,7 +996,7 @@ class Actor: raise finally: - log.info("Root nursery complete") + log.info("Runtime nursery complete") # tear down all lifetime contexts if not in guest mode # XXX: should this just be in the entrypoint? @@ -1094,7 +1095,7 @@ class Actor: self._service_n.start_soon(self.cancel) async def cancel(self) -> bool: - """Cancel this actor. + """Cancel this actor's runtime. The "deterministic" teardown sequence in order is: - cancel all ongoing rpc tasks by cancel scope @@ -1187,18 +1188,20 @@ class Actor: registered for each. """ tasks = self._rpc_tasks - log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") - for (chan, cid) in tasks.copy(): - if only_chan is not None: - if only_chan != chan: - continue + if tasks: + log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") + for (chan, cid), (scope, func, is_complete) in tasks.copy().items(): + if only_chan is not None: + if only_chan != chan: + continue - # TODO: this should really done in a nursery batch - await self._cancel_task(cid, chan) + # TODO: this should really done in a nursery batch + if func != self._cancel_task: + await self._cancel_task(cid, chan) - log.cancel( - f"Waiting for remaining rpc tasks to complete {tasks}") - await self._ongoing_rpc_tasks.wait() + log.cancel( + f"Waiting for remaining rpc tasks to complete {tasks}") + await self._ongoing_rpc_tasks.wait() def cancel_server(self) -> None: """Cancel the internal channel server nursery thereby From 46ff558556398cb89ec62bf8363f11976a32b8d4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 12 Oct 2021 23:59:39 -0400 Subject: [PATCH 04/25] Unwind process opening and shield hard reap --- tractor/_spawn.py | 163 ++++++++++++++++++++-------------------------- 1 file changed, 72 insertions(+), 91 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 0d43b1a..943cccb 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -174,48 +174,6 @@ async def do_hard_kill( proc.kill() -@asynccontextmanager -async def spawn_subactor( - subactor: 'Actor', - parent_addr: Tuple[str, int], -): - spawn_cmd = [ - sys.executable, - "-m", - # Hardcode this (instead of using ``_child.__name__`` to avoid a - # double import warning: https://stackoverflow.com/a/45070583 - "tractor._child", - # We provide the child's unique identifier on this exec/spawn - # line for debugging purposes when viewing the process tree from - # the OS; it otherwise can be passed via the parent channel if - # we prefer in the future (for privacy). - "--uid", - str(subactor.uid), - # Address the child must connect to on startup - "--parent_addr", - str(parent_addr) - ] - - if subactor.loglevel: - spawn_cmd += [ - "--loglevel", - subactor.loglevel - ] - - proc = await trio.open_process(spawn_cmd) - try: - yield proc - - finally: - log.runtime(f"Attempting to kill {proc}") - - # XXX: do this **after** cancellation/tearfown - # to avoid killing the process too early - # since trio does this internally on ``__aexit__()`` - - await do_hard_kill(proc) - - async def new_proc( name: str, actor_nursery: 'ActorNursery', # type: ignore # noqa @@ -228,8 +186,10 @@ async def new_proc( *, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> None: - """Create a new ``multiprocessing.Process`` using the + """ + Create a new ``multiprocessing.Process`` using the spawn method as configured using ``try_set_start_method()``. + """ cancel_scope = None @@ -237,43 +197,66 @@ async def new_proc( subactor._spawn_method = _spawn_method if _spawn_method == 'trio': - async with trio.open_nursery() as nursery: - async with spawn_subactor( - subactor, - parent_addr, - ) as proc: - log.runtime(f"Started {proc}") - # wait for actor to spawn and connect back to us - # channel should have handshake completed by the - # local actor by the time we get a ref to it - event, chan = await actor_nursery._actor.wait_for_peer( - subactor.uid) - portal = Portal(chan) - actor_nursery._children[subactor.uid] = ( - subactor, proc, portal) + spawn_cmd = [ + sys.executable, + "-m", + # Hardcode this (instead of using ``_child.__name__`` to avoid a + # double import warning: https://stackoverflow.com/a/45070583 + "tractor._child", + # We provide the child's unique identifier on this exec/spawn + # line for debugging purposes when viewing the process tree from + # the OS; it otherwise can be passed via the parent channel if + # we prefer in the future (for privacy). + "--uid", + str(subactor.uid), + # Address the child must connect to on startup + "--parent_addr", + str(parent_addr) + ] - # send additional init params - await chan.send({ - "_parent_main_data": subactor._parent_main_data, - "enable_modules": subactor.enable_modules, - "_arb_addr": subactor._arb_addr, - "bind_host": bind_addr[0], - "bind_port": bind_addr[1], - "_runtime_vars": _runtime_vars, - }) + if subactor.loglevel: + spawn_cmd += [ + "--loglevel", + subactor.loglevel + ] - # track subactor in current nursery - curr_actor = current_actor() - curr_actor._actoruid2nursery[subactor.uid] = actor_nursery + try: + proc = await trio.open_process(spawn_cmd) - # resume caller at next checkpoint now that child is up - task_status.started(portal) + log.runtime(f"Started {proc}") - # wait for ActorNursery.wait() to be called - with trio.CancelScope(shield=True): - await actor_nursery._join_procs.wait() + # wait for actor to spawn and connect back to us + # channel should have handshake completed by the + # local actor by the time we get a ref to it + event, chan = await actor_nursery._actor.wait_for_peer( + subactor.uid) + portal = Portal(chan) + actor_nursery._children[subactor.uid] = ( + subactor, proc, portal) + # send additional init params + await chan.send({ + "_parent_main_data": subactor._parent_main_data, + "enable_modules": subactor.enable_modules, + "_arb_addr": subactor._arb_addr, + "bind_host": bind_addr[0], + "bind_port": bind_addr[1], + "_runtime_vars": _runtime_vars, + }) + + # track subactor in current nursery + curr_actor = current_actor() + curr_actor._actoruid2nursery[subactor.uid] = actor_nursery + + # resume caller at next checkpoint now that child is up + task_status.started(portal) + + # wait for ActorNursery.wait() to be called + with trio.CancelScope(shield=True): + await actor_nursery._join_procs.wait() + + async with trio.open_nursery() as nursery: if portal in actor_nursery._cancel_after_result_on_exit: cancel_scope = await nursery.start( cancel_on_completion, @@ -285,25 +268,10 @@ async def new_proc( # Wait for proc termination but **dont' yet** call # ``trio.Process.__aexit__()`` (it tears down stdio # which will kill any waiting remote pdb trace). - - # TODO: No idea how we can enforce zombie - # reaping more stringently without the shield - # we used to have below... - - # with trio.CancelScope(shield=True): - # async with proc: - - # Always "hard" join sub procs since no actor zombies - # are allowed! - - # this is a "light" (cancellable) join, the hard join is - # in the enclosing scope (see above). + # This is a "soft" (cancellable) join/reap. await proc.wait() - log.debug(f"Joined {proc}") - # pop child entry to indicate we no longer managing this subactor - subactor, proc, portal = actor_nursery._children.pop(subactor.uid) - + finally: # cancel result waiter that may have been spawned in # tandem if not done already if cancel_scope: @@ -311,6 +279,19 @@ async def new_proc( "Cancelling existing result waiter task for " f"{subactor.uid}") cancel_scope.cancel() + + log.runtime(f"Attempting to kill {proc}") + + # The "hard" reap since no actor zombies are allowed! + # XXX: do this **after** cancellation/tearfown to avoid + # killing the process too early. + with trio.CancelScope(shield=True): + await do_hard_kill(proc) + + log.debug(f"Joined {proc}") + # pop child entry to indicate we no longer managing this subactor + subactor, proc, portal = actor_nursery._children.pop(subactor.uid) + else: # `multiprocessing` # async with trio.open_nursery() as nursery: From 2df16c155765bd5d1334f8d2ef583dacc942b1f5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 12 Oct 2021 12:03:57 -0400 Subject: [PATCH 05/25] Lol, fix sub-actor case --- tests/test_pubsub.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index a548537..6243a4e 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -180,6 +180,7 @@ def test_multi_actor_subs_arbiter_pub( 'streamer', enable_modules=[__name__], ) + name = 'streamer' even_portal = await n.run_in_actor( subs, From 77ec29008de93fe2bab1c56e6cc2123ad45ea688 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 13 Oct 2021 01:09:16 -0400 Subject: [PATCH 06/25] Simplify to soft and hard reap sequences This is actually surprisingly easy to grok having gone through a lot of pain understanding edge cases in the zombie lord dev branch. Basically we just need to make sure actors are managed in a 2 step reap sequence. In the "soft" reap phase we wait for the process to terminate on its own concurrently with (maybe) waiting for its portal's final result (if it's a `.run_in_actor()`). If this path is cancelled or errors, then we do a "hard" reap where we timeout and send a signal to the proc to terminate immediately. The only last remaining trick is to tie in the root-is-debugger-aware logic to yet again avoid tty clobbers. --- tractor/_spawn.py | 297 +++++++++++++++++++++------------------------- 1 file changed, 138 insertions(+), 159 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 943cccb..55ac9a3 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -8,7 +8,6 @@ from typing import Any, Dict, Optional import trio from trio_typing import TaskStatus -from async_generator import asynccontextmanager try: from multiprocessing import semaphore_tracker # type: ignore @@ -123,34 +122,32 @@ async def cancel_on_completion( portal: Portal, actor: Actor, errors: Dict[Tuple[str, str], Exception], - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + ) -> None: - """Cancel actor gracefully once it's "main" portal's + """ + Cancel actor gracefully once it's "main" portal's result arrives. Should only be called for actors spawned with `run_in_actor()`. + """ - with trio.CancelScope() as cs: + # if this call errors we store the exception for later + # in ``errors`` which will be reraised inside + # a MultiError and we still send out a cancel request + result = await exhaust_portal(portal, actor) + if isinstance(result, Exception): + errors[actor.uid] = result + log.warning( + f"Cancelling {portal.channel.uid} after error {result}" + ) - task_status.started(cs) + else: + log.runtime( + f"Cancelling {portal.channel.uid} gracefully " + f"after result {result}") - # if this call errors we store the exception for later - # in ``errors`` which will be reraised inside - # a MultiError and we still send out a cancel request - result = await exhaust_portal(portal, actor) - if isinstance(result, Exception): - errors[actor.uid] = result - log.warning( - f"Cancelling {portal.channel.uid} after error {result}" - ) - - else: - log.runtime( - f"Cancelling {portal.channel.uid} gracefully " - f"after result {result}") - - # cancel the process now that we have a final result - await portal.cancel_actor() + # cancel the process now that we have a final result + await portal.cancel_actor() async def do_hard_kill( @@ -191,8 +188,6 @@ async def new_proc( spawn method as configured using ``try_set_start_method()``. """ - cancel_scope = None - # mark the new actor with the global spawn method subactor._spawn_method = _spawn_method @@ -258,7 +253,8 @@ async def new_proc( async with trio.open_nursery() as nursery: if portal in actor_nursery._cancel_after_result_on_exit: - cancel_scope = await nursery.start( + # cancel_scope = await nursery.start( + nursery.start_soon( cancel_on_completion, portal, subactor, @@ -271,22 +267,22 @@ async def new_proc( # This is a "soft" (cancellable) join/reap. await proc.wait() - finally: - # cancel result waiter that may have been spawned in - # tandem if not done already - if cancel_scope: + # cancel result waiter that may have been spawned in + # tandem if not done already log.warning( "Cancelling existing result waiter task for " f"{subactor.uid}") - cancel_scope.cancel() + nursery.cancel_scope.cancel() - log.runtime(f"Attempting to kill {proc}") + finally: + if proc.poll() is None: + log.cancel(f"Attempting to hard kill {proc}") - # The "hard" reap since no actor zombies are allowed! - # XXX: do this **after** cancellation/tearfown to avoid - # killing the process too early. - with trio.CancelScope(shield=True): - await do_hard_kill(proc) + # The "hard" reap since no actor zombies are allowed! + # XXX: do this **after** cancellation/tearfown to avoid + # killing the process too early. + with trio.CancelScope(shield=True): + await do_hard_kill(proc) log.debug(f"Joined {proc}") # pop child entry to indicate we no longer managing this subactor @@ -322,141 +318,124 @@ async def mp_new_proc( task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> None: - async with trio.open_nursery() as nursery: - assert _ctx - start_method = _ctx.get_start_method() - if start_method == 'forkserver': - # XXX do our hackery on the stdlib to avoid multiple - # forkservers (one at each subproc layer). - fs = forkserver._forkserver - curr_actor = current_actor() - if is_main_process() and not curr_actor._forkserver_info: - # if we're the "main" process start the forkserver - # only once and pass its ipc info to downstream - # children - # forkserver.set_forkserver_preload(enable_modules) - forkserver.ensure_running() - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - getattr(fs, '_forkserver_pid', None), - getattr( - resource_tracker._resource_tracker, '_pid', None), - resource_tracker._resource_tracker._fd, - ) - else: - assert curr_actor._forkserver_info - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - fs._forkserver_pid, - resource_tracker._resource_tracker._pid, - resource_tracker._resource_tracker._fd, - ) = curr_actor._forkserver_info + + assert _ctx + start_method = _ctx.get_start_method() + if start_method == 'forkserver': + # XXX do our hackery on the stdlib to avoid multiple + # forkservers (one at each subproc layer). + fs = forkserver._forkserver + curr_actor = current_actor() + if is_main_process() and not curr_actor._forkserver_info: + # if we're the "main" process start the forkserver + # only once and pass its ipc info to downstream + # children + # forkserver.set_forkserver_preload(enable_modules) + forkserver.ensure_running() + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + getattr(fs, '_forkserver_pid', None), + getattr( + resource_tracker._resource_tracker, '_pid', None), + resource_tracker._resource_tracker._fd, + ) else: - fs_info = (None, None, None, None, None) + assert curr_actor._forkserver_info + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + fs._forkserver_pid, + resource_tracker._resource_tracker._pid, + resource_tracker._resource_tracker._fd, + ) = curr_actor._forkserver_info + else: + fs_info = (None, None, None, None, None) - proc: mp.Process = _ctx.Process( # type: ignore - target=_mp_main, - args=( - subactor, - bind_addr, - fs_info, - start_method, - parent_addr, - ), - # daemon=True, - name=name, - ) - # `multiprocessing` only (since no async interface): - # register the process before start in case we get a cancel - # request before the actor has fully spawned - then we can wait - # for it to fully come up before sending a cancel request - actor_nursery._children[subactor.uid] = (subactor, proc, None) + proc: mp.Process = _ctx.Process( # type: ignore + target=_mp_main, + args=( + subactor, + bind_addr, + fs_info, + start_method, + parent_addr, + ), + # daemon=True, + name=name, + ) + # `multiprocessing` only (since no async interface): + # register the process before start in case we get a cancel + # request before the actor has fully spawned - then we can wait + # for it to fully come up before sending a cancel request + actor_nursery._children[subactor.uid] = (subactor, proc, None) - proc.start() - if not proc.is_alive(): - raise ActorFailure("Couldn't start sub-actor?") + proc.start() + if not proc.is_alive(): + raise ActorFailure("Couldn't start sub-actor?") - log.runtime(f"Started {proc}") + log.runtime(f"Started {proc}") - try: - # wait for actor to spawn and connect back to us - # channel should have handshake completed by the - # local actor by the time we get a ref to it - event, chan = await actor_nursery._actor.wait_for_peer( - subactor.uid) - portal = Portal(chan) - actor_nursery._children[subactor.uid] = (subactor, proc, portal) + try: + # wait for actor to spawn and connect back to us + # channel should have handshake completed by the + # local actor by the time we get a ref to it + event, chan = await actor_nursery._actor.wait_for_peer( + subactor.uid) + # except: + # TODO: in the case we were cancelled before the sub-proc + # registered itself back we must be sure to try and clean + # any process we may have started. - # unblock parent task - task_status.started(portal) + portal = Portal(chan) + actor_nursery._children[subactor.uid] = (subactor, proc, portal) - # wait for ``ActorNursery`` block to signal that - # subprocesses can be waited upon. - # This is required to ensure synchronization - # with user code that may want to manually await results - # from nursery spawned sub-actors. We don't want the - # containing nurseries here to collect results or error - # while user code is still doing it's thing. Only after the - # nursery block closes do we allow subactor results to be - # awaited and reported upwards to the supervisor. + # unblock parent task + task_status.started(portal) + + # wait for ``ActorNursery`` block to signal that + # subprocesses can be waited upon. + # This is required to ensure synchronization + # with user code that may want to manually await results + # from nursery spawned sub-actors. We don't want the + # containing nurseries here to collect results or error + # while user code is still doing it's thing. Only after the + # nursery block closes do we allow subactor results to be + # awaited and reported upwards to the supervisor. + with trio.CancelScope(shield=True): await actor_nursery._join_procs.wait() - finally: - # XXX: in the case we were cancelled before the sub-proc - # registered itself back we must be sure to try and clean - # any process we may have started. - - reaping_cancelled: bool = False - cancel_scope: Optional[trio.CancelScope] = None - cancel_exc: Optional[trio.Cancelled] = None - + async with trio.open_nursery() as nursery: if portal in actor_nursery._cancel_after_result_on_exit: - try: - # async with trio.open_nursery() as n: - # n.cancel_scope.shield = True - cancel_scope = await nursery.start( - cancel_on_completion, - portal, - subactor, - errors - ) - except trio.Cancelled as err: - cancel_exc = err + nursery.start_soon( + cancel_on_completion, + portal, + subactor, + errors + ) - # if the reaping task was cancelled we may have hit - # a race where the subproc disconnected before we - # could send it a message to cancel (classic 2 generals) - # in that case, wait shortly then kill the process. - reaping_cancelled = True - - if proc.is_alive(): - with trio.move_on_after(0.1) as cs: - cs.shield = True - await proc_waiter(proc) - - if cs.cancelled_caught: - proc.terminate() - - if not reaping_cancelled and proc.is_alive(): - await proc_waiter(proc) - - # TODO: timeout block here? - proc.join() - - log.debug(f"Joined {proc}") - # pop child entry to indicate we are no longer managing subactor - subactor, proc, portal = actor_nursery._children.pop(subactor.uid) + await proc_waiter(proc) # cancel result waiter that may have been spawned in # tandem if not done already - if cancel_scope: - log.warning( - "Cancelling existing result waiter task for " - f"{subactor.uid}") - cancel_scope.cancel() + log.warning( + "Cancelling existing result waiter task for " + f"{subactor.uid}") + nursery.cancel_scope.cancel() - elif reaping_cancelled: # let the cancellation bubble up - assert cancel_exc - raise cancel_exc + finally: + # hard reap sequence + if proc.is_alive(): + log.cancel(f"Attempting to hard kill {proc}") + with trio.move_on_after(0.1) as cs: + cs.shield = True + await proc_waiter(proc) + + if cs.cancelled_caught: + proc.terminate() + + proc.join() + log.debug(f"Joined {proc}") + + # pop child entry to indicate we are no longer managing subactor + subactor, proc, portal = actor_nursery._children.pop(subactor.uid) From 893bad72d5d2b630c6a669014b12cbfc1fae6bb0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 8 Oct 2021 18:13:55 -0400 Subject: [PATCH 07/25] Add a maybe-open-debugger helper --- tractor/_debug.py | 68 ++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 62 insertions(+), 6 deletions(-) diff --git a/tractor/_debug.py b/tractor/_debug.py index 6e1d7f0..62f81d1 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -219,7 +219,8 @@ async def _hijack_stdin_for_child( subactor_uid: Tuple[str, str] ) -> str: - '''Hijack the tty in the root process of an actor tree such that + ''' + Hijack the tty in the root process of an actor tree such that the pdbpp debugger console can be allocated to a sub-actor for repl bossing. @@ -254,6 +255,8 @@ async def _hijack_stdin_for_child( # assert await stream.receive() == 'pdb_unlock' except ( + # BaseException, + trio.MultiError, trio.BrokenResourceError, trio.Cancelled, # by local cancellation trio.ClosedResourceError, # by self._rx_chan @@ -268,8 +271,9 @@ async def _hijack_stdin_for_child( if isinstance(err, trio.Cancelled): raise - - log.debug(f"TTY lock released, remote task: {task_name}:{subactor_uid}") + finally: + log.pdb("TTY lock released, remote task:" + f"{task_name}:{subactor_uid}") return "pdb_unlock_complete" @@ -407,11 +411,10 @@ async def _breakpoint( 'Root actor attempting to shield-acquire active tty lock' f' owned by {_global_actor_in_debug}') + # must shield here to avoid hitting a ``Cancelled`` and + # a child getting stuck bc we clobbered the tty with trio.CancelScope(shield=True): - # must shield here to avoid hitting a ``Cancelled`` and - # a child getting stuck bc we clobbered the tty await _debug_lock.acquire() - else: # may be cancelled await _debug_lock.acquire() @@ -524,3 +527,56 @@ async def _maybe_enter_pm(err): else: return False + + +async def maybe_wait_for_debugger() -> None: + + global _no_remote_has_tty, _global_actor_in_debug + + # If we error in the root but the debugger is + # engaged we don't want to prematurely kill (and + # thus clobber access to) the local tty since it + # will make the pdb repl unusable. + # Instead try to wait for pdb to be released before + # tearing down. + if ( + _state.debug_mode() and + is_root_process() + ): + + # TODO: could this make things more deterministic? + # wait to see if a sub-actor task will be + # scheduled and grab the tty lock on the next + # tick? + # await trio.testing.wait_all_tasks_blocked() + + sub_in_debug = None + if _global_actor_in_debug: + sub_in_debug = tuple(_global_actor_in_debug) + + for _ in range(2): + with trio.CancelScope(shield=True): + + log.warning( + 'Root polling for debug') + await trio.sleep(0.01) + + debug_complete = _no_remote_has_tty + if ( + (debug_complete and + not debug_complete.is_set()) + ): + log.warning( + 'Root has errored but pdb is in use by ' + f'child {sub_in_debug}\n' + 'Waiting on tty lock to release..') + + await debug_complete.wait() + + await trio.sleep(0.01) + continue + else: + log.warning( + 'Root acquired DEBUGGER' + ) + return From 62035078ce6a11410323dc662bbd1b199b46c353 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 13 Oct 2021 09:33:33 -0400 Subject: [PATCH 08/25] Reduce some loglevels, stick in comment about blocking till next tick --- tractor/_debug.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/tractor/_debug.py b/tractor/_debug.py index 62f81d1..b9b470b 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -139,7 +139,7 @@ async def _acquire_debug_lock( task_name = trio.lowlevel.current_task().name - log.pdb( + log.debug( f"Attempting to acquire TTY lock, remote task: {task_name}:{uid}" ) @@ -187,7 +187,7 @@ async def _acquire_debug_lock( if ( not stats.owner ): - log.pdb(f"No more tasks waiting on tty lock! says {uid}") + log.debug(f"No more tasks waiting on tty lock! says {uid}") _no_remote_has_tty.set() _no_remote_has_tty = None @@ -272,7 +272,7 @@ async def _hijack_stdin_for_child( if isinstance(err, trio.Cancelled): raise finally: - log.pdb("TTY lock released, remote task:" + log.debug("TTY lock released, remote task:" f"{task_name}:{subactor_uid}") return "pdb_unlock_complete" @@ -315,8 +315,6 @@ async def _breakpoint( try: async with get_root() as portal: - log.pdb('got portal') - # this syncs to child's ``Context.started()`` call. async with portal.open_context( @@ -561,6 +559,12 @@ async def maybe_wait_for_debugger() -> None: 'Root polling for debug') await trio.sleep(0.01) + # TODO: could this make things more deterministic? + # wait to see if a sub-actor task will be + # scheduled and grab the tty lock on the next + # tick? + # await trio.testing.wait_all_tasks_blocked() + debug_complete = _no_remote_has_tty if ( (debug_complete and From f3a6ab62af0a9a323fcdc6e753f529181e0f6692 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 13 Oct 2021 09:36:09 -0400 Subject: [PATCH 09/25] Use debugger helper in nursery and spawn tasks --- tractor/_spawn.py | 13 ++++++++----- tractor/_trionics.py | 22 ++-------------------- 2 files changed, 10 insertions(+), 25 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 55ac9a3..a6edfec 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -21,6 +21,7 @@ from multiprocessing import forkserver # type: ignore from typing import Tuple from . import _forkserver_override +from ._debug import maybe_wait_for_debugger from ._state import ( current_actor, is_main_process, @@ -275,13 +276,15 @@ async def new_proc( nursery.cancel_scope.cancel() finally: + # The "hard" reap since no actor zombies are allowed! + # XXX: do this **after** cancellation/tearfown to avoid + # killing the process too early. if proc.poll() is None: - log.cancel(f"Attempting to hard kill {proc}") - - # The "hard" reap since no actor zombies are allowed! - # XXX: do this **after** cancellation/tearfown to avoid - # killing the process too early. with trio.CancelScope(shield=True): + # don't clobber an ongoing pdb + await maybe_wait_for_debugger() + + log.cancel(f"Attempting to hard kill {proc}") await do_hard_kill(proc) log.debug(f"Joined {proc}") diff --git a/tractor/_trionics.py b/tractor/_trionics.py index e29bf5e..1259be6 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -12,6 +12,7 @@ import trio from async_generator import asynccontextmanager from . import _debug +from ._debug import maybe_wait_for_debugger from ._state import current_actor, is_main_process, is_root_process from .log import get_logger, get_loglevel from ._actor import Actor @@ -280,26 +281,7 @@ async def _open_and_supervise_one_cancels_all_nursery( # will make the pdb repl unusable. # Instead try to wait for pdb to be released before # tearing down. - if is_root_process(): - - # TODO: could this make things more deterministic? - # wait to see if a sub-actor task will be - # scheduled and grab the tty lock on the next - # tick? - # await trio.testing.wait_all_tasks_blocked() - - debug_complete = _debug._no_remote_has_tty - if ( - debug_complete and - not debug_complete.is_set() - ): - log.warning( - 'Root has errored but pdb is in use by ' - f'child {_debug._global_actor_in_debug}\n' - 'Waiting on tty lock to release..') - - # with trio.CancelScope(shield=True): - await debug_complete.wait() + await maybe_wait_for_debugger() # if the caller's scope errored then we activate our # one-cancels-all supervisor strategy (don't From d30ce96740404d18d84b94360b238916454b05b8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 13 Oct 2021 23:08:58 -0400 Subject: [PATCH 10/25] Breakout `wait_for_parent_stdin_hijack()`, increase root pdb checker poll time --- tractor/_debug.py | 149 ++++++++++++++++++++++++++-------------------- 1 file changed, 83 insertions(+), 66 deletions(-) diff --git a/tractor/_debug.py b/tractor/_debug.py index b9b470b..0ae00a7 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -272,12 +272,72 @@ async def _hijack_stdin_for_child( if isinstance(err, trio.Cancelled): raise finally: - log.debug("TTY lock released, remote task:" - f"{task_name}:{subactor_uid}") + log.debug( + "TTY lock released, remote task:" + f"{task_name}:{subactor_uid}") return "pdb_unlock_complete" +async def wait_for_parent_stdin_hijack( + actor: 'Actor', # noqa + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED +): + ''' + Connect to the root actor via a ctx and invoke a task which locks a root-local + TTY lock. + + This function is used by any sub-actor to acquire mutex access to + pdb and the root's TTY for interactive debugging (see below inside + ``_breakpoint()``). It can be used to ensure that an intermediate + nursery-owning actor does not clobber its children if they are in + debug (see below inside ``maybe_wait_for_debugger()``). + + ''' + global _debugger_request_cs + + with trio.CancelScope(shield=True) as cs: + _debugger_request_cs = cs + + try: + async with get_root() as portal: + + # this syncs to child's ``Context.started()`` call. + async with portal.open_context( + + tractor._debug._hijack_stdin_for_child, + subactor_uid=actor.uid, + + ) as (ctx, val): + + log.pdb('locked context') + assert val == 'Locked' + + async with ctx.open_stream() as stream: + # unblock local caller + task_status.started(cs) + + try: + await _local_pdb_complete.wait() + + finally: + # TODO: shielding currently can cause hangs... + with trio.CancelScope(shield=True): + await stream.send('pdb_unlock') + + # sync with callee termination + assert await ctx.result() == "pdb_unlock_complete" + + except tractor.ContextCancelled: + log.warning('Root actor cancelled debug lock') + + finally: + log.debug(f"Exiting debugger for actor {actor.uid}") + global _local_task_in_debug + _local_task_in_debug = None + log.debug(f"Child {actor.uid} released parent stdio lock") + + async def _breakpoint( debug_func, @@ -304,54 +364,6 @@ async def _breakpoint( await trio.lowlevel.checkpoint() - async def wait_for_parent_stdin_hijack( - task_status=trio.TASK_STATUS_IGNORED - ): - global _debugger_request_cs - - with trio.CancelScope(shield=True) as cs: - _debugger_request_cs = cs - - try: - async with get_root() as portal: - - # this syncs to child's ``Context.started()`` call. - async with portal.open_context( - - tractor._debug._hijack_stdin_for_child, - subactor_uid=actor.uid, - - ) as (ctx, val): - - log.pdb('locked context') - assert val == 'Locked' - - async with ctx.open_stream() as stream: - - log.error('opened stream') - # unblock local caller - task_status.started() - - try: - await _local_pdb_complete.wait() - - finally: - # TODO: shielding currently can cause hangs... - with trio.CancelScope(shield=True): - await stream.send('pdb_unlock') - - # sync with callee termination - assert await ctx.result() == "pdb_unlock_complete" - - except tractor.ContextCancelled: - log.warning('Root actor cancelled debug lock') - - finally: - log.debug(f"Exiting debugger for actor {actor}") - global _local_task_in_debug - _local_task_in_debug = None - log.debug(f"Child {actor} released parent stdio lock") - if not _local_pdb_complete or _local_pdb_complete.is_set(): _local_pdb_complete = trio.Event() @@ -388,7 +400,10 @@ async def _breakpoint( # cancel on this task start? I *think* this works below? # actor._service_n.cancel_scope.shield = shield with trio.CancelScope(shield=True): - await actor._service_n.start(wait_for_parent_stdin_hijack) + await actor._service_n.start( + wait_for_parent_stdin_hijack, + actor, + ) elif is_root_process(): @@ -529,7 +544,7 @@ async def _maybe_enter_pm(err): async def maybe_wait_for_debugger() -> None: - global _no_remote_has_tty, _global_actor_in_debug + global _no_remote_has_tty, _global_actor_in_debug, _wait_all_tasks_lock # If we error in the root but the debugger is # engaged we don't want to prematurely kill (and @@ -538,7 +553,7 @@ async def maybe_wait_for_debugger() -> None: # Instead try to wait for pdb to be released before # tearing down. if ( - _state.debug_mode() and + _state.debug_mode() or is_root_process() ): @@ -549,21 +564,23 @@ async def maybe_wait_for_debugger() -> None: # await trio.testing.wait_all_tasks_blocked() sub_in_debug = None - if _global_actor_in_debug: - sub_in_debug = tuple(_global_actor_in_debug) for _ in range(2): + + if _global_actor_in_debug: + sub_in_debug = tuple(_global_actor_in_debug) + + log.warning( + 'Root polling for debug') + with trio.CancelScope(shield=True): + await trio.sleep(0.1) - log.warning( - 'Root polling for debug') - await trio.sleep(0.01) - - # TODO: could this make things more deterministic? - # wait to see if a sub-actor task will be - # scheduled and grab the tty lock on the next - # tick? - # await trio.testing.wait_all_tasks_blocked() + # TODO: could this make things more deterministic? wait + # to see if a sub-actor task will be scheduled and grab + # the tty lock on the next tick? + # XXX: doesn't seem to work + # await trio.testing.wait_all_tasks_blocked(cushion=0) debug_complete = _no_remote_has_tty if ( @@ -577,10 +594,10 @@ async def maybe_wait_for_debugger() -> None: await debug_complete.wait() - await trio.sleep(0.01) + await trio.sleep(0.1) continue else: log.warning( - 'Root acquired DEBUGGER' + 'Root acquired TTY LOCK' ) return From 4b2710b8a599a445235c29b8e96164ee128207da Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 13 Oct 2021 23:32:02 -0400 Subject: [PATCH 11/25] Add tty lock acquire ctx mngr --- tractor/_debug.py | 54 ++++++++++++++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 19 deletions(-) diff --git a/tractor/_debug.py b/tractor/_debug.py index 0ae00a7..6edfd20 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -5,7 +5,7 @@ Multi-core debugging for da peeps! import bdb import sys from functools import partial -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager as acm from typing import Tuple, Optional, Callable, AsyncIterator import tractor @@ -122,7 +122,7 @@ class PdbwTeardown(pdbpp.Pdb): # break -@asynccontextmanager +@acm async def _acquire_debug_lock( uid: Tuple[str, str] @@ -542,30 +542,46 @@ async def _maybe_enter_pm(err): return False -async def maybe_wait_for_debugger() -> None: +@acm +async def acquire_debug_lock( + subactor: Actor, +) -> None: + ''' + Grab root's debug lock on entry, release on exit. - global _no_remote_has_tty, _global_actor_in_debug, _wait_all_tasks_lock + ''' + async with trio.open_nursery() as n: + cs = await n.start( + wait_for_parent_stdin_hijack, + subactor, + ) + yield + cs.cancel() + + +async def maybe_wait_for_debugger( + poll_steps: int = 2, + poll_delay: float = 0.1, +) -> None: + + if not _state.debug_mode(): + return - # If we error in the root but the debugger is - # engaged we don't want to prematurely kill (and - # thus clobber access to) the local tty since it - # will make the pdb repl unusable. - # Instead try to wait for pdb to be released before - # tearing down. if ( - _state.debug_mode() or is_root_process() ): + global _no_remote_has_tty, _global_actor_in_debug, _wait_all_tasks_lock - # TODO: could this make things more deterministic? - # wait to see if a sub-actor task will be - # scheduled and grab the tty lock on the next - # tick? - # await trio.testing.wait_all_tasks_blocked() + # If we error in the root but the debugger is + # engaged we don't want to prematurely kill (and + # thus clobber access to) the local tty since it + # will make the pdb repl unusable. + # Instead try to wait for pdb to be released before + # tearing down. sub_in_debug = None - for _ in range(2): + for _ in range(poll_steps): if _global_actor_in_debug: sub_in_debug = tuple(_global_actor_in_debug) @@ -574,7 +590,7 @@ async def maybe_wait_for_debugger() -> None: 'Root polling for debug') with trio.CancelScope(shield=True): - await trio.sleep(0.1) + await trio.sleep(poll_delay) # TODO: could this make things more deterministic? wait # to see if a sub-actor task will be scheduled and grab @@ -594,7 +610,7 @@ async def maybe_wait_for_debugger() -> None: await debug_complete.wait() - await trio.sleep(0.1) + await trio.sleep(poll_delay) continue else: log.warning( From daa28ea0e92486833b9ea59e95dad320a4a1069c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 13 Oct 2021 23:33:31 -0400 Subject: [PATCH 12/25] Handle depth > 1 nursery owners which use debug mode --- tractor/_spawn.py | 54 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 45 insertions(+), 9 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index a6edfec..b55cad7 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -1,5 +1,6 @@ """ Machinery for actor process spawning using multiple backends. + """ import sys import multiprocessing as mp @@ -21,10 +22,14 @@ from multiprocessing import forkserver # type: ignore from typing import Tuple from . import _forkserver_override -from ._debug import maybe_wait_for_debugger +from ._debug import ( + maybe_wait_for_debugger, + acquire_debug_lock, +) from ._state import ( current_actor, is_main_process, + is_root_process, ) from .log import get_logger @@ -153,12 +158,13 @@ async def cancel_on_completion( async def do_hard_kill( proc: trio.Process, + terminate_after: int = 3, ) -> None: # NOTE: this timeout used to do nothing since we were shielding # the ``.wait()`` inside ``new_proc()`` which will pretty much # never release until the process exits, now it acts as # a hard-kill time ultimatum. - with trio.move_on_after(3) as cs: + with trio.move_on_after(terminate_after) as cs: # NOTE: This ``__aexit__()`` shields internally. async with proc: # calls ``trio.Process.aclose()`` @@ -173,16 +179,20 @@ async def do_hard_kill( async def new_proc( + name: str, actor_nursery: 'ActorNursery', # type: ignore # noqa subactor: Actor, errors: Dict[Tuple[str, str], Exception], + # passed through to actor main bind_addr: Tuple[str, int], parent_addr: Tuple[str, int], _runtime_vars: Dict[str, Any], # serialized and sent to _child + *, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED + ) -> None: """ Create a new ``multiprocessing.Process`` using the @@ -191,6 +201,7 @@ async def new_proc( """ # mark the new actor with the global spawn method subactor._spawn_method = _spawn_method + uid = subactor.uid if _spawn_method == 'trio': @@ -217,6 +228,7 @@ async def new_proc( subactor.loglevel ] + cancel_during_spawn: bool = False try: proc = await trio.open_process(spawn_cmd) @@ -225,8 +237,24 @@ async def new_proc( # wait for actor to spawn and connect back to us # channel should have handshake completed by the # local actor by the time we get a ref to it - event, chan = await actor_nursery._actor.wait_for_peer( - subactor.uid) + try: + event, chan = await actor_nursery._actor.wait_for_peer( + subactor.uid) + except trio.Cancelled: + cancel_during_spawn = True + # we may cancel before the child connects back in which + # case avoid clobbering the pdb tty. + with trio.CancelScope(shield=True): + # don't clobber an ongoing pdb + if is_root_process(): + await maybe_wait_for_debugger() + else: + async with acquire_debug_lock(): + # soft wait on the proc to terminate + with trio.move_on_after(0.5): + await proc.wait() + raise + portal = Portal(chan) actor_nursery._children[subactor.uid] = ( subactor, proc, portal) @@ -254,7 +282,6 @@ async def new_proc( async with trio.open_nursery() as nursery: if portal in actor_nursery._cancel_after_result_on_exit: - # cancel_scope = await nursery.start( nursery.start_soon( cancel_on_completion, portal, @@ -279,11 +306,20 @@ async def new_proc( # The "hard" reap since no actor zombies are allowed! # XXX: do this **after** cancellation/tearfown to avoid # killing the process too early. - if proc.poll() is None: - with trio.CancelScope(shield=True): - # don't clobber an ongoing pdb - await maybe_wait_for_debugger() + log.cancel(f'Hard reap sequence starting for {uid}') + with trio.CancelScope(shield=True): + # don't clobber an ongoing pdb + await maybe_wait_for_debugger() + + if cancel_during_spawn: + + # Try again to avoid TTY clobbering. + async with acquire_debug_lock(): + with trio.move_on_after(0.5): + await proc.wait() + + if proc.poll() is None: log.cancel(f"Attempting to hard kill {proc}") await do_hard_kill(proc) From b14699d40ba18af58dff9236b55d101bfb9de86a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 13 Oct 2021 23:34:25 -0400 Subject: [PATCH 13/25] Adjust debugger tests to expect depth > 1 crashes With the new fixes to the trio spawner we can expect that both root *and* depth > 1 nursery owning actors will now not clobber any children that are in debug (either via breakpoint or through crashing). The tests changed now include more checks which ensure the 2nd level parent-ish actors also bubble up through into `pdb` and don't kill any of their (crashed) children before they're done themselves debugging. --- .../debugging/multi_subactor_root_errors.py | 6 +++ tests/test_debugger.py | 52 ++++++++++++++++--- 2 files changed, 51 insertions(+), 7 deletions(-) diff --git a/examples/debugging/multi_subactor_root_errors.py b/examples/debugging/multi_subactor_root_errors.py index 6c69618..640f222 100644 --- a/examples/debugging/multi_subactor_root_errors.py +++ b/examples/debugging/multi_subactor_root_errors.py @@ -1,3 +1,8 @@ +''' +Test that a nested nursery will avoid clobbering +the debugger latched by a broken child. + +''' import trio import tractor @@ -35,6 +40,7 @@ async def main(): """ async with tractor.open_nursery( debug_mode=True, + # loglevel='cancel', ) as n: # spawn both actors diff --git a/tests/test_debugger.py b/tests/test_debugger.py index 87bce1b..bd1cba6 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -236,7 +236,8 @@ def test_subactor_breakpoint(spawn): def test_multi_subactors(spawn): - """Multiple subactors, both erroring and breakpointing as well as + """ + Multiple subactors, both erroring and breakpointing as well as a nested subactor erroring. """ child = spawn(r'multi_subactors') @@ -259,6 +260,7 @@ def test_multi_subactors(spawn): # first name_error failure child.expect(r"\(Pdb\+\+\)") before = str(child.before.decode()) + assert "Attaching to pdb in crashed actor: ('name_error'" in before assert "NameError" in before # continue again @@ -267,6 +269,7 @@ def test_multi_subactors(spawn): # 2nd name_error failure child.expect(r"\(Pdb\+\+\)") before = str(child.before.decode()) + assert "Attaching to pdb in crashed actor: ('name_error_1'" in before assert "NameError" in before # breakpoint loop should re-engage @@ -275,6 +278,19 @@ def test_multi_subactors(spawn): before = str(child.before.decode()) assert "Attaching pdb to actor: ('breakpoint_forever'" in before + # wait for spawn error to show up + while 'breakpoint_forever' in before: + child.sendline('c') + child.expect(r"\(Pdb\+\+\)") + before = str(child.before.decode()) + + # 2nd depth nursery should trigger + # child.sendline('c') + # child.expect(r"\(Pdb\+\+\)") + # before = str(child.before.decode()) + assert "Attaching to pdb in crashed actor: ('spawn_error'" in before + assert "RemoteActorError: ('name_error_1'" in before + # now run some "continues" to show re-entries for _ in range(5): child.sendline('c') @@ -284,16 +300,24 @@ def test_multi_subactors(spawn): child.sendline('q') child.expect(r"\(Pdb\+\+\)") before = str(child.before.decode()) + # debugger attaches to root assert "Attaching to pdb in crashed actor: ('root'" in before + # expect a multierror with exceptions for each sub-actor assert "RemoteActorError: ('breakpoint_forever'" in before + assert "RemoteActorError: ('name_error'" in before + assert "RemoteActorError: ('spawn_error'" in before + assert "RemoteActorError: ('name_error_1'" in before assert 'bdb.BdbQuit' in before # process should exit child.sendline('c') child.expect(pexpect.EOF) - + # repeat of previous multierror for final output before = str(child.before.decode()) assert "RemoteActorError: ('breakpoint_forever'" in before + assert "RemoteActorError: ('name_error'" in before + assert "RemoteActorError: ('spawn_error'" in before + assert "RemoteActorError: ('name_error_1'" in before assert 'bdb.BdbQuit' in before @@ -387,16 +411,29 @@ def test_multi_subactors_root_errors(spawn): before = str(child.before.decode()) assert "NameError: name 'doggypants' is not defined" in before - # continue again + # continue again to catch 2nd name error from + # actor 'name_error_1' (which is 2nd depth). child.sendline('c') child.expect(r"\(Pdb\+\+\)") - - # should now get attached in root with assert error before = str(child.before.decode()) + assert "Attaching to pdb in crashed actor: ('name_error_1'" in before + assert "NameError" in before - # should have come just after priot prompt + child.sendline('c') + child.expect(r"\(Pdb\+\+\)") + before = str(child.before.decode()) + assert "Attaching to pdb in crashed actor: ('spawn_error'" in before + # boxed error from previous step + assert "RemoteActorError: ('name_error_1'" in before + assert "NameError" in before + + child.sendline('c') + child.expect(r"\(Pdb\+\+\)") + before = str(child.before.decode()) assert "Attaching to pdb in crashed actor: ('root'" in before - assert "AssertionError" in before + # boxed error from first level failure + assert "RemoteActorError: ('name_error'" in before + assert "NameError" in before # warnings assert we probably don't need # assert "Cancelling nursery in ('spawn_error'," in before @@ -406,6 +443,7 @@ def test_multi_subactors_root_errors(spawn): child.expect(pexpect.EOF) before = str(child.before.decode()) + # error from root actor and root task that created top level nursery assert "AssertionError" in before From 6f5c35dd1bbb38a7b88002623f9f29e8fff6a70d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 14 Oct 2021 11:38:44 -0400 Subject: [PATCH 14/25] Fix missing task status type --- tractor/_debug.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tractor/_debug.py b/tractor/_debug.py index 6edfd20..451570e 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -10,6 +10,7 @@ from typing import Tuple, Optional, Callable, AsyncIterator import tractor import trio +from trio_typing import TaskStatus from .log import get_logger from . import _state @@ -284,8 +285,8 @@ async def wait_for_parent_stdin_hijack( task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED ): ''' - Connect to the root actor via a ctx and invoke a task which locks a root-local - TTY lock. + Connect to the root actor via a ctx and invoke a task which locks + a root-local TTY lock. This function is used by any sub-actor to acquire mutex access to pdb and the root's TTY for interactive debugging (see below inside @@ -544,7 +545,7 @@ async def _maybe_enter_pm(err): @acm async def acquire_debug_lock( - subactor: Actor, + subactor: 'Actor', # noqa ) -> None: ''' Grab root's debug lock on entry, release on exit. From fa317d160091973ee8109249b849889b87f5260b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 14 Oct 2021 12:07:09 -0400 Subject: [PATCH 15/25] Change lock helper to take an actor uid tuple --- tractor/_debug.py | 25 ++++++++++++++++--------- tractor/_spawn.py | 4 ++-- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/tractor/_debug.py b/tractor/_debug.py index 451570e..37b46bd 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -6,7 +6,13 @@ import bdb import sys from functools import partial from contextlib import asynccontextmanager as acm -from typing import Tuple, Optional, Callable, AsyncIterator +from typing import ( + Tuple, + Optional, + Callable, + AsyncIterator, + AsyncGenerator, +) import tractor import trio @@ -281,7 +287,7 @@ async def _hijack_stdin_for_child( async def wait_for_parent_stdin_hijack( - actor: 'Actor', # noqa + actor_uid: Tuple[str, str], task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED ): ''' @@ -307,7 +313,7 @@ async def wait_for_parent_stdin_hijack( async with portal.open_context( tractor._debug._hijack_stdin_for_child, - subactor_uid=actor.uid, + subactor_uid=actor_uid, ) as (ctx, val): @@ -319,6 +325,7 @@ async def wait_for_parent_stdin_hijack( task_status.started(cs) try: + assert _local_pdb_complete await _local_pdb_complete.wait() finally: @@ -333,10 +340,10 @@ async def wait_for_parent_stdin_hijack( log.warning('Root actor cancelled debug lock') finally: - log.debug(f"Exiting debugger for actor {actor.uid}") + log.debug(f"Exiting debugger for actor {actor_uid}") global _local_task_in_debug _local_task_in_debug = None - log.debug(f"Child {actor.uid} released parent stdio lock") + log.debug(f"Child {actor_uid} released parent stdio lock") async def _breakpoint( @@ -545,8 +552,8 @@ async def _maybe_enter_pm(err): @acm async def acquire_debug_lock( - subactor: 'Actor', # noqa -) -> None: + subactor_uid: Tuple[str, str] +) -> AsyncGenerator[None, tuple]: ''' Grab root's debug lock on entry, release on exit. @@ -554,9 +561,9 @@ async def acquire_debug_lock( async with trio.open_nursery() as n: cs = await n.start( wait_for_parent_stdin_hijack, - subactor, + subactor_uid, ) - yield + yield None cs.cancel() diff --git a/tractor/_spawn.py b/tractor/_spawn.py index b55cad7..4f496e4 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -249,7 +249,7 @@ async def new_proc( if is_root_process(): await maybe_wait_for_debugger() else: - async with acquire_debug_lock(): + async with acquire_debug_lock(uid): # soft wait on the proc to terminate with trio.move_on_after(0.5): await proc.wait() @@ -315,7 +315,7 @@ async def new_proc( if cancel_during_spawn: # Try again to avoid TTY clobbering. - async with acquire_debug_lock(): + async with acquire_debug_lock(uid): with trio.move_on_after(0.5): await proc.wait() From 9d83ef82b25fef6132f0f4c01ad981a76cf22176 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 14 Oct 2021 12:07:36 -0400 Subject: [PATCH 16/25] Remove union type for root getter --- tractor/_discovery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/_discovery.py b/tractor/_discovery.py index bcfcc84..bac4110 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -44,7 +44,7 @@ async def get_arbiter( @asynccontextmanager async def get_root( **kwargs, -) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: +) -> typing.AsyncGenerator[Portal, None]: host, port = _runtime_vars['_root_mailbox'] assert host is not None From 7ee121aeaf5db5e1dd2077321776a013d44d3fc7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 14 Oct 2021 12:12:13 -0400 Subject: [PATCH 17/25] Try to handle variable windows errors --- tests/test_cancellation.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 5da87ce..4271d8f 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -365,7 +365,8 @@ async def test_nested_multierrors(loglevel, start_method): # to happen before an actor is spawned if isinstance(subexc, trio.Cancelled): continue - else: + + elif isinstance(subexc, tractor.RemoteActorError): # on windows it seems we can't exactly be sure wtf # will happen.. assert subexc.type in ( @@ -373,6 +374,14 @@ async def test_nested_multierrors(loglevel, start_method): trio.Cancelled, trio.MultiError ) + + elif isinstance(subexc, trio.MultiError): + for subsub in subexc.exceptions: + assert subsub.type in ( + tractor.RemoteActorError, + trio.Cancelled, + trio.MultiError + ) else: assert isinstance(subexc, tractor.RemoteActorError) @@ -448,6 +457,7 @@ def test_cancel_via_SIGINT_other_task( with pytest.raises(KeyboardInterrupt): trio.run(main) + async def spin_for(period=3): "Sync sleep." time.sleep(period) From 51259c480901957df789fc486c9e5d0b0e46f1f0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 14 Oct 2021 13:46:27 -0400 Subject: [PATCH 18/25] Pass uid not actor object --- tractor/_debug.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tractor/_debug.py b/tractor/_debug.py index 37b46bd..8a0d414 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -410,7 +410,7 @@ async def _breakpoint( with trio.CancelScope(shield=True): await actor._service_n.start( wait_for_parent_stdin_hijack, - actor, + actor.uid, ) elif is_root_process(): @@ -552,7 +552,7 @@ async def _maybe_enter_pm(err): @acm async def acquire_debug_lock( - subactor_uid: Tuple[str, str] + subactor_uid: Tuple[str, str], ) -> AsyncGenerator[None, tuple]: ''' Grab root's debug lock on entry, release on exit. From 533457c64d9399dddff751a7a4ba9803f82418e8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Oct 2021 09:16:51 -0400 Subject: [PATCH 19/25] Handle nested multierror case on windows --- tests/test_cancellation.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 4271d8f..3c39956 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -1,5 +1,6 @@ """ Cancellation and error propagation + """ import os import signal @@ -377,10 +378,13 @@ async def test_nested_multierrors(loglevel, start_method): elif isinstance(subexc, trio.MultiError): for subsub in subexc.exceptions: - assert subsub.type in ( - tractor.RemoteActorError, + + if subsub in (tractor.RemoteActorError,): + subsub = subsub.type + + assert subsub in ( trio.Cancelled, - trio.MultiError + trio.MultiError, ) else: assert isinstance(subexc, tractor.RemoteActorError) From a42ec1f571c9b4097f49a8c9303576bacc147778 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Oct 2021 09:28:45 -0400 Subject: [PATCH 20/25] Add nooz --- newsfragments/245.feature.rst | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 newsfragments/245.feature.rst diff --git a/newsfragments/245.feature.rst b/newsfragments/245.feature.rst new file mode 100644 index 0000000..45e7451 --- /dev/null +++ b/newsfragments/245.feature.rst @@ -0,0 +1,13 @@ +Change the core message loop to handle task and actor-runtime cancel +requests immediately instead of scheduling them as is done for rpc-task +requests. + +In order to obtain more reliable teardown mechanics for (complex) actor +trees it's important that we specially treat cancel requests as having +higher priority. Previously, it was possible that task cancel requests +could actually also themselves be cancelled if a "actor-runtime" cancel +request was received (can happen during messy multi actor crashes that +propagate). Instead cancels now block the msg loop until serviced and +a response is relayed back to the requester. This alsoa allowed for +improved debugger support since we have determinism guarantees about +which processes must wait before hard killing their children. From e4ed0fd2b3ef7898b9e056685c0a2486e89fefba Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Oct 2021 09:29:25 -0400 Subject: [PATCH 21/25] Right, only worry about pdb lock when in debug mode --- tractor/_debug.py | 6 +++--- tractor/_spawn.py | 26 ++++++++++++++------------ 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/tractor/_debug.py b/tractor/_debug.py index 8a0d414..67485af 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -21,7 +21,7 @@ from trio_typing import TaskStatus from .log import get_logger from . import _state from ._discovery import get_root -from ._state import is_root_process +from ._state import is_root_process, debug_mode from ._exceptions import is_multi_cancelled try: @@ -525,7 +525,7 @@ post_mortem = partial( async def _maybe_enter_pm(err): if ( - _state.debug_mode() + debug_mode() # NOTE: don't enter debug mode recursively after quitting pdb # Iow, don't re-enter the repl if the `quit` command was issued @@ -572,7 +572,7 @@ async def maybe_wait_for_debugger( poll_delay: float = 0.1, ) -> None: - if not _state.debug_mode(): + if not debug_mode(): return if ( diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 4f496e4..1f22310 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -30,6 +30,7 @@ from ._state import ( current_actor, is_main_process, is_root_process, + debug_mode, ) from .log import get_logger @@ -242,17 +243,19 @@ async def new_proc( subactor.uid) except trio.Cancelled: cancel_during_spawn = True + # we may cancel before the child connects back in which # case avoid clobbering the pdb tty. - with trio.CancelScope(shield=True): - # don't clobber an ongoing pdb - if is_root_process(): - await maybe_wait_for_debugger() - else: - async with acquire_debug_lock(uid): - # soft wait on the proc to terminate - with trio.move_on_after(0.5): - await proc.wait() + if debug_mode(): + with trio.CancelScope(shield=True): + # don't clobber an ongoing pdb + if is_root_process(): + await maybe_wait_for_debugger() + else: + async with acquire_debug_lock(uid): + # soft wait on the proc to terminate + with trio.move_on_after(0.5): + await proc.wait() raise portal = Portal(chan) @@ -312,9 +315,8 @@ async def new_proc( # don't clobber an ongoing pdb await maybe_wait_for_debugger() - if cancel_during_spawn: - - # Try again to avoid TTY clobbering. + # Try again to avoid TTY clobbering. + if cancel_during_spawn and debug_mode(): async with acquire_debug_lock(uid): with trio.move_on_after(0.5): await proc.wait() From 4f222a5f9c264d17ab632c718f805b276f30735a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Oct 2021 10:07:45 -0400 Subject: [PATCH 22/25] Use type match of expected error --- tests/test_cancellation.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 3c39956..a589f81 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -382,7 +382,7 @@ async def test_nested_multierrors(loglevel, start_method): if subsub in (tractor.RemoteActorError,): subsub = subsub.type - assert subsub in ( + assert type(subsub) in ( trio.Cancelled, trio.MultiError, ) @@ -394,13 +394,14 @@ async def test_nested_multierrors(loglevel, start_method): # on windows sometimes spawning is just too slow and # we get back the (sent) cancel signal instead if platform.system() == 'Windows': - assert (subexc.type is trio.MultiError) or ( - subexc.type is tractor.RemoteActorError) + if isinstance(subexc, tractor.RemoteActorError): + assert subexc.type in (trio.MultiError, tractor.RemoteActorError) + else: + assert isinstance(subexc, trio.MultiError) else: assert subexc.type is trio.MultiError else: - assert (subexc.type is tractor.RemoteActorError) or ( - subexc.type is trio.Cancelled) + assert subexc.type in (tractor.RemoteActorError, trio.Cancelled) @no_windows From 5d827f78e2b7d748eb2a547f201d090a19f1564d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Oct 2021 11:42:57 -0400 Subject: [PATCH 23/25] Fix pluggy readme link and typo --- newsfragments/245.feature.rst | 2 +- newsfragments/HOWTO.rst | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/newsfragments/245.feature.rst b/newsfragments/245.feature.rst index 45e7451..e2754c3 100644 --- a/newsfragments/245.feature.rst +++ b/newsfragments/245.feature.rst @@ -8,6 +8,6 @@ higher priority. Previously, it was possible that task cancel requests could actually also themselves be cancelled if a "actor-runtime" cancel request was received (can happen during messy multi actor crashes that propagate). Instead cancels now block the msg loop until serviced and -a response is relayed back to the requester. This alsoa allowed for +a response is relayed back to the requester. This also allows for improved debugger support since we have determinism guarantees about which processes must wait before hard killing their children. diff --git a/newsfragments/HOWTO.rst b/newsfragments/HOWTO.rst index a0eccc6..f132f0c 100644 --- a/newsfragments/HOWTO.rst +++ b/newsfragments/HOWTO.rst @@ -4,5 +4,5 @@ now and use the default `fragment set`_. .. _towncrier docs: https://github.com/twisted/towncrier#quick-start -.. _pluggy release readme: https://github.com/twisted/towncrier#quick-start +.. _pluggy release readme: https://github.com/pytest-dev/pluggy/blob/main/changelog/README.rst .. _fragment set: https://github.com/twisted/towncrier#news-fragments From 5cfac58873995a1e80712a1e4b9462a28ef84a1f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Oct 2021 17:54:45 -0400 Subject: [PATCH 24/25] Don't pop a child entry that was never inserted --- tractor/_spawn.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 1f22310..eb4616c 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -326,8 +326,11 @@ async def new_proc( await do_hard_kill(proc) log.debug(f"Joined {proc}") - # pop child entry to indicate we no longer managing this subactor - subactor, proc, portal = actor_nursery._children.pop(subactor.uid) + + if not cancelled_during_spawn: + # pop child entry to indicate we no longer managing this + # subactor + actor_nursery._children.pop(subactor.uid) else: # `multiprocessing` From b3c4851ffb0b3cdf84721674804ccd64560411cc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Oct 2021 17:53:12 -0400 Subject: [PATCH 25/25] Grab lock if cancelled during spawn before hard kill --- tractor/_spawn.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index eb4616c..bca812d 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -229,7 +229,7 @@ async def new_proc( subactor.loglevel ] - cancel_during_spawn: bool = False + cancelled_during_spawn: bool = False try: proc = await trio.open_process(spawn_cmd) @@ -242,8 +242,7 @@ async def new_proc( event, chan = await actor_nursery._actor.wait_for_peer( subactor.uid) except trio.Cancelled: - cancel_during_spawn = True - + cancelled_during_spawn = True # we may cancel before the child connects back in which # case avoid clobbering the pdb tty. if debug_mode(): @@ -312,15 +311,17 @@ async def new_proc( log.cancel(f'Hard reap sequence starting for {uid}') with trio.CancelScope(shield=True): - # don't clobber an ongoing pdb - await maybe_wait_for_debugger() - # Try again to avoid TTY clobbering. - if cancel_during_spawn and debug_mode(): + # don't clobber an ongoing pdb + if cancelled_during_spawn: + # Try again to avoid TTY clobbering. async with acquire_debug_lock(uid): with trio.move_on_after(0.5): await proc.wait() + if is_root_process(): + await maybe_wait_for_debugger() + if proc.poll() is None: log.cancel(f"Attempting to hard kill {proc}") await do_hard_kill(proc)