Do immediate remote task cancels
As for `Actor.cancel()` requests, do the same for `Actor._cancel_task()` but use `_invoke()` to ensure correct msg transactions with caller. Don't cancel task cancels on a cancel-all-tasks operation in attempt at more determinism.zombie_lord_infinite
							parent
							
								
									5048c3534f
								
							
						
					
					
						commit
						a3cdba0577
					
				| 
						 | 
					@ -685,29 +685,30 @@ class Actor:
 | 
				
			||||||
                                await pdb_complete.wait()
 | 
					                                await pdb_complete.wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                            # we immediately start the runtime machinery shutdown
 | 
					                            # we immediately start the runtime machinery shutdown
 | 
				
			||||||
                            await _invoke(self, cid, chan, func, kwargs, is_rpc=False)
 | 
					                            with trio.CancelScope(shield=True):
 | 
				
			||||||
 | 
					                                # self.cancel() was called so kill this msg loop
 | 
				
			||||||
 | 
					                                # and break out into ``_async_main()``
 | 
				
			||||||
 | 
					                                log.cancel(
 | 
				
			||||||
 | 
					                                    f"Actor {self.uid} was remotely cancelled; "
 | 
				
			||||||
 | 
					                                    "waiting on cancellation completion..")
 | 
				
			||||||
 | 
					                                await _invoke(self, cid, chan, func, kwargs, is_rpc=False)
 | 
				
			||||||
 | 
					                                # await self._cancel_complete.wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                            # self.cancel() was called so kill this msg loop
 | 
					 | 
				
			||||||
                            # and break out into ``_async_main()``
 | 
					 | 
				
			||||||
                            log.cancel(
 | 
					 | 
				
			||||||
                                f"Actor {self.uid} was remotely cancelled; "
 | 
					 | 
				
			||||||
                                "waiting on cancellation completion..")
 | 
					 | 
				
			||||||
                            await self._cancel_complete.wait()
 | 
					 | 
				
			||||||
                            loop_cs.cancel()
 | 
					                            loop_cs.cancel()
 | 
				
			||||||
                            break
 | 
					                            break
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        elif funcname == '_cancel_task':
 | 
					                        if funcname == '_cancel_task':
 | 
				
			||||||
 | 
					 | 
				
			||||||
                            # XXX: a special case is made here for
 | 
					 | 
				
			||||||
                            # remote calls since we don't want the
 | 
					 | 
				
			||||||
                            # remote actor have to know which channel
 | 
					 | 
				
			||||||
                            # the task is associated with and we can't
 | 
					 | 
				
			||||||
                            # pass non-primitive types between actors.
 | 
					 | 
				
			||||||
                            # This means you can use:
 | 
					 | 
				
			||||||
                            #    Portal.run('self', '_cancel_task, cid=did)
 | 
					 | 
				
			||||||
                            # without passing the `chan` arg.
 | 
					 | 
				
			||||||
                            kwargs['chan'] = chan
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                            # we immediately start the runtime machinery shutdown
 | 
				
			||||||
 | 
					                            with trio.CancelScope(shield=True):
 | 
				
			||||||
 | 
					                                # self.cancel() was called so kill this msg loop
 | 
				
			||||||
 | 
					                                # and break out into ``_async_main()``
 | 
				
			||||||
 | 
					                                kwargs['chan'] = chan
 | 
				
			||||||
 | 
					                                log.cancel(
 | 
				
			||||||
 | 
					                                    f"Actor {self.uid} was remotely cancelled; "
 | 
				
			||||||
 | 
					                                    "waiting on cancellation completion..")
 | 
				
			||||||
 | 
					                                await _invoke(self, cid, chan, func, kwargs, is_rpc=False)
 | 
				
			||||||
 | 
					                                continue
 | 
				
			||||||
                    else:
 | 
					                    else:
 | 
				
			||||||
                        # complain to client about restricted modules
 | 
					                        # complain to client about restricted modules
 | 
				
			||||||
                        try:
 | 
					                        try:
 | 
				
			||||||
| 
						 | 
					@ -998,7 +999,7 @@ class Actor:
 | 
				
			||||||
            raise
 | 
					            raise
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        finally:
 | 
					        finally:
 | 
				
			||||||
            log.info("Root nursery complete")
 | 
					            log.info("Runtime nursery complete")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # tear down all lifetime contexts if not in guest mode
 | 
					            # tear down all lifetime contexts if not in guest mode
 | 
				
			||||||
            # XXX: should this just be in the entrypoint?
 | 
					            # XXX: should this just be in the entrypoint?
 | 
				
			||||||
| 
						 | 
					@ -1097,7 +1098,7 @@ class Actor:
 | 
				
			||||||
        self._service_n.start_soon(self.cancel)
 | 
					        self._service_n.start_soon(self.cancel)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def cancel(self) -> bool:
 | 
					    async def cancel(self) -> bool:
 | 
				
			||||||
        """Cancel this actor.
 | 
					        """Cancel this actor's runtime.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        The "deterministic" teardown sequence in order is:
 | 
					        The "deterministic" teardown sequence in order is:
 | 
				
			||||||
            - cancel all ongoing rpc tasks by cancel scope
 | 
					            - cancel all ongoing rpc tasks by cancel scope
 | 
				
			||||||
| 
						 | 
					@ -1190,18 +1191,20 @@ class Actor:
 | 
				
			||||||
        registered for each.
 | 
					        registered for each.
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        tasks = self._rpc_tasks
 | 
					        tasks = self._rpc_tasks
 | 
				
			||||||
        log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
 | 
					        if tasks:
 | 
				
			||||||
        for (chan, cid) in tasks.copy():
 | 
					            log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
 | 
				
			||||||
            if only_chan is not None:
 | 
					            for (chan, cid), (scope, func, is_complete) in tasks.copy().items():
 | 
				
			||||||
                if only_chan != chan:
 | 
					                if only_chan is not None:
 | 
				
			||||||
                    continue
 | 
					                    if only_chan != chan:
 | 
				
			||||||
 | 
					                        continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # TODO: this should really done in a nursery batch
 | 
					                # TODO: this should really done in a nursery batch
 | 
				
			||||||
            await self._cancel_task(cid, chan)
 | 
					                if func != self._cancel_task:
 | 
				
			||||||
 | 
					                    await self._cancel_task(cid, chan)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        log.cancel(
 | 
					            log.cancel(
 | 
				
			||||||
            f"Waiting for remaining rpc tasks to complete {tasks}")
 | 
					                f"Waiting for remaining rpc tasks to complete {tasks}")
 | 
				
			||||||
        await self._ongoing_rpc_tasks.wait()
 | 
					            await self._ongoing_rpc_tasks.wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def cancel_server(self) -> None:
 | 
					    def cancel_server(self) -> None:
 | 
				
			||||||
        """Cancel the internal channel server nursery thereby
 | 
					        """Cancel the internal channel server nursery thereby
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue