mirror of https://github.com/skygpu/skynet.git
				
				
				
			Switch to non interator poller, NetConnector has wait_data_update() now
							parent
							
								
									5d67b3cd60
								
							
						
					
					
						commit
						30eaa6c194
					
				| 
						 | 
					@ -27,6 +27,8 @@ async def open_worker(config: Config):
 | 
				
			||||||
            if tui:
 | 
					            if tui:
 | 
				
			||||||
                n.start_soon(tui.run)
 | 
					                n.start_soon(tui.run)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            n.start_soon(conn.iter_poll_update, config.poll_time)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            yield conn
 | 
					            yield conn
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    except *urwid.ExitMainLoop:
 | 
					    except *urwid.ExitMainLoop:
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -182,9 +182,11 @@ async def maybe_serve_one(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def dgpu_serve_forever(config: Config, conn: NetConnector):
 | 
					async def dgpu_serve_forever(config: Config, conn: NetConnector):
 | 
				
			||||||
    await maybe_update_tui_balance(conn)
 | 
					    await maybe_update_tui_balance(conn)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    try:
 | 
					    try:
 | 
				
			||||||
        async for tables in conn.iter_poll_update(config.poll_time):
 | 
					        while True:
 | 
				
			||||||
            queue = tables['queue']
 | 
					            await conn.wait_data_update()
 | 
				
			||||||
 | 
					            queue = conn._tables['queue']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            random.shuffle(queue)
 | 
					            random.shuffle(queue)
 | 
				
			||||||
            queue = sorted(
 | 
					            queue = sorted(
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -72,6 +72,7 @@ class NetConnector:
 | 
				
			||||||
            'requests': {},
 | 
					            'requests': {},
 | 
				
			||||||
            'results': []
 | 
					            'results': []
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					        self._data_event = trio.Event()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        maybe_update_tui(lambda tui: tui.set_header_text(new_worker_name=self.config.account))
 | 
					        maybe_update_tui(lambda tui: tui.set_header_text(new_worker_name=self.config.account))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -139,8 +140,7 @@ class NetConnector:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def get_full_queue_snapshot(self):
 | 
					    async def get_full_queue_snapshot(self):
 | 
				
			||||||
        '''
 | 
					        '''
 | 
				
			||||||
        Keep in-sync with latest (telos chain's smart-contract) table
 | 
					        Get a "snapshot" of current contract table state
 | 
				
			||||||
        state by polling (currently with period 1s).
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        '''
 | 
					        '''
 | 
				
			||||||
        snap = {
 | 
					        snap = {
 | 
				
			||||||
| 
						 | 
					@ -164,17 +164,22 @@ class NetConnector:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        return snap
 | 
					        return snap
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def iter_poll_update(self, poll_time: float) -> AsyncGenerator[dict, None]:
 | 
					    async def wait_data_update(self):
 | 
				
			||||||
 | 
					        await self._data_event.wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    async def iter_poll_update(self, poll_time: float):
 | 
				
			||||||
        '''
 | 
					        '''
 | 
				
			||||||
        Long running task, olls gpu contract tables yields latest table rows
 | 
					        Long running task, polls gpu contract tables latest table rows,
 | 
				
			||||||
 | 
					        awakes any self._data_event waiters
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        '''
 | 
					        '''
 | 
				
			||||||
        while True:
 | 
					        while True:
 | 
				
			||||||
            start_time = time.time()
 | 
					            start_time = time.time()
 | 
				
			||||||
            self._tables = await self.get_full_queue_snapshot()
 | 
					            self._tables = await self.get_full_queue_snapshot()
 | 
				
			||||||
            elapsed = time.time() - start_time
 | 
					            elapsed = time.time() - start_time
 | 
				
			||||||
            yield self._tables
 | 
					            self._data_event.set()
 | 
				
			||||||
            await trio.sleep(max(poll_time - elapsed, 0.1))
 | 
					            await trio.sleep(max(poll_time - elapsed, 0.1))
 | 
				
			||||||
 | 
					            self._data_event = trio.Event()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def should_cancel_work(self, request_id: int) -> bool:
 | 
					    async def should_cancel_work(self, request_id: int) -> bool:
 | 
				
			||||||
        logging.info('should cancel work?')
 | 
					        logging.info('should cancel work?')
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue