mirror of https://github.com/skygpu/skynet.git
				
				
				
			Add table_index system in poller in order for daemon to be aware of stale data
							parent
							
								
									30eaa6c194
								
							
						
					
					
						commit
						8828fa13fc
					
				| 
						 | 
					@ -183,9 +183,16 @@ async def maybe_serve_one(
 | 
				
			||||||
async def dgpu_serve_forever(config: Config, conn: NetConnector):
 | 
					async def dgpu_serve_forever(config: Config, conn: NetConnector):
 | 
				
			||||||
    await maybe_update_tui_balance(conn)
 | 
					    await maybe_update_tui_balance(conn)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    last_poll_idx = -1
 | 
				
			||||||
    try:
 | 
					    try:
 | 
				
			||||||
        while True:
 | 
					        while True:
 | 
				
			||||||
            await conn.wait_data_update()
 | 
					            await conn.wait_data_update()
 | 
				
			||||||
 | 
					            if conn.poll_index == last_poll_idx:
 | 
				
			||||||
 | 
					                await trio.sleep(config.poll_time)
 | 
				
			||||||
 | 
					                continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            last_poll_idx = conn.poll_index
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            queue = conn._tables['queue']
 | 
					            queue = conn._tables['queue']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            random.shuffle(queue)
 | 
					            random.shuffle(queue)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -67,6 +67,8 @@ class NetConnector:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.ipfs_client = AsyncIPFSHTTP(config.ipfs_url)
 | 
					        self.ipfs_client = AsyncIPFSHTTP(config.ipfs_url)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # poll_index is used to detect stale data
 | 
				
			||||||
 | 
					        self.poll_index = 0
 | 
				
			||||||
        self._tables = {
 | 
					        self._tables = {
 | 
				
			||||||
            'queue': [],
 | 
					            'queue': [],
 | 
				
			||||||
            'requests': {},
 | 
					            'requests': {},
 | 
				
			||||||
| 
						 | 
					@ -180,6 +182,7 @@ class NetConnector:
 | 
				
			||||||
            self._data_event.set()
 | 
					            self._data_event.set()
 | 
				
			||||||
            await trio.sleep(max(poll_time - elapsed, 0.1))
 | 
					            await trio.sleep(max(poll_time - elapsed, 0.1))
 | 
				
			||||||
            self._data_event = trio.Event()
 | 
					            self._data_event = trio.Event()
 | 
				
			||||||
 | 
					            self.poll_index += 1
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def should_cancel_work(self, request_id: int) -> bool:
 | 
					    async def should_cancel_work(self, request_id: int) -> bool:
 | 
				
			||||||
        logging.info('should cancel work?')
 | 
					        logging.info('should cancel work?')
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue