diff --git a/skynet/dgpu/__init__.py b/skynet/dgpu/__init__.py index 24e2f7a..2d834f8 100755 --- a/skynet/dgpu/__init__.py +++ b/skynet/dgpu/__init__.py @@ -27,6 +27,8 @@ async def open_worker(config: Config): if tui: n.start_soon(tui.run) + n.start_soon(conn.iter_poll_update, config.poll_time) + yield conn except *urwid.ExitMainLoop: diff --git a/skynet/dgpu/daemon.py b/skynet/dgpu/daemon.py index eb002b5..fabe5a9 100755 --- a/skynet/dgpu/daemon.py +++ b/skynet/dgpu/daemon.py @@ -182,9 +182,11 @@ async def maybe_serve_one( async def dgpu_serve_forever(config: Config, conn: NetConnector): await maybe_update_tui_balance(conn) + try: - async for tables in conn.iter_poll_update(config.poll_time): - queue = tables['queue'] + while True: + await conn.wait_data_update() + queue = conn._tables['queue'] random.shuffle(queue) queue = sorted( diff --git a/skynet/dgpu/network.py b/skynet/dgpu/network.py index 8b45197..a54c032 100755 --- a/skynet/dgpu/network.py +++ b/skynet/dgpu/network.py @@ -72,6 +72,7 @@ class NetConnector: 'requests': {}, 'results': [] } + self._data_event = trio.Event() maybe_update_tui(lambda tui: tui.set_header_text(new_worker_name=self.config.account)) @@ -139,8 +140,7 @@ class NetConnector: async def get_full_queue_snapshot(self): ''' - Keep in-sync with latest (telos chain's smart-contract) table - state by polling (currently with period 1s). + Get a "snapshot" of current contract table state ''' snap = { @@ -164,17 +164,22 @@ class NetConnector: return snap - async def iter_poll_update(self, poll_time: float) -> AsyncGenerator[dict, None]: + async def wait_data_update(self): + await self._data_event.wait() + + async def iter_poll_update(self, poll_time: float): ''' - Long running task, olls gpu contract tables yields latest table rows + Long running task, polls gpu contract tables latest table rows, + awakes any self._data_event waiters ''' while True: start_time = time.time() self._tables = await self.get_full_queue_snapshot() elapsed = time.time() - start_time - yield self._tables + self._data_event.set() await trio.sleep(max(poll_time - elapsed, 0.1)) + self._data_event = trio.Event() async def should_cancel_work(self, request_id: int) -> bool: logging.info('should cancel work?')