diff --git a/skynet.ini.example b/skynet.ini.example index 1faf8b8..b498dd8 100644 --- a/skynet.ini.example +++ b/skynet.ini.example @@ -1,11 +1,24 @@ -[skynet.account] -name = xxxxxxxxxxxx -permission = active -key = EOSXxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx - [skynet.dgpu] +account = testworkerX +permission = active +key = 5Xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + +node_url = https://skynet.ancap.tech +hyperion_url = https://skynet.ancap.tech +ipfs_url = /ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv + hf_home = hf_home hf_token = hf_XxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXx +auto_withdraw = True + [skynet.telegram] +account = telegram +permission = active +key = 5Xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + +node_url = https://skynet.ancap.tech +hyperion_url = https://skynet.ancap.tech +ipfs_url = /ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv + token = XXXXXXXXXX:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx diff --git a/skynet/cli.py b/skynet/cli.py index e1ed879..a9f240a 100644 --- a/skynet/cli.py +++ b/skynet/cli.py @@ -17,8 +17,8 @@ import docker import asyncio import requests -from leap.cleos import CLEOS, default_nodeos_image -from leap.sugar import get_container, collect_stdout +from leap.cleos import CLEOS +from leap.sugar import collect_stdout from leap.hyperion import HyperionAPI from .db import open_new_database @@ -46,7 +46,7 @@ def skynet(*args, **kwargs): @click.option('--seed', '-S', default=None) def txt2img(*args, **kwargs): from . import utils - _, hf_token, _, cfg = init_env_from_config() + _, hf_token, _ = init_env_from_config() utils.txt2img(hf_token, **kwargs) @click.command() @@ -61,7 +61,7 @@ def txt2img(*args, **kwargs): @click.option('--seed', '-S', default=None) def img2img(model, prompt, input, output, strength, guidance, steps, seed): from . import utils - _, hf_token, _, cfg = init_env_from_config() + _, hf_token, _ = init_env_from_config() utils.img2img( hf_token, model=model, @@ -89,7 +89,7 @@ def upscale(input, output, model): @skynet.command() def download(): from . import utils - _, hf_token, _, cfg = init_env_from_config() + _, hf_token, _ = init_env_from_config() utils.download_all_models(hf_token) @skynet.command() @@ -122,7 +122,11 @@ def enqueue( **kwargs ): key, account, permission = load_account_info( - key, account, permission) + 'user', key, account, permission) + + node_url, _, _ = load_endpoint_info( + 'user', node_url, None, None) + with open_cleos(node_url, key=key) as cleos: if not kwargs['seed']: kwargs['seed'] = random.randint(0, 10e9) @@ -157,6 +161,12 @@ def clean( key: str | None, node_url: str, ): + key, account, permission = load_account_info( + 'user', key, account, permission) + + node_url, _, _ = load_endpoint_info( + 'user', node_url, None, None) + logging.basicConfig(level=loglevel) cleos = CLEOS(None, None, url=node_url, remote=node_url) trio.run( @@ -173,6 +183,8 @@ def clean( @click.option( '--node-url', '-n', default='https://skynet.ancap.tech') def queue(node_url: str): + node_url, _, _ = load_endpoint_info( + 'user', node_url, None, None) resp = requests.post( f'{node_url}/v1/chain/get_table_rows', json={ @@ -189,6 +201,8 @@ def queue(node_url: str): '--node-url', '-n', default='https://skynet.ancap.tech') @click.argument('request-id') def status(node_url: str, request_id: int): + node_url, _, _ = load_endpoint_info( + 'user', node_url, None, None) resp = requests.post( f'{node_url}/v1/chain/get_table_rows', json={ @@ -218,7 +232,11 @@ def dequeue( request_id: int ): key, account, permission = load_account_info( - key, account, permission) + 'user', key, account, permission) + + node_url, _, _ = load_endpoint_info( + 'user', node_url, None, None) + with open_cleos(node_url, key=key) as cleos: ec, out = cleos.push_action( 'telos.gpu', 'dequeue', [account, request_id], f'{account}@{permission}' @@ -236,8 +254,6 @@ def dequeue( '--key', '-k', default=None) @click.option( '--node-url', '-n', default='https://skynet.ancap.tech') -@click.option( - '--verifications', '-v', default=1) @click.option( '--token-contract', '-c', default='eosio.token') @click.option( @@ -247,15 +263,17 @@ def config( permission: str, key: str | None, node_url: str, - verifications: int, token_contract: str, token_symbol: str ): key, account, permission = load_account_info( - key, account, permission) + 'user', key, account, permission) + + node_url, _, _ = load_endpoint_info( + 'user', node_url, None, None) with open_cleos(node_url, key=key) as cleos: ec, out = cleos.push_action( - 'telos.gpu', 'config', [verifications, token_contract, token_symbol], f'{account}@{permission}' + 'telos.gpu', 'config', [token_contract, token_symbol], f'{account}@{permission}' ) print(collect_stdout(out)) @@ -279,7 +297,10 @@ def deposit( quantity: str ): key, account, permission = load_account_info( - key, account, permission) + 'user', key, account, permission) + + node_url, _, _ = load_endpoint_info( + 'user', node_url, None, None) with open_cleos(node_url, key=key) as cleos: ec, out = cleos.transfer_token(account, 'telos.gpu', quantity) @@ -304,45 +325,22 @@ def nodeos(): ... @run.command() -@click.option('--loglevel', '-l', default='warning', help='Logging level') +@click.option('--loglevel', '-l', default='INFO', help='Logging level') @click.option( - '--account', '-a', default='testworker1') -@click.option( - '--permission', '-p', default='active') -@click.option( - '--key', '-k', default=None) -@click.option( - '--auto-withdraw', '-w', default=True) -@click.option( - '--node-url', '-n', default='https://skynet.ancap.tech') -@click.option( - '--ipfs-url', '-n', default=DEFAULT_IPFS_REMOTE) -@click.option( - '--algos', '-A', default=json.dumps(['midj'])) + '--config-path', '-c', default='skynet.ini') def dgpu( loglevel: str, - account: str, - permission: str, - key: str | None, - auto_withdraw: bool, - node_url: str, - ipfs_url: str, - algos: list[str] + config_path: str ): from .dgpu import open_dgpu_node - key, account, permission = load_account_info( - key, account, permission) + logging.basicConfig(level=loglevel) - trio.run( - partial( - open_dgpu_node, - account, permission, - CLEOS(None, None, url=node_url, remote=node_url), - ipfs_url, - auto_withdraw=auto_withdraw, - key=key, initial_algos=json.loads(algos) - )) + config = load_skynet_ini(file_path=config_path) + + assert 'skynet.dgpu' in config + + trio.run(open_dgpu_node, config['skynet.dgpu']) @run.command() @@ -379,10 +377,13 @@ def telegram( ): logging.basicConfig(level=loglevel) - key, account, permission = load_account_info( - key, account, permission) + _, _, tg_token = init_env_from_config() - _, _, tg_token, cfg = init_env_from_config() + key, account, permission = load_account_info( + 'telegram', key, account, permission) + + node_url, _, ipfs_url = load_endpoint_info( + 'telegram', node_url, None, None) async def _async_main(): frontend = SkynetTelegramFrontend( @@ -485,7 +486,7 @@ def pinner(loglevel, ipfs_rpc, hyperion_url): async def task_pin(cid: str): logging.info(f'pinning {cid}...') - for i in range(6): + for _ in range(6): try: with trio.move_on_after(5): resp = await ipfs_node.a_pin(cid) diff --git a/skynet/config.py b/skynet/config.py index ace1dd0..fc8f2d9 100644 --- a/skynet/config.py +++ b/skynet/config.py @@ -1,9 +1,11 @@ #!/usr/bin/python import os +import json from pathlib import Path from configparser import ConfigParser +from re import sub from .constants import DEFAULT_CONFIG_PATH @@ -13,45 +15,89 @@ def load_skynet_ini( ): config = ConfigParser() config.read(file_path) + return config def init_env_from_config( + hf_token: str | None = None, + hf_home: str | None = None, + tg_token: str | None = None, file_path=DEFAULT_CONFIG_PATH ): - config = load_skynet_ini() + config = load_skynet_ini(file_path=file_path) if 'HF_TOKEN' in os.environ: hf_token = os.environ['HF_TOKEN'] - else: - hf_token = config['skynet.dgpu']['hf_token'] + + elif 'skynet.dgpu' in config: + sub_config = config['skynet.dgpu'] + if 'hf_token' in sub_config: + hf_token = sub_config['hf_token'] if 'HF_HOME' in os.environ: hf_home = os.environ['HF_HOME'] - else: - hf_home = config['skynet.dgpu']['hf_home'] + + elif 'skynet.dgpu' in config: + sub_config = config['skynet.dgpu'] + if 'hf_home' in sub_config: + hf_home = sub_config['hf_home'] if 'TG_TOKEN' in os.environ: tg_token = os.environ['TG_TOKEN'] - else: - tg_token = config['skynet.telegram']['token'] + elif 'skynet.telegram' in config: + sub_config = config['skynet.telegram'] + if 'token' in sub_config: + tg_token = sub_config['token'] - return hf_home, hf_token, tg_token, config + return hf_home, hf_token, tg_token def load_account_info( - key, account, permission, + _type: str, + key: str | None = None, + account: str | None = None, + permission: str | None = None, file_path=DEFAULT_CONFIG_PATH ): - _, _, _, config = init_env_from_config() + config = load_skynet_ini(file_path=file_path) - if not key: - key = config['skynet.account']['key'] + type_key = f'skynet.{_type}' - if not account: - account = config['skynet.account']['name'] + if type_key in config: + sub_config = config[type_key] + if not key and 'key' in sub_config: + key = sub_config['key'] - if not permission: - permission = config['skynet.account']['permission'] + if not account and 'name' in sub_config: + account = sub_config['name'] + + if not permission and 'permission' in sub_config: + permission = sub_config['permission'] return key, account, permission + + +def load_endpoint_info( + _type: str, + node_url: str | None = None, + hyperion_url: str | None = None, + ipfs_url: str | None = None, + file_path=DEFAULT_CONFIG_PATH +): + config = load_skynet_ini(file_path=file_path) + + type_key = f'skynet.{_type}' + + if type_key in config: + sub_config = config[type_key] + if not node_url and 'node_url' in sub_config: + node_url = sub_config['node_url'] + + if not hyperion_url and 'hyperion_url' in sub_config: + hyperion_url = sub_config['hyperion_url'] + + if not ipfs_url and 'ipfs_url' in sub_config: + ipfs_url = sub_config['ipfs_url'] + + return node_url, hyperion_url, ipfs_url diff --git a/skynet/constants.py b/skynet/constants.py index 74fed5d..7e41d7a 100644 --- a/skynet/constants.py +++ b/skynet/constants.py @@ -1,21 +1,26 @@ #!/usr/bin/python -VERSION = '0.1a9' +VERSION = '0.1a10' DOCKER_RUNTIME_CUDA = 'skynet:runtime-cuda' -ALGOS = { - 'midj': 'prompthero/openjourney', - 'stable': 'runwayml/stable-diffusion-v1-5', - 'hdanime': 'Linaqruf/anything-v3.0', - 'waifu': 'hakurei/waifu-diffusion', - 'ghibli': 'nitrosocke/Ghibli-Diffusion', - 'van-gogh': 'dallinmackay/Van-Gogh-diffusion', - 'pokemon': 'lambdalabs/sd-pokemon-diffusers', - 'ink': 'Envvi/Inkpunk-Diffusion', - 'robot': 'nousr/robo-diffusion' +MODELS = { + 'prompthero/openjourney': { 'short': 'midj'}, + 'runwayml/stable-diffusion-v1-5': { 'short': 'stable'}, + 'Linaqruf/anything-v3.0': { 'short': 'hdanime'}, + 'hakurei/waifu-diffusion': { 'short': 'waifu'}, + 'nitrosocke/Ghibli-Diffusion': { 'short': 'ghibli'}, + 'dallinmackay/Van-Gogh-diffusion': { 'short': 'van-gogh'}, + 'lambdalabs/sd-pokemon-diffusers': { 'short': 'pokemon'}, + 'Envvi/Inkpunk-Diffusion': { 'short': 'ink'}, + 'nousr/robo-diffusion': { 'short': 'robot'} } +def get_model_by_shortname(short: str): + for model, info in MODELS.items(): + if short == info['short']: + return model + N = '\n' HELP_TEXT = f''' test art bot v{VERSION} @@ -36,7 +41,7 @@ config is individual to each user! /config algo NAME - select AI to use one of: -{N.join(ALGOS.keys())} +{N.join(MODELS.keys())} /config step NUMBER - set amount of iterations /config seed NUMBER - set the seed, deterministic results! @@ -115,8 +120,10 @@ DEFAULT_UPSCALER = None DEFAULT_CONFIG_PATH = 'skynet.ini' -DEFAULT_DGPU_MAX_TASKS = 2 -DEFAULT_INITAL_ALGOS = ['midj', 'stable', 'ink'] +DEFAULT_INITAL_MODELS = [ + 'prompthero/openjourney', + 'runwayml/stable-diffusion-v1-5' +] DATE_FORMAT = '%B the %dth %Y, %H:%M:%S' diff --git a/skynet/dgpu.py b/skynet/dgpu.py deleted file mode 100644 index 209dbc3..0000000 --- a/skynet/dgpu.py +++ /dev/null @@ -1,375 +0,0 @@ -#!/usr/bin/python - -import gc -import io -import json -import time -import logging -import traceback - -from PIL import Image -from typing import List, Optional -from hashlib import sha256 - -import trio -import asks -import torch - -from leap.cleos import CLEOS -from leap.sugar import * - -from realesrgan import RealESRGANer -from basicsr.archs.rrdbnet_arch import RRDBNet - -from .ipfs import open_ipfs_node, get_ipfs_file -from .utils import * -from .constants import * - - -def init_upscaler(model_path: str = 'weights/RealESRGAN_x4plus.pth'): - return RealESRGANer( - scale=4, - model_path=model_path, - dni_weight=None, - model=RRDBNet( - num_in_ch=3, - num_out_ch=3, - num_feat=64, - num_block=23, - num_grow_ch=32, - scale=4 - ), - half=True - ) - - -class DGPUComputeError(BaseException): - ... - - -async def open_dgpu_node( - account: str, - permission: str, - cleos: CLEOS, - remote_ipfs_node: str, - key: str = None, - initial_algos: Optional[List[str]] = None, - auto_withdraw: bool = True -): - - logging.basicConfig(level=logging.INFO) - logging.info(f'starting dgpu node!') - logging.info(f'launching toolchain container!') - - logging.info(f'loading models...') - - upscaler = init_upscaler() - initial_algos = ( - initial_algos - if initial_algos else DEFAULT_INITAL_ALGOS - ) - models = {} - for algo in initial_algos: - models[algo] = { - 'pipe': pipeline_for(algo), - 'generated': 0 - } - logging.info(f'loaded {algo}.') - - logging.info('memory summary:') - logging.info('\n' + torch.cuda.memory_summary()) - - def gpu_compute_one(method: str, params: dict, binext: Optional[bytes] = None): - match method: - case 'diffuse': - image = None - algo = params['algo'] - if binext: - algo += 'img' - image = Image.open(io.BytesIO(binext)) - w, h = image.size - logging.info(f'user sent img of size {image.size}') - - if w > 512 or h > 512: - image.thumbnail((512, 512)) - logging.info(f'resized it to {image.size}') - - if algo not in models: - if params['algo'] not in ALGOS: - raise DGPUComputeError(f'Unknown algo \"{algo}\"') - - logging.info(f'{algo} not in loaded models, swapping...') - least_used = list(models.keys())[0] - for model in models: - if models[least_used]['generated'] > models[model]['generated']: - least_used = model - - del models[least_used] - gc.collect() - - models[algo] = { - 'pipe': pipeline_for(params['algo'], image=True if binext else False), - 'generated': 0 - } - logging.info(f'swapping done.') - - _params = {} - logging.info(method) - logging.info(json.dumps(params, indent=4)) - logging.info(f'binext: {len(binext) if binext else 0} bytes') - if binext: - _params['image'] = image - _params['strength'] = float(Decimal(params['strength'])) - - else: - _params['width'] = int(params['width']) - _params['height'] = int(params['height']) - - try: - image = models[algo]['pipe']( - params['prompt'], - **_params, - guidance_scale=float(Decimal(params['guidance'])), - num_inference_steps=int(params['step']), - generator=torch.manual_seed(int(params['seed'])) - ).images[0] - - if params['upscaler'] == 'x4': - logging.info(f'size: {len(image.tobytes())}') - logging.info('performing upscale...') - input_img = image.convert('RGB') - up_img, _ = upscaler.enhance( - convert_from_image_to_cv2(input_img), outscale=4) - - image = convert_from_cv2_to_image(up_img) - logging.info('done') - - img_byte_arr = io.BytesIO() - image.save(img_byte_arr, format='PNG') - raw_img = img_byte_arr.getvalue() - img_sha = sha256(raw_img).hexdigest() - logging.info(f'final img size {len(raw_img)} bytes.') - - logging.info(params) - - return img_sha, raw_img - - except BaseException as e: - logging.error(e) - raise DGPUComputeError(str(e)) - - finally: - torch.cuda.empty_cache() - - case _: - raise DGPUComputeError('Unsupported compute method') - - async def get_work_requests_last_hour(): - logging.info('get_work_requests_last_hour') - try: - return await cleos.aget_table( - 'telos.gpu', 'telos.gpu', 'queue', - index_position=2, - key_type='i64', - lower_bound=int(time.time()) - 3600 - ) - - except ( - asks.errors.RequestTimeout, - json.JSONDecodeError - ): - return [] - - async def get_status_by_request_id(request_id: int): - logging.info('get_status_by_request_id') - return await cleos.aget_table( - 'telos.gpu', request_id, 'status') - - async def get_global_config(): - logging.info('get_global_config') - return (await cleos.aget_table( - 'telos.gpu', 'telos.gpu', 'config'))[0] - - async def get_worker_balance(): - logging.info('get_worker_balance') - rows = await cleos.aget_table( - 'telos.gpu', 'telos.gpu', 'users', - index_position=1, - key_type='name', - lower_bound=account, - upper_bound=account - ) - if len(rows) == 1: - return rows[0]['balance'] - else: - return None - - async def begin_work(request_id: int): - logging.info('begin_work') - return await cleos.a_push_action( - 'telos.gpu', - 'workbegin', - { - 'worker': Name(account), - 'request_id': request_id, - 'max_workers': 2 - }, - account, key, - permission=permission - ) - - async def cancel_work(request_id: int, reason: str): - logging.info('cancel_work') - return await cleos.a_push_action( - 'telos.gpu', - 'workcancel', - { - 'worker': Name(account), - 'request_id': request_id, - 'reason': reason - }, - account, key, - permission=permission - ) - - async def maybe_withdraw_all(): - logging.info('maybe_withdraw_all') - balance = await get_worker_balance() - if not balance: - return - - balance_amount = float(balance.split(' ')[0]) - if balance_amount > 0: - await cleos.a_push_action( - 'telos.gpu', - 'withdraw', - { - 'user': Name(account), - 'quantity': asset_from_str(balance) - }, - account, key, - permission=permission - ) - - async def find_my_results(): - logging.info('find_my_results') - return await cleos.aget_table( - 'telos.gpu', 'telos.gpu', 'results', - index_position=4, - key_type='name', - lower_bound=account, - upper_bound=account - ) - - ipfs_node = None - def publish_on_ipfs(img_sha: str, raw_img: bytes): - logging.info('publish_on_ipfs') - img = Image.open(io.BytesIO(raw_img)) - img.save(f'ipfs-docker-staging/image.png') - - ipfs_hash = ipfs_node.add('image.png') - - ipfs_node.pin(ipfs_hash) - - return ipfs_hash - - async def submit_work( - request_id: int, - request_hash: str, - result_hash: str, - ipfs_hash: str - ): - logging.info('submit_work') - await cleos.a_push_action( - 'telos.gpu', - 'submit', - { - 'worker': Name(account), - 'request_id': request_id, - 'request_hash': Checksum256(request_hash), - 'result_hash': Checksum256(result_hash), - 'ipfs_hash': ipfs_hash - }, - account, key, - permission=permission - ) - - async def get_input_data(ipfs_hash: str) -> bytes: - if ipfs_hash == '': - return b'' - - resp = await get_ipfs_file(f'https://ipfs.ancap.tech/ipfs/{ipfs_hash}/image.png') - if resp.status_code != 200: - raise DGPUComputeError('Couldn\'t gather input data from ipfs') - - return resp.raw - - config = await get_global_config() - - with open_ipfs_node() as ipfs_node: - ipfs_node.connect(remote_ipfs_node) - try: - while True: - if auto_withdraw: - await maybe_withdraw_all() - - queue = await get_work_requests_last_hour() - - for req in queue: - rid = req['id'] - - my_results = [res['id'] for res in (await find_my_results())] - if rid not in my_results: - statuses = await get_status_by_request_id(rid) - - if len(statuses) < req['min_verification']: - - # parse request - body = json.loads(req['body']) - - binary = await get_input_data(req['binary_data']) - - hash_str = ( - str(req['nonce']) - + - req['body'] - + - req['binary_data'] - ) - logging.info(f'hashing: {hash_str}') - request_hash = sha256(hash_str.encode('utf-8')).hexdigest() - - # TODO: validate request - - # perform work - logging.info(f'working on {body}') - - resp = await begin_work(rid) - if 'code' in resp: - logging.info(f'probably beign worked on already... skip.') - - else: - try: - img_sha, raw_img = gpu_compute_one( - body['method'], body['params'], binext=binary) - - ipfs_hash = publish_on_ipfs(img_sha, raw_img) - - await submit_work(rid, request_hash, img_sha, ipfs_hash) - break - - except BaseException as e: - traceback.print_exc() - await cancel_work(rid, str(e)) - break - - else: - logging.info(f'request {rid} already beign worked on, skip...') - - await trio.sleep(1) - - except KeyboardInterrupt: - ... - - - diff --git a/skynet/dgpu/__init__.py b/skynet/dgpu/__init__.py new file mode 100644 index 0000000..ca34499 --- /dev/null +++ b/skynet/dgpu/__init__.py @@ -0,0 +1,16 @@ +#!/usr/bin/python + +import trio + +from skynet.dgpu.compute import SkynetMM +from skynet.dgpu.daemon import SkynetDGPUDaemon +from skynet.dgpu.network import SkynetGPUConnector + + +async def open_dgpu_node(config: dict): + conn = SkynetGPUConnector(config) + mm = SkynetMM(config) + + async with conn.open() as conn: + await (SkynetDGPUDaemon(mm, conn, config) + .serve_forever()) diff --git a/skynet/dgpu/compute.py b/skynet/dgpu/compute.py new file mode 100644 index 0000000..424d704 --- /dev/null +++ b/skynet/dgpu/compute.py @@ -0,0 +1,165 @@ +#!/usr/bin/python + +# Skynet Memory Manager + +import gc +from hashlib import sha256 +import json +import logging + +import torch +from skynet.constants import DEFAULT_INITAL_MODELS, MODELS +from skynet.dgpu.errors import DGPUComputeError + +from skynet.utils import convert_from_bytes_and_crop, convert_from_cv2_to_image, convert_from_image_to_cv2, convert_from_img_to_bytes, init_upscaler, pipeline_for + + +def prepare_params_for_diffuse( + params: dict, + binary: bytes | None = None +): + image = None + if binary: + image = convert_from_bytes_and_crop(binary, 512, 512) + + _params = {} + if image: + _params['image'] = image + _params['strength'] = float(params['strength']) + + else: + _params['width'] = int(params['width']) + _params['height'] = int(params['height']) + + return ( + params['prompt'], + float(params['guidance']), + int(params['step']), + torch.manual_seed(int(params['seed'])), + params['upscaler'] if 'upscaler' in params else None, + _params + ) + + +class SkynetMM: + + def __init__(self, config: dict): + self.upscaler = init_upscaler() + self.initial_models = ( + config['initial_models'] + if 'initial_models' in config else DEFAULT_INITAL_MODELS + ) + + self._models = {} + for model in self.initial_models: + self.load_model(model, False, force=True) + + def log_debug_info(self): + logging.info('memory summary:') + logging.info('\n' + torch.cuda.memory_summary()) + + def is_model_loaded(self, model_name: str, image: bool): + for model_key, model_data in self._models.items(): + if (model_key == model_name and + model_data['image'] == image): + return True + + return False + + def load_model( + self, + model_name: str, + image: bool, + force=False + ): + logging.info(f'loading model {model_name}...') + if force or len(self._models.keys()) == 0: + pipe = pipeline_for(model_name, image=image) + self._models[model_name] = { + 'pipe': pipe, + 'generated': 0, + 'image': image + } + + else: + least_used = list(self._models.keys())[0] + + for model in self._models: + if self._models[ + least_used]['generated'] > self._models[model]['generated']: + least_used = model + + del self._models[least_used] + + logging.info(f'swapping model {least_used} for {model_name}...') + + gc.collect() + torch.cuda.empty_cache() + + pipe = pipeline_for(model_name, image=image) + + self._models[model_name] = { + 'pipe': pipe, + 'generated': 0, + 'image': image + } + + logging.info(f'loaded model {model_name}') + return pipe + + def get_model(self, model_name: str, image: bool): + if model_name not in MODELS: + raise DGPUComputeError(f'Unknown model {model_name}') + + if not self.is_model_loaded(model_name, image): + pipe = self.load_model(model_name, image=image) + + else: + pipe = self._models[model_name] + + return pipe + + def compute_one( + self, + method: str, + params: dict, + binary: bytes | None = None + ): + try: + match method: + case 'diffuse': + image = None + + arguments = prepare_params_for_diffuse(params, binary) + prompt, guidance, step, seed, upscaler, extra_params = arguments + model = self.get_model(params['model'], 'image' in params) + + image = model['pipe']( + prompt, + guidance_scale=guidance, + num_inference_steps=step, + generator=seed, + **extra_params + ).images[0] + + if upscaler == 'x4': + input_img = image.convert('RGB') + up_img, _ = upscaler.enhance( + convert_from_image_to_cv2(input_img), outscale=4) + + image = convert_from_cv2_to_image(up_img) + + img_raw = convert_from_img_to_bytes(image) + img_sha = sha256(img_raw).hexdigest() + + return img_sha, img_raw + + case _: + raise DGPUComputeError('Unsupported compute method') + + except BaseException as e: + logging.error(e) + raise DGPUComputeError(str(e)) + + finally: + torch.cuda.empty_cache() diff --git a/skynet/dgpu/daemon.py b/skynet/dgpu/daemon.py new file mode 100644 index 0000000..4897f43 --- /dev/null +++ b/skynet/dgpu/daemon.py @@ -0,0 +1,92 @@ +#!/usr/bin/python + +import json +import logging +import traceback + +from hashlib import sha256 + +import trio + +from skynet.dgpu.compute import SkynetMM +from skynet.dgpu.network import SkynetGPUConnector + + +class SkynetDGPUDaemon: + + def __init__( + self, + mm: SkynetMM, + conn: SkynetGPUConnector, + config: dict + ): + self.mm = mm + self.conn = conn + self.auto_withdraw = ( + config['auto_withdraw'] + if 'auto_withdraw' in config else False + ) + + async def serve_forever(self): + try: + while True: + if self.auto_withdraw: + await self.conn.maybe_withdraw_all() + + queue = await self.conn.get_work_requests_last_hour() + + for req in queue: + rid = req['id'] + + my_results = [res['id'] for res in (await self.conn.find_my_results())] + if rid not in my_results: + statuses = await self.conn.get_status_by_request_id(rid) + + if len(statuses) < req['min_verification']: + + # parse request + body = json.loads(req['body']) + + binary = await self.conn.get_input_data(req['binary_data']) + + hash_str = ( + str(req['nonce']) + + + req['body'] + + + req['binary_data'] + ) + logging.info(f'hashing: {hash_str}') + request_hash = sha256(hash_str.encode('utf-8')).hexdigest() + + # TODO: validate request + + # perform work + logging.info(f'working on {body}') + + resp = await self.conn.begin_work(rid) + if 'code' in resp: + logging.info(f'probably beign worked on already... skip.') + + else: + try: + img_sha, img_raw = self.mm.compute_one( + body['method'], body['params'], binary=binary) + + ipfs_hash = self.conn.publish_on_ipfs( img_raw) + + await self.conn.submit_work(rid, request_hash, img_sha, ipfs_hash) + break + + except BaseException as e: + traceback.print_exc() + await self.conn.cancel_work(rid, str(e)) + break + + else: + logging.info(f'request {rid} already beign worked on, skip...') + + await trio.sleep(1) + + except KeyboardInterrupt: + ... diff --git a/skynet/dgpu/errors.py b/skynet/dgpu/errors.py new file mode 100644 index 0000000..1f08624 --- /dev/null +++ b/skynet/dgpu/errors.py @@ -0,0 +1,5 @@ +#!/usr/bin/python + + +class DGPUComputeError(BaseException): + ... diff --git a/skynet/dgpu/network.py b/skynet/dgpu/network.py new file mode 100644 index 0000000..55b8228 --- /dev/null +++ b/skynet/dgpu/network.py @@ -0,0 +1,191 @@ +#!/usr/bin/python + +import io +import json +import time +import logging + +import asks +from PIL import Image + +from contextlib import ExitStack +from contextlib import asynccontextmanager as acm + +from leap.cleos import CLEOS +from leap.sugar import Checksum256, Name, asset_from_str + +from skynet.dgpu.errors import DGPUComputeError +from skynet.ipfs import get_ipfs_file, open_ipfs_node + + +class SkynetGPUConnector: + + def __init__(self, config: dict): + self.account = Name(config['account']) + self.permission = config['permission'] + self.key = config['key'] + self.node_url = config['node_url'] + self.hyperion_url = config['hyperion_url'] + self.ipfs_url = config['ipfs_url'] + + self.cleos = CLEOS( + None, None, self.node_url, remote=self.node_url) + + self._exit_stack = ExitStack() + + def connect(self): + self.ipfs_node = self._exit_stack.enter_context( + open_ipfs_node()) + + def disconnect(self): + self._exit_stack.close() + + @acm + async def open(self): + self.connect() + yield self + self.disconnect() + + # blockchain helpers + + async def get_work_requests_last_hour(self): + logging.info('get_work_requests_last_hour') + try: + return await self.cleos.aget_table( + 'telos.gpu', 'telos.gpu', 'queue', + index_position=2, + key_type='i64', + lower_bound=int(time.time()) - 3600 + ) + + except ( + asks.errors.RequestTimeout, + json.JSONDecodeError + ): + return [] + + async def get_status_by_request_id(self, request_id: int): + logging.info('get_status_by_request_id') + return await self.cleos.aget_table( + 'telos.gpu', request_id, 'status') + + async def get_global_config(self): + logging.info('get_global_config') + return (await self.cleos.aget_table( + 'telos.gpu', 'telos.gpu', 'config'))[0] + + async def get_worker_balance(self): + logging.info('get_worker_balance') + rows = await self.cleos.aget_table( + 'telos.gpu', 'telos.gpu', 'users', + index_position=1, + key_type='name', + lower_bound=self.account, + upper_bound=self.account + ) + if len(rows) == 1: + return rows[0]['balance'] + else: + return None + + async def begin_work(self, request_id: int): + logging.info('begin_work') + return await self.cleos.a_push_action( + 'telos.gpu', + 'workbegin', + { + 'worker': self.account, + 'request_id': request_id, + 'max_workers': 2 + }, + self.account, self.key, + permission=self.permission + ) + + async def cancel_work(self, request_id: int, reason: str): + logging.info('cancel_work') + return await self.cleos.a_push_action( + 'telos.gpu', + 'workcancel', + { + 'worker': self.account, + 'request_id': request_id, + 'reason': reason + }, + self.account, self.key, + permission=self.permission + ) + + async def maybe_withdraw_all(self): + logging.info('maybe_withdraw_all') + balance = await self.get_worker_balance() + if not balance: + return + + balance_amount = float(balance.split(' ')[0]) + if balance_amount > 0: + await self.cleos.a_push_action( + 'telos.gpu', + 'withdraw', + { + 'user': self.account, + 'quantity': asset_from_str(balance) + }, + self.account, self.key, + permission=self.permission + ) + + async def find_my_results(self): + logging.info('find_my_results') + return await self.cleos.aget_table( + 'telos.gpu', 'telos.gpu', 'results', + index_position=4, + key_type='name', + lower_bound=self.account, + upper_bound=self.account + ) + + async def submit_work( + self, + request_id: int, + request_hash: str, + result_hash: str, + ipfs_hash: str + ): + logging.info('submit_work') + await self.cleos.a_push_action( + 'telos.gpu', + 'submit', + { + 'worker': self.account, + 'request_id': request_id, + 'request_hash': Checksum256(request_hash), + 'result_hash': Checksum256(result_hash), + 'ipfs_hash': ipfs_hash + }, + self.account, self.key, + permission=self.permission + ) + + # IPFS helpers + + def publish_on_ipfs(self, raw_img: bytes): + logging.info('publish_on_ipfs') + img = Image.open(io.BytesIO(raw_img)) + img.save(f'ipfs-docker-staging/image.png') + + ipfs_hash = self.ipfs_node.add('image.png') + + self.ipfs_node.pin(ipfs_hash) + + return ipfs_hash + + async def get_input_data(self, ipfs_hash: str) -> bytes: + if ipfs_hash == '': + return b'' + + resp = await get_ipfs_file(f'https://ipfs.ancap.tech/ipfs/{ipfs_hash}/image.png') + if resp.status_code != 200: + raise DGPUComputeError('Couldn\'t gather input data from ipfs') + + return resp.raw diff --git a/skynet/frontend/telegram/__init__.py b/skynet/frontend/telegram/__init__.py index f417842..07a39c5 100644 --- a/skynet/frontend/telegram/__init__.py +++ b/skynet/frontend/telegram/__init__.py @@ -214,7 +214,7 @@ class SkynetTelegramFrontend: if not ipfs_hash: await self.update_status_message( status_msg, - '\n[{timestamp_pretty()}] timeout processing request', + f'\n[{timestamp_pretty()}] timeout processing request', parse_mode='HTML' ) return diff --git a/skynet/frontend/telegram/handlers.py b/skynet/frontend/telegram/handlers.py index 7e77880..17d3213 100644 --- a/skynet/frontend/telegram/handlers.py +++ b/skynet/frontend/telegram/handlers.py @@ -151,6 +151,9 @@ def create_handler_context(frontend: 'SkynetTelegramFrontend'): **user_config } + params['model'] = get_model_by_shortname(params['algo']) + del params['algo'] + await db_call( 'update_user_stats', user.id, 'txt2img', last_prompt=prompt) @@ -227,6 +230,8 @@ def create_handler_context(frontend: 'SkynetTelegramFrontend'): 'prompt': prompt, **user_config } + params['model'] = get_model_by_shortname(params['algo']) + del params['algo'] await db_call( 'update_user_stats', @@ -297,6 +302,8 @@ def create_handler_context(frontend: 'SkynetTelegramFrontend'): 'prompt': prompt, **user_config } + params['model'] = get_model_by_shortname(params['algo']) + del params['algo'] await work_request( user, status_msg, 'redo', params, diff --git a/skynet/frontend/telegram/utils.py b/skynet/frontend/telegram/utils.py index 682cd02..cebb6e5 100644 --- a/skynet/frontend/telegram/utils.py +++ b/skynet/frontend/telegram/utils.py @@ -53,7 +53,7 @@ def prepare_metainfo_caption(tguser, worker: str, reward: str, meta: dict) -> st meta_str += f'guidance: {meta["guidance"]}\n' if meta['strength']: meta_str += f'strength: {meta["strength"]}\n' - meta_str += f'algo: {meta["algo"]}\n' + meta_str += f'algo: {meta["model"]}\n' if meta['upscaler']: meta_str += f'upscaler: {meta["upscaler"]}\n' diff --git a/skynet/nodeos.py b/skynet/nodeos.py index dddc935..01524e8 100644 --- a/skynet/nodeos.py +++ b/skynet/nodeos.py @@ -116,12 +116,14 @@ def open_nodeos(cleanup: bool = True): priv, pub = cleos.create_key_pair() cleos.import_key(priv) + cleos.private_keys['telos.gpu'] = priv logging.info(f'GPU KEYS: {(priv, pub)}') cleos.new_account('telos.gpu', ram=4200000, key=pub) for i in range(1, 4): priv, pub = cleos.create_key_pair() cleos.import_key(priv) + cleos.private_keys[f'testworker{i}'] = priv logging.info(f'testworker{i} KEYS: {(priv, pub)}') cleos.create_account_staked( 'eosio', f'testworker{i}', key=pub) diff --git a/skynet/utils.py b/skynet/utils.py index 3789aa4..e4bd04b 100644 --- a/skynet/utils.py +++ b/skynet/utils.py @@ -1,5 +1,6 @@ #!/usr/bin/python +import io import os import time import random @@ -13,6 +14,7 @@ import numpy as np from PIL import Image from basicsr.archs.rrdbnet_arch import RRDBNet from diffusers import ( + DiffusionPipeline, StableDiffusionPipeline, StableDiffusionImg2ImgPipeline, EulerAncestralDiscreteScheduler @@ -20,7 +22,7 @@ from diffusers import ( from realesrgan import RealESRGANer from huggingface_hub import login -from .constants import ALGOS +from .constants import MODELS def time_ms(): @@ -37,7 +39,24 @@ def convert_from_image_to_cv2(img: Image) -> np.ndarray: return np.asarray(img) -def pipeline_for(algo: str, mem_fraction: float = 1.0, image=False): +def convert_from_bytes_to_img(raw: bytes) -> Image: + return Image.open(io.BytesIO(raw)) + + +def convert_from_img_to_bytes(image: Image, fmt='PNG') -> bytes: + byte_arr = io.BytesIO() + image.save(byte_arr, format=fmt) + return byte_arr.getvalue() + + +def convert_from_bytes_and_crop(raw: bytes, max_w: int, max_h: int) -> Image: + image = convert_from_bytes_to_img(raw) + w, h = image.size + if w > max_w or h > max_h: + image.thumbnail((512, 512)) + + +def pipeline_for(model: str, mem_fraction: float = 1.0, image=False) -> DiffusionPipeline: assert torch.cuda.is_available() torch.cuda.empty_cache() torch.cuda.set_per_process_memory_fraction(mem_fraction) @@ -56,7 +75,7 @@ def pipeline_for(algo: str, mem_fraction: float = 1.0, image=False): 'safety_checker': None } - if algo == 'stable': + if model == 'runwayml/stable-diffusion-v1-5': params['revision'] = 'fp16' if image: @@ -65,7 +84,7 @@ def pipeline_for(algo: str, mem_fraction: float = 1.0, image=False): pipe_class = StableDiffusionPipeline pipe = pipe_class.from_pretrained( - ALGOS[algo], **params) + model, **params) pipe.scheduler = EulerAncestralDiscreteScheduler.from_config( pipe.scheduler.config) @@ -78,7 +97,7 @@ def pipeline_for(algo: str, mem_fraction: float = 1.0, image=False): def txt2img( hf_token: str, - model: str = 'midj', + model: str = 'prompthero/openjourney', prompt: str = 'a red old tractor in a sunny wheat field', output: str = 'output.png', width: int = 512, height: int = 512, @@ -110,7 +129,7 @@ def txt2img( def img2img( hf_token: str, - model: str = 'midj', + model: str = 'prompthero/openjourney', prompt: str = 'a red old tractor in a sunny wheat field', img_path: str = 'input.png', output: str = 'output.png', @@ -143,6 +162,23 @@ def img2img( image.save(output) + +def init_upscaler(model_path: str = 'weights/RealESRGAN_x4plus.pth'): + return RealESRGANer( + scale=4, + model_path=model_path, + dni_weight=None, + model=RRDBNet( + num_in_ch=3, + num_out_ch=3, + num_feat=64, + num_block=23, + num_grow_ch=32, + scale=4 + ), + half=True + ) + def upscale( img_path: str = 'input.png', output: str = 'output.png', @@ -156,19 +192,7 @@ def upscale( input_img = Image.open(img_path).convert('RGB') - upscaler = RealESRGANer( - scale=4, - model_path=model_path, - dni_weight=None, - model=RRDBNet( - num_in_ch=3, - num_out_ch=3, - num_feat=64, - num_block=23, - num_grow_ch=32, - scale=4 - ), - half=True) + upscaler = init_upscaler(model_path=model_path) up_img, _ = upscaler.enhance( convert_from_image_to_cv2(input_img), outscale=4) @@ -183,7 +207,8 @@ def download_all_models(hf_token: str): assert torch.cuda.is_available() login(token=hf_token) - for model in ALGOS: + for model in MODELS: print(f'DOWNLOADING {model.upper()}') pipeline_for(model) - + print(f'DOWNLOADING IMAGE {model.upper()}') + pipeline_for(model, image=True) diff --git a/tests/test_deploy.py b/tests/test_deploy.py index 3598788..62ef635 100644 --- a/tests/test_deploy.py +++ b/tests/test_deploy.py @@ -8,6 +8,7 @@ from functools import partial import trio import requests +from skynet.constants import DEFAULT_IPFS_REMOTE from skynet.dgpu import open_dgpu_node @@ -32,7 +33,7 @@ def test_enqueue_work(cleos): binary = '' ec, out = cleos.push_action( - 'telos.gpu', 'enqueue', [user, req, binary, '20.0000 GPU'], f'{user}@active' + 'telos.gpu', 'enqueue', [user, req, binary, '20.0000 GPU', 1], f'{user}@active' ) assert ec == 0 @@ -53,6 +54,8 @@ def test_enqueue_work(cleos): f'testworker1', 'active', cleos, + DEFAULT_IPFS_REMOTE, + cleos.private_keys['testworker1'], initial_algos=['midj'] ) ) @@ -80,12 +83,13 @@ def test_enqueue_dequeue(cleos): binary = '' ec, out = cleos.push_action( - 'telos.gpu', 'enqueue', [user, req, binary, '20.0000 GPU'], f'{user}@active' + 'telos.gpu', 'enqueue', [user, req, binary, '20.0000 GPU', 1], f'{user}@active' ) assert ec == 0 - request_id = int(collect_stdout(out)) + request_id, _ = collect_stdout(out).split(':') + request_id = int(request_id) queue = cleos.get_table('telos.gpu', 'telos.gpu', 'queue')