Add `Actor.cancel_soon()` for sync self destruct

Add a sync method that can be used to cancel the current actor from
a synchronous context. This is useful in debugging situations where
sync debugger code may need to kill the process tree.

Also, make the internal "lifetime stack" a global var; easier to manage
from client code that may was to add callbacks prior to the actor
runtime being fully setup.
sync_cancel
Tyler Goodlet 2020-11-15 23:54:42 -05:00
parent a13b3fe0a5
commit be22a2526a
1 changed files with 16 additions and 4 deletions

View File

@ -178,6 +178,10 @@ def _get_mod_abspath(module):
return os.path.abspath(module.__file__) return os.path.abspath(module.__file__)
# process-global stack closed at end on actor runtime teardown
_lifetime_stack: ExitStack = ExitStack()
class Actor: class Actor:
"""The fundamental concurrency primitive. """The fundamental concurrency primitive.
@ -192,7 +196,6 @@ class Actor:
_root_n: Optional[trio.Nursery] = None _root_n: Optional[trio.Nursery] = None
_service_n: Optional[trio.Nursery] = None _service_n: Optional[trio.Nursery] = None
_server_n: Optional[trio.Nursery] = None _server_n: Optional[trio.Nursery] = None
_lifetime_stack: ExitStack = ExitStack()
# Information about `__main__` from parent # Information about `__main__` from parent
_parent_main_data: Dict[str, str] _parent_main_data: Dict[str, str]
@ -545,8 +548,9 @@ class Actor:
# deadlock and other weird behaviour) # deadlock and other weird behaviour)
if func != self.cancel: if func != self.cancel:
if isinstance(cs, Exception): if isinstance(cs, Exception):
log.warning(f"Task for RPC func {func} failed with" log.warning(
f"{cs}") f"Task for RPC func {func} failed with"
f"{cs}")
else: else:
# mark that we have ongoing rpc tasks # mark that we have ongoing rpc tasks
self._ongoing_rpc_tasks = trio.Event() self._ongoing_rpc_tasks = trio.Event()
@ -784,7 +788,7 @@ class Actor:
# tear down all lifetime contexts if not in guest mode # tear down all lifetime contexts if not in guest mode
# XXX: should this just be in the entrypoint? # XXX: should this just be in the entrypoint?
log.warning("Closing all actor lifetime contexts") log.warning("Closing all actor lifetime contexts")
self._lifetime_stack.close() _lifetime_stack.close()
# Unregister actor from the arbiter # Unregister actor from the arbiter
if registered_with_arbiter and ( if registered_with_arbiter and (
@ -858,6 +862,14 @@ class Actor:
# signal the server is down since nursery above terminated # signal the server is down since nursery above terminated
self._server_down.set() self._server_down.set()
def cancel_soon(self) -> None:
"""Cancel this actor asap; can be called from a sync context.
Schedules `.cancel()` to be run immediately just like when
cancelled by the parent.
"""
self._service_n.start_soon(self.cancel)
async def cancel(self) -> bool: async def cancel(self) -> bool:
"""Cancel this actor. """Cancel this actor.