diff --git a/skynet.toml.example b/skynet.toml.example index 97423e3..337cedf 100644 --- a/skynet.toml.example +++ b/skynet.toml.example @@ -1,47 +1,41 @@ -# config sections are optional, depending on which services -# you wish to run +# general skynet config +[skynet] +contract = 'telos.gpu2' +node_url = 'https://testnet.skygpu.net' +hyperion_url = 'https://testnet.skygpu.net' +ipfs_url = 'http://127.0.0.1:5001' +# optional set only if local ipfs node != gateway node +ipfs_gateway_url = '/ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv' + +# worker config (optional) [skynet.dgpu] account = 'testworkerX' permission = 'active' key = '5Xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' -node_url = 'https://testnet.skygpu.net' -hyperion_url = 'https://testnet.skygpu.net' -ipfs_gateway_url = '/ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv' -ipfs_url = 'http://127.0.0.1:5001' hf_home = 'hf_home' hf_token = 'hf_XxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXx' auto_withdraw = true non_compete = [] api_bind = '127.0.0.1:42690' +# telegram bot config (optional) [skynet.telegram] account = 'telegram' permission = 'active' key = '5Xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' -node_url = 'https://testnet.skygpu.net' -hyperion_url = 'https://testnet.skygpu.net' -ipfs_gateway_url = '/ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv' -ipfs_url = 'http://127.0.0.1:5001' token = 'XXXXXXXXXX:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' +# discord bot config (optional) [skynet.discord] account = 'discord' permission = 'active' key = '5Xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' -node_url = 'https://testnet.skygpu.net' -hyperion_url = 'https://testnet.skygpu.net' -ipfs_gateway_url = '/ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv' -ipfs_url = 'http://127.0.0.1:5001' token = 'XXXXXXXXXX:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' -[skynet.pinner] -hyperion_url = 
'https://testnet.skygpu.net' -ipfs_url = 'http://127.0.0.1:5001' - +# cli utils account (optional) [skynet.user] account = 'testuser' permission = 'active' key = '5Xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' -node_url = 'https://testnet.skygpu.net' diff --git a/skynet/dgpu/compute.py b/skynet/dgpu/compute.py index 07bbb6a..c63696b 100644 --- a/skynet/dgpu/compute.py +++ b/skynet/dgpu/compute.py @@ -7,7 +7,7 @@ import logging from hashlib import sha256 from typing import Any -import zipfile + from PIL import Image from diffusers import DiffusionPipeline @@ -70,6 +70,12 @@ class SkynetMM: if 'hf_home' in config: self.cache_dir = config['hf_home'] + self.num_gpus = torch.cuda.device_count() + self.gpus = [ + torch.cuda.get_device_properties(i) + for i in range(self.num_gpus) + ] + self._models = {} for model in self.initial_models: self.load_model(model, False, force=True) diff --git a/skynet/dgpu/daemon.py b/skynet/dgpu/daemon.py index dc356a9..ef726c8 100644 --- a/skynet/dgpu/daemon.py +++ b/skynet/dgpu/daemon.py @@ -1,7 +1,6 @@ #!/usr/bin/python import json -import random import logging import time import traceback @@ -110,14 +109,14 @@ class SkynetDGPUDaemon: def find_best_requests(self) -> list[dict]: queue = self.conn.get_queue() - for _ in range(3): - random.shuffle(queue) + # for _ in range(3): + # random.shuffle(queue) - queue = sorted( - queue, - key=lambda req: convert_reward_to_int(req['reward']), - reverse=True - ) + # queue = sorted( + # queue, + # key=lambda req: convert_reward_to_int(req['reward']), + # reverse=True + # ) requests = [] for req in queue: @@ -161,7 +160,57 @@ class SkynetDGPUDaemon: return requests + async def sync_worker_on_chain_data(self, is_online: bool) -> bool: + # check worker is registered + me = self.conn.get_on_chain_worker_info(self.account) + if not me: + ec, out = await self.conn.register_worker() + if ec != 0: + raise DGPUDaemonError(f'Couldn\'t register worker! 
{out}') + + me = self.conn.get_on_chain_worker_info(self.account) + + # a worker registered just now may not be in the cached table yet + chain_cards = me['cards'] if me else [] + + # find if reported on chain gpus match local; a differing card count + # is a difference too and must not index past the on chain list + found_difference = len(chain_cards) != self.mm.num_gpus or any( + gpu.name != chain_gpu['card_name'] + or f'{gpu.major}.{gpu.minor}' != chain_gpu['version'] + or gpu.total_memory != chain_gpu['total_memory'] + or gpu.multi_processor_count != chain_gpu['mp_count'] + for gpu, chain_gpu in zip(self.mm.gpus, chain_cards) + ) + + # difference found, flush and re-report + if found_difference: + await self.conn.flush_cards() + for i, gpu in enumerate(self.mm.gpus): + ec, _ = await self.conn.add_card( + gpu.name, f'{gpu.major}.{gpu.minor}', + gpu.total_memory, gpu.multi_processor_count, + '', + is_online + ) + if ec != 0: + raise DGPUDaemonError(f'error while reporting card {i}') + + return found_difference + + async def all_gpu_set_online_flag(self, is_online: bool): + # fetch this worker's on chain row; a missing row means no cards to toggle + me = self.conn.get_on_chain_worker_info(self.account) + for i, chain_gpu in enumerate(me['cards'] if me else []): + if chain_gpu['is_online'] != is_online: + await self.conn.toggle_card(i) + async def serve_forever(self): + + diff = await self.sync_worker_on_chain_data(True) + if not diff: + await self.all_gpu_set_online_flag(True) + try: while True: if self.auto_withdraw: @@ -234,3 +283,6 @@ class SkynetDGPUDaemon: except KeyboardInterrupt: ... + + if not await self.sync_worker_on_chain_data(False): + await self.all_gpu_set_online_flag(False) diff --git a/skynet/dgpu/errors.py b/skynet/dgpu/errors.py index 91db585..e5deaa6 100644 --- a/skynet/dgpu/errors.py +++ b/skynet/dgpu/errors.py @@ -1,5 +1,7 @@ #!/usr/bin/python +class DGPUDaemonError(BaseException): + ... class DGPUComputeError(BaseException): ... 
diff --git a/skynet/dgpu/network.py b/skynet/dgpu/network.py index 90997bf..a6b6c14 100644 --- a/skynet/dgpu/network.py +++ b/skynet/dgpu/network.py @@ -68,6 +68,10 @@ class SkynetGPUConnector: if 'ipfs_domain' in config: self.ipfs_domain = config['ipfs_domain'] + self.worker_url = '' + if 'worker_url' in config: + self.worker_url = config['worker_url'] + self._update_delta = 1 self._cache: dict[str, tuple[float, Any]] = {} @@ -90,12 +94,16 @@ class SkynetGPUConnector: return default async def data_updater_task(self): + tasks = ( + (self._get_work_requests_last_hour, 'queue'), + (self._find_my_results, 'my_results'), + (self._get_workers, 'workers') + ) + while True: async with trio.open_nursery() as n: - n.start_soon( - self._cache_set, self._get_work_requests_last_hour, 'queue') - n.start_soon( - self._cache_set, self._find_my_results, 'my_results') + for task in tasks: + n.start_soon(self._cache_set, *task) await trio.sleep(self._update_delta) @@ -105,6 +113,9 @@ class SkynetGPUConnector: def get_my_results(self): return self._cache_get('my_results', default=[]) + def get_workers(self): + return self._cache_get('workers', default=[]) + def get_status_for_request(self, request_id: int) -> list[dict] | None: request: dict | None = next(( req @@ -135,8 +146,8 @@ class SkynetGPUConnector: self.cleos.aget_table, self.contract, self.contract, 'queue', index_position=2, - key_type='i64', - lower_bound=int(time.time()) - 3600 + order='asc', + limit=1000 ), ret_fail=[]) async def _find_my_results(self): @@ -152,6 +163,15 @@ class SkynetGPUConnector: ) ) + async def _get_workers(self) -> list[dict]: + logging.info('get_workers') + return await failable( + partial( + self.cleos.aget_table, + self.contract, self.contract, 'workers' + ) + ) + async def get_global_config(self): logging.info('get_global_config') rows = await failable( @@ -169,7 +189,7 @@ class SkynetGPUConnector: rows = await failable( partial( self.cleos.aget_table, - 'telos.gpu', 'telos.gpu', 'users', + 
self.contract, self.contract, 'users', index_position=1, key_type='name', lower_bound=self.account, @@ -181,12 +201,81 @@ class SkynetGPUConnector: else: return None + def get_on_chain_worker_info(self, worker: str): + return next(( + w for w in self.get_workers() + if w['account'] == worker + ), None) + + async def register_worker(self): + logging.info('registering worker') + return await failable( + partial( + self.cleos.a_push_action, + self.contract, + 'regworker', + { + 'account': self.account, + 'url': self.worker_url + } + ) + ) + + async def add_card( + self, + card_name: str, + version: str, + total_memory: int, + mp_count: int, + extra: str, + is_online: bool + ): + logging.info(f'adding card: {card_name} {version}') + return await failable( + partial( + self.cleos.a_push_action, + self.contract, + 'addcard', + { + 'worker': self.account, + 'card_name': card_name, + 'version': version, + 'total_memory': total_memory, + 'mp_count': mp_count, + 'extra': extra, + 'is_online': is_online + } + ) + ) + + async def toggle_card(self, index: int): + logging.info(f'toggle card {index}') + return await failable( + partial( + self.cleos.a_push_action, + self.contract, + 'togglecard', + {'worker': self.account, 'index': index} + ) + ) + + async def flush_cards(self): + logging.info('flushing cards...') + return await failable( + partial( + self.cleos.a_push_action, + self.contract, + 'flushcards', + {'worker': self.account} + ) + ) + async def begin_work(self, request_id: int): logging.info('begin_work') return await failable( partial( self.cleos.a_push_action, - 'telos.gpu', + self.contract, 'workbegin', { 'worker': self.account, @@ -203,7 +292,7 @@ class SkynetGPUConnector: return await failable( partial( self.cleos.a_push_action, - 'telos.gpu', + self.contract, 'workcancel', { 'worker': self.account, @@ -226,7 +315,7 @@ class SkynetGPUConnector: await failable( partial( self.cleos.a_push_action, - 'telos.gpu', + self.contract, 'withdraw', { 'user': self.account, @@ 
-248,7 +337,7 @@ class SkynetGPUConnector: return await failable( partial( self.cleos.a_push_action, - 'telos.gpu', + self.contract, 'submit', { 'worker': self.account,