diff --git a/tractor/_actor.py b/tractor/_actor.py
index f98c695..e426be0 100644
--- a/tractor/_actor.py
+++ b/tractor/_actor.py
@@ -167,9 +167,10 @@ class Actor:
     """
     is_arbiter: bool = False
 
-    # placeholders filled in by `_async_main` after fork
-    _root_nursery: trio.Nursery
-    _server_nursery: trio.Nursery
+    # nursery placeholders filled in by `_async_main()` after fork
+    _root_n: Optional[trio.Nursery] = None
+    _service_n: Optional[trio.Nursery] = None
+    _server_n: Optional[trio.Nursery] = None
 
     # Information about `__main__` from parent
     _parent_main_data: Dict[str, str]
@@ -293,7 +294,7 @@ class Actor:
     ) -> None:
         """Entry point for new inbound connections to the channel server.
         """
-        self._no_more_peers = trio.Event()
+        self._no_more_peers = trio.Event()  # unset
 
         chan = Channel(stream=stream)
         log.info(f"New connection to us {chan}")
@@ -427,13 +428,13 @@ class Actor:
         msg = None
         log.debug(f"Entering msg loop for {chan} from {chan.uid}")
         try:
-            with trio.CancelScope(shield=shield) as cs:
+            with trio.CancelScope(shield=shield) as loop_cs:
                 # this internal scope allows for keeping this message
                 # loop running despite the current task having been
                 # cancelled (eg. `open_portal()` may call this method from
                 # a locally spawned task) and recieve this scope using
                 # ``scope = Nursery.start()``
-                task_status.started(cs)
+                task_status.started(loop_cs)
                 async for msg in chan:
                     if msg is None:  # loop terminate sentinel
                         log.debug(
@@ -496,7 +497,7 @@ class Actor:
 
                     # spin up a task for the requested function
                     log.debug(f"Spawning task for {func}")
-                    cs = await self._root_nursery.start(
+                    cs = await self._service_n.start(
                         partial(_invoke, self, cid, chan, func, kwargs),
                         name=funcname,
                     )
@@ -514,6 +515,13 @@ class Actor:
                             # cancelled gracefully if requested
                             self._rpc_tasks[(chan, cid)] = (
                                 cs, func, trio.Event())
+                    else:
+                        # self.cancel() was called so kill this msg loop
+                        # and break out into ``_async_main()``
+                        log.warning(f"{self.uid} was remotely cancelled")
+                        loop_cs.cancel()
+                        break
+
                     log.debug(
                         f"Waiting on next msg for {chan} from {chan.uid}")
                 else:
@@ -540,6 +548,47 @@ class Actor:
                 f"Exiting msg loop for {chan} from {chan.uid} "
                 f"with last msg:\n{msg}")
 
+    async def _chan_to_parent(
+        self,
+        parent_addr: Optional[Tuple[str, int]],
+    ) -> Tuple[Channel, Optional[Tuple[str, int]]]:
+        try:
+            # Connect back to the parent actor and conduct initial
+            # handshake. From this point on if we error, we
+            # attempt to ship the exception back to the parent.
+            chan = Channel(
+                destaddr=parent_addr,
+            )
+            await chan.connect()
+
+            # Initial handshake: swap names.
+            await self._do_handshake(chan)
+
+            accept_addr: Optional[Tuple[str, int]] = None
+            if self._spawn_method == "trio":
+                # Receive runtime state from our parent
+                parent_data = await chan.recv()
+                log.debug(
+                    "Received state from parent:\n"
+                    f"{parent_data}"
+                )
+                accept_addr = (
+                    parent_data.pop('bind_host'),
+                    parent_data.pop('bind_port'),
+                )
+                for attr, value in parent_data.items():
+                    setattr(self, attr, value)
+
+            return chan, accept_addr
+
+        except OSError:  # failed to connect
+            log.warning(
+                f"Failed to connect to parent @ {parent_addr},"
+                " closing server")
+            await self.cancel()
+            # self._parent_chan = None
+            raise
+
     async def _async_main(
         self,
         accept_addr: Optional[Tuple[str, int]] = None,
@@ -561,88 +610,84 @@ class Actor:
         """
         registered_with_arbiter = False
         try:
-            async with trio.open_nursery() as nursery:
-                self._root_nursery = nursery
-                # TODO: just make `parent_addr` a bool system (see above)?
-                if parent_addr is not None:
-                    try:
-                        # Connect back to the parent actor and conduct initial
-                        # handshake. From this point on if we error, we
-                        # attempt to ship the exception back to the parent.
-                        chan = self._parent_chan = Channel(
-                            destaddr=parent_addr,
+            # establish primary connection with immediate parent
+            self._parent_chan = None
+            if parent_addr is not None:
+                self._parent_chan, accept_addr = await self._chan_to_parent(
+                    parent_addr)
+
+            # load exposed/allowed RPC modules
+            # XXX: do this **after** establishing a channel to the parent
+            # but **before** starting the message loop for that channel
+            # such that import errors are properly propagated upwards
+            self.load_modules()
+
+            # The "root" nursery ensures the channel with the immediate
+            # parent is kept alive as a resilient service until
+            # cancellation steps have (mostly) occurred in
+            # a deterministic way.
+            async with trio.open_nursery() as root_nursery:
+                self._root_n = root_nursery
+
+                async with trio.open_nursery() as service_nursery:
+                    # This nursery is used to handle all inbound
+                    # connections to us such that if the TCP server
+                    # is killed, connections can continue to process
+                    # in the background until this nursery is cancelled.
+                    self._service_n = service_nursery
+
+                    # Start up the channel server with,
+                    # - subactor: the bind address sent to us by our parent
+                    #   over our established channel
+                    # - root actor: the ``accept_addr`` passed to this method
+                    assert accept_addr
+                    host, port = accept_addr
+                    self._server_n = await service_nursery.start(
+                        partial(
+                            self._serve_forever,
+                            service_nursery,
+                            accept_host=host,
+                            accept_port=port
                         )
-                        await chan.connect()
-
-                        # Initial handshake: swap names.
-                        await self._do_handshake(chan)
-
-                        if self._spawn_method == "trio":
-                            # Receive runtime state from our parent
-                            parent_data = await chan.recv()
-                            log.debug(
-                                "Recieved state from parent:\n"
-                                f"{parent_data}"
-                            )
-                            accept_addr = (
-                                parent_data.pop('bind_host'),
-                                parent_data.pop('bind_port'),
-                            )
-                            for attr, value in parent_data.items():
-                                setattr(self, attr, value)
-
-                    except OSError:  # failed to connect
-                        log.warning(
-                            f"Failed to connect to parent @ {parent_addr},"
-                            " closing server")
-                        await self.cancel()
-                        self._parent_chan = None
-                        raise
-
-                # load exposed/allowed RPC modules
-                # XXX: do this **after** establishing a channel to the parent
-                # but **before** starting the message loop for that channel
-                # such that import errors are properly propagated upwards
-                self.load_modules()
-
-                # Startup up channel server with,
-                # - subactor: the bind address sent to us by our parent
-                # over our established channel
-                # - root actor: the ``accept_addr`` passed to this method
-                assert accept_addr
-                host, port = accept_addr
-                await nursery.start(
-                    partial(
-                        self._serve_forever,
-                        accept_host=host,
-                        accept_port=port
                     )
-                )
 
-                # Begin handling our new connection back to parent.
-                # This is done here since we don't want to start
-                # processing parent requests until our server is
-                # 100% up and running.
-                if self._parent_chan:
-                    nursery.start_soon(
-                        self._process_messages, self._parent_chan)
+                    # Register with the arbiter if we're told its addr
+                    log.debug(f"Registering {self} for role `{self.name}`")
+                    assert isinstance(self._arb_addr, tuple)
+
+                    async with get_arbiter(*self._arb_addr) as arb_portal:
+                        await arb_portal.run(
+                            'self',
+                            'register_actor',
+                            uid=self.uid,
+                            sockaddr=self.accept_addr,
+                        )
 
-                # Register with the arbiter if we're told its addr
-                log.debug(f"Registering {self} for role `{self.name}`")
-                assert isinstance(self._arb_addr, tuple)
-                async with get_arbiter(*self._arb_addr) as arb_portal:
-                    await arb_portal.run(
-                        'self', 'register_actor',
-                        uid=self.uid, sockaddr=self.accept_addr)
                     registered_with_arbiter = True
 
-                task_status.started()
-                log.debug("Waiting on root nursery to complete")
+                    # init steps complete
+                    task_status.started()
 
-        # Blocks here as expected until the channel server is
+                    # Begin handling our new connection back to our
+                    # parent. This is done last since we don't want to
+                    # start processing parent requests until our channel
+                    # server is 100% up and running.
+                    if self._parent_chan:
+                        await root_nursery.start(
+                            partial(
+                                self._process_messages,
+                                self._parent_chan,
+                                shield=True,
+                            )
+                        )
+                    log.info("Waiting on service nursery to complete")
+                log.info("Service nursery complete")
+                log.info("Waiting on root nursery to complete")
+
+            # Blocks here as expected until the root nursery is
             # killed (i.e. this actor is cancelled or signalled by the parent)
-        except Exception as err:
+        except (trio.MultiError, Exception) as err:
             if not registered_with_arbiter:
                 # TODO: I guess we could try to connect back
                 # to the parent through a channel and engage a debugger
@@ -658,47 +703,57 @@ class Actor:
                 )
 
             if self._parent_chan:
-                try:
-                    # internal error so ship to parent without cid
-                    await self._parent_chan.send(pack_error(err))
-                except trio.ClosedResourceError:
-                    log.error(
-                        f"Failed to ship error to parent "
-                        f"{self._parent_chan.uid}, channel was closed")
-                log.exception("Actor errored:")
+                with trio.CancelScope(shield=True):
+                    try:
+                        # internal error so ship to parent without cid
+                        await self._parent_chan.send(pack_error(err))
+                    except trio.ClosedResourceError:
+                        log.error(
+                            f"Failed to ship error to parent "
+                            f"{self._parent_chan.uid}, channel was closed")
+                    log.exception("Actor errored:")
 
-            if isinstance(err, ModuleNotFoundError):
-                raise
-            else:
-                # XXX wait, why?
-                # causes a hang if I always raise..
-                # A parent process does something weird here?
-                # i'm so lost now..
-                raise
+            # always!
+            raise
 
         finally:
-            if registered_with_arbiter:
-                with trio.move_on_after(3) as cs:
-                    cs.shield = True
-                    await self._do_unreg(self._arb_addr)
+            log.info("Root nursery complete")
 
-            # terminate actor once all it's peers (actors that connected
-            # to it as clients) have disappeared
+            # UNregister actor from the arbiter
+            if registered_with_arbiter and (
+                self._arb_addr is not None
+            ):
+                failed = False
+                with trio.move_on_after(5) as cs:
+                    cs.shield = True
+                    try:
+                        async with get_arbiter(*self._arb_addr) as arb_portal:
+                            await arb_portal.run(
+                                'self', 'unregister_actor', uid=self.uid)
+                    except OSError:
+                        failed = True
+                if cs.cancelled_caught:
+                    failed = True
+                if failed:
+                    log.warning(
+                        f"Failed to unregister {self.name} from arbiter")
+
+            # Ensure all peers (actors connected to us as clients) are finished
             if not self._no_more_peers.is_set():
                 if any(
                     chan.connected() for chan in chain(*self._peers.values())
                 ):
                     log.debug(
                         f"Waiting for remaining peers {self._peers} to clear")
-                    await self._no_more_peers.wait()
+                    with trio.CancelScope(shield=True):
+                        await self._no_more_peers.wait()
                 log.debug("All peer channels are complete")
 
-            # tear down channel server no matter what since we errored
-            # or completed
-            self.cancel_server()
+            log.debug("Runtime completed")
 
     async def _serve_forever(
         self,
+        handler_nursery: trio.Nursery,
         *,
         # (host, port) to bind for channel server
         accept_host: Tuple[str, int] = None,
@@ -710,48 +765,55 @@ class Actor:
         This will cause an actor to continue living (blocking) until
        ``cancel_server()`` is called.
        """
-        async with trio.open_nursery() as nursery:
-            self._server_nursery = nursery
-            # TODO: might want to consider having a separate nursery
-            # for the stream handler such that the server can be cancelled
-            # whilst leaving existing channels up
-            listeners: List[trio.abc.Listener] = await nursery.start(
-                partial(
-                    trio.serve_tcp,
-                    self._stream_handler,
-                    # new connections will stay alive even if this server
-                    # is cancelled
-                    handler_nursery=self._root_nursery,
-                    port=accept_port, host=accept_host,
-                )
-            )
-            log.debug("Started tcp server(s) on"  # type: ignore
-                      f" {[l.socket for l in listeners]}")
-            self._listeners.extend(listeners)
-            task_status.started()
-
-    async def _do_unreg(self, arbiter_addr: Optional[Tuple[str, int]]) -> None:
-        # UNregister actor from the arbiter
+        self._server_down = trio.Event()
         try:
-            if arbiter_addr is not None:
-                async with get_arbiter(*arbiter_addr) as arb_portal:
-                    await arb_portal.run(
-                        'self', 'unregister_actor', uid=self.uid)
-        except OSError:
-            log.warning(f"Unable to unregister {self.name} from arbiter")
+            async with trio.open_nursery() as server_n:
+                listeners: List[trio.abc.Listener] = await server_n.start(
+                    partial(
+                        trio.serve_tcp,
+                        self._stream_handler,
+                        # new connections will stay alive even if this server
+                        # is cancelled
+                        handler_nursery=handler_nursery,
+                        port=accept_port,
+                        host=accept_host,
+                    )
+                )
+                log.debug("Started tcp server(s) on"  # type: ignore
+                          f" {[l.socket for l in listeners]}")
+                self._listeners.extend(listeners)
+                task_status.started(server_n)
+        finally:
+            # signal the server is down since nursery above terminated
+            self._server_down.set()
 
     async def cancel(self) -> None:
         """Cancel this actor.
 
-        The sequence in order is:
-            - cancelling all rpc tasks
-            - cancelling the channel server
-            - cancel the "root" nursery
+        The "deterministic" teardown sequence in order is:
+            - cancel all ongoing rpc tasks by cancel scope
+            - cancel the channel server to prevent new inbound
+              connections
+            - cancel the "service" nursery responsible for
+              spawning new rpc tasks
+            - return control to the parent channel message loop
         """
         # cancel all ongoing rpc tasks
-        await self.cancel_rpc_tasks()
-        self.cancel_server()
-        self._root_nursery.cancel_scope.cancel()
+        with trio.CancelScope(shield=True):
+            await self.cancel_rpc_tasks()
+            self.cancel_server()
+            await self._server_down.wait()
+            self._service_n.cancel_scope.cancel()
+
+        return True
+
+    # XXX: hard kill logic if needed?
+    # def _hard_mofo_kill(self):
+    #     # If we're the root actor or zombied kill everything
+    #     if self._parent_chan is None:  # TODO: more robust check
+    #         root = trio.lowlevel.current_root_task()
+    #         for n in root.child_nurseries:
+    #             n.cancel_scope.cancel()
 
     async def _cancel_task(self, cid, chan):
         """Cancel a local task by call-id / channel.
         """
@@ -804,11 +866,12 @@ class Actor:
         """Cancel the internal channel server nursery thereby
         preventing any new inbound connections from being established.
         """
-        log.debug("Shutting down channel server")
-        self._server_nursery.cancel_scope.cancel()
+        if self._server_n:
+            log.debug("Shutting down channel server")
+            self._server_n.cancel_scope.cancel()
 
     @property
-    def accept_addr(self) -> Tuple[str, int]:
+    def accept_addr(self) -> Optional[Tuple[str, int]]:
         """Primary address to which the channel server is bound.
         """
         # throws OSError on failure
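To see the teardown ordering this patch aims for in isolation, here is a minimal, hypothetical trio sketch (not part of the patch; all names are illustrative) of the nested "root"/"service" nursery layout used by the new `_async_main()`: cancelling only the inner service nursery, as the new `Actor.cancel()` does, kills RPC tasks deterministically while the outer root nursery, which would host the parent channel's message loop, stays intact long enough to ship back results or errors.

import trio


async def rpc_task(name: str) -> None:
    # stand-in for a task spawned into the service nursery
    try:
        await trio.sleep_forever()
    finally:
        print(f"rpc task {name!r} cancelled")


async def parent_msg_loop(service_n: trio.Nursery) -> None:
    # stand-in for ``_process_messages()`` running in the *root* nursery;
    # pretend the parent sends a remote cancel request after a moment
    await trio.sleep(0.1)
    print("remote cancel requested")
    service_n.cancel_scope.cancel()  # tear down services only
    # the root nursery is still alive here, so a real actor could
    # still ship a result or error to its parent before exiting


async def async_main() -> None:
    async with trio.open_nursery() as root_n:
        async with trio.open_nursery() as service_n:
            service_n.start_soon(rpc_task, "a")
            service_n.start_soon(rpc_task, "b")
            root_n.start_soon(parent_msg_loop, service_n)
        print("service nursery complete")
    print("root nursery complete")


trio.run(async_main)

This mirrors why the patch moves RPC task spawning from `_root_nursery` to the new `_service_n`: RPC tasks must be dead before the parent channel's message loop is.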
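Similarly, the `_server_down` event added to `_serve_forever()` gives `cancel()` a way to wait until the TCP server task has actually finished before cancelling the service nursery. A hypothetical, self-contained sketch of that handshake (again, names are illustrative rather than the patch's API):

import trio

server_down = trio.Event()


async def serve_forever(
    task_status=trio.TASK_STATUS_IGNORED,
) -> None:
    # stand-in for ``_serve_forever()``; a real actor would run
    # ``trio.serve_tcp()`` inside this nursery
    try:
        async with trio.open_nursery() as server_n:
            task_status.started(server_n)
            await trio.sleep_forever()
    finally:
        # signal the server is down since the nursery above terminated
        server_down.set()


async def main() -> None:
    async with trio.open_nursery() as service_n:
        server_n = await service_n.start(serve_forever)
        # later, during a ``cancel()``-style teardown:
        server_n.cancel_scope.cancel()  # stop accepting connections
        await server_down.wait()        # block until the server task is done
        print("server confirmed down")


trio.run(main)

The design point: `cancel_server()` only requests cancellation and returns synchronously, so the `await self._server_down.wait()` in `cancel()` is what makes the shutdown step deterministic.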