diff --git a/tractor/_actor.py b/tractor/_actor.py index 94eab7f..d0c0ac2 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -168,8 +168,8 @@ class Actor: is_arbiter: bool = False # nursery placeholders filled in by `_async_main()` after fork - _root_n: trio.Nursery = None - _service_n: trio.Nursery = None + _root_n: Optional[trio.Nursery] = None + _service_n: Optional[trio.Nursery] = None _server_n: Optional[trio.Nursery] = None # Information about `__main__` from parent @@ -497,6 +497,7 @@ class Actor: # spin up a task for the requested function log.debug(f"Spawning task for {func}") + assert self._service_n cs = await self._service_n.start( partial(_invoke, self, cid, chan, func, kwargs), name=funcname, @@ -551,7 +552,7 @@ class Actor: async def _chan_to_parent( self, parent_addr: Optional[Tuple[str, int]], - ) -> Channel: + ) -> Tuple[Channel, Optional[Tuple[str, int]]]: try: # Connect back to the parent actor and conduct initial # handshake. From this point on if we error, we @@ -564,6 +565,8 @@ class Actor: # Initial handshake: swap names. await self._do_handshake(chan) + accept_addr: Optional[Tuple[str, int]] = None + if self._spawn_method == "trio": # Receive runtime state from our parent parent_data = await chan.recv() @@ -578,9 +581,6 @@ class Actor: for attr, value in parent_data.items(): setattr(self, attr, value) - else: # mp - accept_addr = None - return chan, accept_addr except OSError: # failed to connect @@ -636,6 +636,7 @@ class Actor: # a deterministic way. async with trio.open_nursery() as root_nursery: self._root_n = root_nursery + assert self._root_n async with trio.open_nursery() as service_nursery: # This nursery is used to handle all inbound @@ -643,6 +644,7 @@ class Actor: # is killed, connections can continue to process # in the background until this nursery is cancelled. self._service_n = service_nursery + assert self._service_n # Startup up the channel server with, # - subactor: the bind address sent to us by our parent @@ -650,6 +652,7 @@ class Actor: # - root actor: the ``accept_addr`` passed to this method assert accept_addr host, port = accept_addr + self._server_n = await service_nursery.start( partial( self._serve_forever, @@ -765,7 +768,7 @@ class Actor: # (host, port) to bind for channel server accept_host: Tuple[str, int] = None, accept_port: int = 0, - task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED, ) -> None: """Start the channel server, begin listening for new connections. @@ -794,7 +797,7 @@ class Actor: # signal the server is down since nursery above terminated self._server_down.set() - async def cancel(self) -> None: + async def cancel(self) -> bool: """Cancel this actor. The "deterministic" teardown sequence in order is: @@ -807,10 +810,16 @@ class Actor: """ # cancel all ongoing rpc tasks with trio.CancelScope(shield=True): + # kill all ongoing tasks await self.cancel_rpc_tasks() + + # stop channel server self.cancel_server() await self._server_down.wait() - self._service_n.cancel_scope.cancel() + + # rekt all channel loops + if self._service_n: + self._service_n.cancel_scope.cancel() return True diff --git a/tractor/_portal.py b/tractor/_portal.py index 5181af1..e749ec6 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -338,9 +338,9 @@ async def open_portal( if channel.uid is None: await actor._do_handshake(channel) - msg_loop_cs = None + msg_loop_cs: Optional[trio.CancelScope] = None if start_msg_loop: - msg_loop_cs: trio.CancelScope = await nursery.start( + msg_loop_cs = await nursery.start( partial( actor._process_messages, channel,