From 209a6a209680d7e3c2aa64b4f3de3938c6b0f8fd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 11 Jul 2018 00:20:50 -0400 Subject: [PATCH] Add a separate cancel scope for the main task Cancellation requires that each actor cancel its spawned subactors before cancelling its own root (nursery's) cancel scope to avoid breaking channel connections before kill commands (`Actor.cancel()`) have been sent off to peers. To solve this, ensure each main task is cancelled to completion first (which will guarantee that all actor nurseries have completed their cancellation steps) before cancelling the actor's "core" tasks under the "root" scope. --- tractor/__init__.py | 60 ++++++++++++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 23 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index 22fe303..2342a0a 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -176,6 +176,8 @@ class Actor: self._peers = defaultdict(list) self._peer_connected = {} self._no_more_peers = trio.Event() + self._main_complete = trio.Event() + self._main_scope = None self._no_more_peers.set() self._actors2calls = {} # map {uids -> {callids -> waiter queues}} self._listeners = [] @@ -443,28 +445,32 @@ class Actor: self._process_messages, self._parent_chan) if self.main: - try: - if self._parent_chan: - log.debug(f"Starting main task `{self.main}`") - # spawned subactor so deliver "main" task result(s) - # back to parent - await nursery.start( - _invoke, 'main', - self._parent_chan, self.main, {}, - False, True # treat_as_gen, raise_errs params - ) - else: - # run directly - we are an "unspawned actor" - log.debug(f"Running `{self.main}` directly") - result = await self.main() - - finally: - # tear down channel server in order to ensure - # we exit normally when the main task is done - if not self._outlive_main: - log.debug(f"Shutting down channel server") - self.cancel_server() + with trio.open_cancel_scope() as main_scope: + self._main_scope = 
main_scope + try: + if self._parent_chan: + log.debug(f"Starting main task `{self.main}`") + # spawned subactor so deliver "main" task result(s) + # back to parent + await nursery.start( + _invoke, 'main', + self._parent_chan, self.main, {}, + False, True # treat_as_gen, raise_errs params + ) + else: + # run directly - we are an "unspawned actor" + log.debug(f"Running `{self.main}` directly") + result = await self.main() + finally: + # tear down channel server in order to ensure + # we exit normally when the main task is done + if not self._outlive_main: + log.debug(f"Shutting down channel server") + self.cancel_server() + if main_scope.cancelled_caught: + log.debug("Main task was cancelled sucessfully") + self._main_complete.set() log.debug("Waiting on root nursery to complete") # blocks here as expected if no nursery was provided until # the channel server is killed (i.e. this actor is @@ -550,6 +556,10 @@ class Actor: cancelling (for all intents and purposes) this actor. """ self.cancel_server() + if self._main_scope: + self._main_scope.cancel() + log.debug("Waiting on main task to complete") + await self._main_complete.wait() self._root_nursery.cancel_scope.cancel() def cancel_server(self): @@ -684,9 +694,11 @@ class Portal: cancel_scope.shield = True # send cancel cmd - might not get response await self.run('self', 'cancel') + return True except trio.ClosedStreamError: log.warn( - f"{self.channel} for {self.channel.uid} was alreaday closed?") + f"{self.channel} for {self.channel.uid} was already closed?") + return False @asynccontextmanager @@ -795,7 +807,8 @@ class ActorNursery: return portal async def wait(self): - + """Wait for all subactors to complete. + """ async def wait_for_proc(proc, actor, portal): # TODO: timeout block here? if proc.is_alive(): @@ -1022,6 +1035,7 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr): actor = Actor( name or 'anonymous', main=main, + arbiter_addr=arbiter_addr, **kwargs ) host, port = (host, 0)