diff --git a/tractor/__init__.py b/tractor/__init__.py index 22fe303..2342a0a 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -176,6 +176,8 @@ class Actor: self._peers = defaultdict(list) self._peer_connected = {} self._no_more_peers = trio.Event() + self._main_complete = trio.Event() + self._main_scope = None self._no_more_peers.set() self._actors2calls = {} # map {uids -> {callids -> waiter queues}} self._listeners = [] @@ -443,28 +445,32 @@ class Actor: self._process_messages, self._parent_chan) if self.main: - try: - if self._parent_chan: - log.debug(f"Starting main task `{self.main}`") - # spawned subactor so deliver "main" task result(s) - # back to parent - await nursery.start( - _invoke, 'main', - self._parent_chan, self.main, {}, - False, True # treat_as_gen, raise_errs params - ) - else: - # run directly - we are an "unspawned actor" - log.debug(f"Running `{self.main}` directly") - result = await self.main() - - finally: - # tear down channel server in order to ensure - # we exit normally when the main task is done - if not self._outlive_main: - log.debug(f"Shutting down channel server") - self.cancel_server() + with trio.open_cancel_scope() as main_scope: + self._main_scope = main_scope + try: + if self._parent_chan: + log.debug(f"Starting main task `{self.main}`") + # spawned subactor so deliver "main" task result(s) + # back to parent + await nursery.start( + _invoke, 'main', + self._parent_chan, self.main, {}, + False, True # treat_as_gen, raise_errs params + ) + else: + # run directly - we are an "unspawned actor" + log.debug(f"Running `{self.main}` directly") + result = await self.main() + finally: + # tear down channel server in order to ensure + # we exit normally when the main task is done + if not self._outlive_main: + log.debug(f"Shutting down channel server") + self.cancel_server() + if main_scope.cancelled_caught: + log.debug("Main task was cancelled sucessfully") + self._main_complete.set() log.debug("Waiting on root nursery to complete") # blocks here as expected if no nursery was provided until # the channel server is killed (i.e. this actor is @@ -550,6 +556,10 @@ class Actor: cancelling (for all intents and purposes) this actor. """ self.cancel_server() + if self._main_scope: + self._main_scope.cancel() + log.debug("Waiting on main task to complete") + await self._main_complete.wait() self._root_nursery.cancel_scope.cancel() def cancel_server(self): @@ -684,9 +694,11 @@ class Portal: cancel_scope.shield = True # send cancel cmd - might not get response await self.run('self', 'cancel') + return True except trio.ClosedStreamError: log.warn( - f"{self.channel} for {self.channel.uid} was alreaday closed?") + f"{self.channel} for {self.channel.uid} was already closed?") + return False @asynccontextmanager @@ -795,7 +807,8 @@ class ActorNursery: return portal async def wait(self): - + """Wait for all subactors to complete. + """ async def wait_for_proc(proc, actor, portal): # TODO: timeout block here? if proc.is_alive(): @@ -1022,6 +1035,7 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr): actor = Actor( name or 'anonymous', main=main, + arbiter_addr=arbiter_addr, **kwargs ) host, port = (host, 0)