From 2afbc3898f1eaa9fc9fce1793119dcb1276d3093 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Thu, 7 Oct 2021 23:13:47 -0400
Subject: [PATCH] Make actor runtime cancellation immediate

---
 tractor/_actor.py | 90 ++++++++++++++++++++++++++++++-----------------
 1 file changed, 58 insertions(+), 32 deletions(-)

diff --git a/tractor/_actor.py b/tractor/_actor.py
index 661376d..927812b 100644
--- a/tractor/_actor.py
+++ b/tractor/_actor.py
@@ -506,8 +506,8 @@ class Actor:
         log.runtime(f"Peers is {self._peers}")
 
         if not self._peers:  # no more channels connected
-            self._no_more_peers.set()
             log.runtime("Signalling no more peer channels")
+            self._no_more_peers.set()
 
         # # XXX: is this necessary (GC should do it?)
         if chan.connected():
@@ -674,7 +674,28 @@ class Actor:
                         f"{ns}.{funcname}({kwargs})")
                     if ns == 'self':
                         func = getattr(self, funcname)
-                        if funcname == '_cancel_task':
+                        if funcname == 'cancel':
+
+                            # don't start the runtime cancellation machinery
+                            # while this actor is in debug mode; wait for pdb release
+                            pdb_complete = _debug._local_pdb_complete
+                            if pdb_complete:
+                                await pdb_complete.wait()
+
+                            # we immediately start the runtime machinery shutdown
+                            await _invoke(self, cid, chan, func, kwargs)
+
+                            # self.cancel() was called so kill this msg loop
+                            # and break out into ``_async_main()``
+                            log.cancel(
+                                f"Actor {self.uid} was remotely cancelled; "
+                                "waiting on cancellation completion..")
+                            await self._cancel_complete.wait()
+                            loop_cs.cancel()
+                            break
+
+                        elif funcname == '_cancel_task':
+
                             # XXX: a special case is made here for
                             # remote calls since we don't want the
                             # remote actor have to know which channel
@@ -684,6 +705,7 @@ class Actor:
                             # Portal.run('self', '_cancel_task, cid=did)
                             # without passing the `chan` arg.
                             kwargs['chan'] = chan
+
                     else:
                         # complain to client about restricted modules
                         try:
@@ -709,37 +731,28 @@ class Actor:
 
                     # never allow cancelling cancel requests (results in
                     # deadlock and other weird behaviour)
-                    if func != self.cancel:
-                        if isinstance(cs, Exception):
-                            log.warning(
-                                f"Task for RPC func {func} failed with"
-                                f"{cs}")
-                        else:
-                            # mark that we have ongoing rpc tasks
-                            self._ongoing_rpc_tasks = trio.Event()
-                            log.runtime(f"RPC func is {func}")
-                            # store cancel scope such that the rpc task can be
-                            # cancelled gracefully if requested
-                            self._rpc_tasks[(chan, cid)] = (
-                                cs, func, trio.Event())
-                    else:
-                        # self.cancel() was called so kill this msg loop
-                        # and break out into ``_async_main()``
+                    # if func != self.cancel:
+                    if isinstance(cs, Exception):
                         log.warning(
-                            f"Actor {self.uid} was remotely cancelled; "
-                            "waiting on cancellation completion..")
-                        await self._cancel_complete.wait()
-                        loop_cs.cancel()
-                        break
+                            f"Task for RPC func {func} failed with "
+                            f"{cs}")
+                    else:
+                        # mark that we have ongoing rpc tasks
+                        self._ongoing_rpc_tasks = trio.Event()
+                        log.runtime(f"RPC func is {func}")
+                        # store cancel scope such that the rpc task can be
+                        # cancelled gracefully if requested
+                        self._rpc_tasks[(chan, cid)] = (
+                            cs, func, trio.Event())
 
                     log.runtime(
                         f"Waiting on next msg for {chan} from {chan.uid}")
-                else:
-                    # channel disconnect
-                    log.runtime(
-                        f"{chan} for {chan.uid} disconnected, cancelling tasks"
-                    )
-                    await self.cancel_rpc_tasks(chan)
+
+                # end of async for, channel disconnect via ``trio.EndOfChannel``
+                log.runtime(
+                    f"{chan} for {chan.uid} disconnected, cancelling tasks"
+                )
+                await self.cancel_rpc_tasks(chan)
 
         except (
             TransportClosed,
@@ -950,6 +963,9 @@ class Actor:
                 # Blocks here as expected until the root nursery is
                 # killed (i.e. this actor is cancelled or signalled by the parent)
         except Exception as err:
+            log.info("Closing all actor lifetime contexts")
+            _lifetime_stack.close()
+
             if not registered_with_arbiter:
                 # TODO: I guess we could try to connect back
                 # to the parent through a channel and engage a debugger
@@ -979,11 +995,21 @@ class Actor:
             raise
 
         finally:
-            log.runtime("Root nursery complete")
+            log.info("Root nursery complete")
 
             # tear down all lifetime contexts if not in guest mode
            # XXX: should this just be in the entrypoint?
-            log.cancel("Closing all actor lifetime contexts")
+            log.info("Closing all actor lifetime contexts")
+
+            # TODO: we can't actually do this bc the debugger
+            # uses the _service_n to spawn the lock task, BUT,
+            # in theory if we had the root nursery surround this finally
+            # block it might be actually possible to debug THIS
+            # machinery in the same way as user task code?
+            # if self.name == 'brokerd.ib':
+            #     with trio.CancelScope(shield=True):
+            #         await _debug.breakpoint()
+
             _lifetime_stack.close()
 
             # Unregister actor from the arbiter
@@ -1102,7 +1128,7 @@ class Actor:
 
         if self._service_n:
             self._service_n.cancel_scope.cancel()
 
-        log.cancel(f"{self.uid} was sucessfullly cancelled")
+        log.cancel(f"{self.uid} called `Actor.cancel()`")
         self._cancel_complete.set()
         return True
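
Reviewer note (not part of the diff above): the snippet below is a minimal,
hypothetical sketch of the user-facing path this patch makes immediate. It
assumes tractor's public `open_nursery()`, `ActorNursery.start_actor()` and
`Portal.cancel_actor()` API from around this release; the child actor name
'idler' and the short sleep are purely illustrative. `Portal.cancel_actor()`
issues the `('self', 'cancel')` RPC which the new `funcname == 'cancel'`
branch in `_process_messages()` now runs inline via `_invoke()` before
breaking out of the child's message loop.

    import trio
    import tractor


    async def main() -> None:
        async with tractor.open_nursery() as nursery:
            # spawn a child actor that just idles in its runtime
            # ('idler' is an arbitrary, illustrative name)
            portal = await nursery.start_actor('idler')

            # let the child finish booting before cancelling it
            await trio.sleep(0.5)

            # sends the ('self', 'cancel') RPC; with this patch the
            # child begins its runtime shutdown immediately on receipt
            # of the request rather than after further msg-loop work
            await portal.cancel_actor()


    if __name__ == '__main__':
        trio.run(main)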