Separate worker into submodules

add-txt2txt-models
Guillermo Rodriguez 2023-06-04 17:51:43 -03:00
parent fc513b89af
commit bbc5751837
No known key found for this signature in database
GPG Key ID: EC3AB66D5D83B392
16 changed files with 684 additions and 485 deletions

View File

@ -1,11 +1,24 @@
[skynet.account]
name = xxxxxxxxxxxx
permission = active
key = EOSXxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
[skynet.dgpu] [skynet.dgpu]
account = testworkerX
permission = active
key = 5Xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
node_url = https://skynet.ancap.tech
hyperion_url = https://skynet.ancap.tech
ipfs_url = /ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv
hf_home = hf_home hf_home = hf_home
hf_token = hf_XxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXx hf_token = hf_XxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXx
auto_withdraw = True
[skynet.telegram] [skynet.telegram]
account = telegram
permission = active
key = 5Xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
node_url = https://skynet.ancap.tech
hyperion_url = https://skynet.ancap.tech
ipfs_url = /ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv
token = XXXXXXXXXX:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx token = XXXXXXXXXX:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

View File

@ -17,8 +17,8 @@ import docker
import asyncio import asyncio
import requests import requests
from leap.cleos import CLEOS, default_nodeos_image from leap.cleos import CLEOS
from leap.sugar import get_container, collect_stdout from leap.sugar import collect_stdout
from leap.hyperion import HyperionAPI from leap.hyperion import HyperionAPI
from .db import open_new_database from .db import open_new_database
@ -46,7 +46,7 @@ def skynet(*args, **kwargs):
@click.option('--seed', '-S', default=None) @click.option('--seed', '-S', default=None)
def txt2img(*args, **kwargs): def txt2img(*args, **kwargs):
from . import utils from . import utils
_, hf_token, _, cfg = init_env_from_config() _, hf_token, _ = init_env_from_config()
utils.txt2img(hf_token, **kwargs) utils.txt2img(hf_token, **kwargs)
@click.command() @click.command()
@ -61,7 +61,7 @@ def txt2img(*args, **kwargs):
@click.option('--seed', '-S', default=None) @click.option('--seed', '-S', default=None)
def img2img(model, prompt, input, output, strength, guidance, steps, seed): def img2img(model, prompt, input, output, strength, guidance, steps, seed):
from . import utils from . import utils
_, hf_token, _, cfg = init_env_from_config() _, hf_token, _ = init_env_from_config()
utils.img2img( utils.img2img(
hf_token, hf_token,
model=model, model=model,
@ -89,7 +89,7 @@ def upscale(input, output, model):
@skynet.command() @skynet.command()
def download(): def download():
from . import utils from . import utils
_, hf_token, _, cfg = init_env_from_config() _, hf_token, _ = init_env_from_config()
utils.download_all_models(hf_token) utils.download_all_models(hf_token)
@skynet.command() @skynet.command()
@ -122,7 +122,11 @@ def enqueue(
**kwargs **kwargs
): ):
key, account, permission = load_account_info( key, account, permission = load_account_info(
key, account, permission) 'user', key, account, permission)
node_url, _, _ = load_endpoint_info(
'user', node_url, None, None)
with open_cleos(node_url, key=key) as cleos: with open_cleos(node_url, key=key) as cleos:
if not kwargs['seed']: if not kwargs['seed']:
kwargs['seed'] = random.randint(0, 10e9) kwargs['seed'] = random.randint(0, 10e9)
@ -157,6 +161,12 @@ def clean(
key: str | None, key: str | None,
node_url: str, node_url: str,
): ):
key, account, permission = load_account_info(
'user', key, account, permission)
node_url, _, _ = load_endpoint_info(
'user', node_url, None, None)
logging.basicConfig(level=loglevel) logging.basicConfig(level=loglevel)
cleos = CLEOS(None, None, url=node_url, remote=node_url) cleos = CLEOS(None, None, url=node_url, remote=node_url)
trio.run( trio.run(
@ -173,6 +183,8 @@ def clean(
@click.option( @click.option(
'--node-url', '-n', default='https://skynet.ancap.tech') '--node-url', '-n', default='https://skynet.ancap.tech')
def queue(node_url: str): def queue(node_url: str):
node_url, _, _ = load_endpoint_info(
'user', node_url, None, None)
resp = requests.post( resp = requests.post(
f'{node_url}/v1/chain/get_table_rows', f'{node_url}/v1/chain/get_table_rows',
json={ json={
@ -189,6 +201,8 @@ def queue(node_url: str):
'--node-url', '-n', default='https://skynet.ancap.tech') '--node-url', '-n', default='https://skynet.ancap.tech')
@click.argument('request-id') @click.argument('request-id')
def status(node_url: str, request_id: int): def status(node_url: str, request_id: int):
node_url, _, _ = load_endpoint_info(
'user', node_url, None, None)
resp = requests.post( resp = requests.post(
f'{node_url}/v1/chain/get_table_rows', f'{node_url}/v1/chain/get_table_rows',
json={ json={
@ -218,7 +232,11 @@ def dequeue(
request_id: int request_id: int
): ):
key, account, permission = load_account_info( key, account, permission = load_account_info(
key, account, permission) 'user', key, account, permission)
node_url, _, _ = load_endpoint_info(
'user', node_url, None, None)
with open_cleos(node_url, key=key) as cleos: with open_cleos(node_url, key=key) as cleos:
ec, out = cleos.push_action( ec, out = cleos.push_action(
'telos.gpu', 'dequeue', [account, request_id], f'{account}@{permission}' 'telos.gpu', 'dequeue', [account, request_id], f'{account}@{permission}'
@ -236,8 +254,6 @@ def dequeue(
'--key', '-k', default=None) '--key', '-k', default=None)
@click.option( @click.option(
'--node-url', '-n', default='https://skynet.ancap.tech') '--node-url', '-n', default='https://skynet.ancap.tech')
@click.option(
'--verifications', '-v', default=1)
@click.option( @click.option(
'--token-contract', '-c', default='eosio.token') '--token-contract', '-c', default='eosio.token')
@click.option( @click.option(
@ -247,15 +263,17 @@ def config(
permission: str, permission: str,
key: str | None, key: str | None,
node_url: str, node_url: str,
verifications: int,
token_contract: str, token_contract: str,
token_symbol: str token_symbol: str
): ):
key, account, permission = load_account_info( key, account, permission = load_account_info(
key, account, permission) 'user', key, account, permission)
node_url, _, _ = load_endpoint_info(
'user', node_url, None, None)
with open_cleos(node_url, key=key) as cleos: with open_cleos(node_url, key=key) as cleos:
ec, out = cleos.push_action( ec, out = cleos.push_action(
'telos.gpu', 'config', [verifications, token_contract, token_symbol], f'{account}@{permission}' 'telos.gpu', 'config', [token_contract, token_symbol], f'{account}@{permission}'
) )
print(collect_stdout(out)) print(collect_stdout(out))
@ -279,7 +297,10 @@ def deposit(
quantity: str quantity: str
): ):
key, account, permission = load_account_info( key, account, permission = load_account_info(
key, account, permission) 'user', key, account, permission)
node_url, _, _ = load_endpoint_info(
'user', node_url, None, None)
with open_cleos(node_url, key=key) as cleos: with open_cleos(node_url, key=key) as cleos:
ec, out = cleos.transfer_token(account, 'telos.gpu', quantity) ec, out = cleos.transfer_token(account, 'telos.gpu', quantity)
@ -304,45 +325,22 @@ def nodeos():
... ...
@run.command() @run.command()
@click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--loglevel', '-l', default='INFO', help='Logging level')
@click.option( @click.option(
'--account', '-a', default='testworker1') '--config-path', '-c', default='skynet.ini')
@click.option(
'--permission', '-p', default='active')
@click.option(
'--key', '-k', default=None)
@click.option(
'--auto-withdraw', '-w', default=True)
@click.option(
'--node-url', '-n', default='https://skynet.ancap.tech')
@click.option(
'--ipfs-url', '-n', default=DEFAULT_IPFS_REMOTE)
@click.option(
'--algos', '-A', default=json.dumps(['midj']))
def dgpu( def dgpu(
loglevel: str, loglevel: str,
account: str, config_path: str
permission: str,
key: str | None,
auto_withdraw: bool,
node_url: str,
ipfs_url: str,
algos: list[str]
): ):
from .dgpu import open_dgpu_node from .dgpu import open_dgpu_node
key, account, permission = load_account_info( logging.basicConfig(level=loglevel)
key, account, permission)
trio.run( config = load_skynet_ini(file_path=config_path)
partial(
open_dgpu_node, assert 'skynet.dgpu' in config
account, permission,
CLEOS(None, None, url=node_url, remote=node_url), trio.run(open_dgpu_node, config['skynet.dgpu'])
ipfs_url,
auto_withdraw=auto_withdraw,
key=key, initial_algos=json.loads(algos)
))
@run.command() @run.command()
@ -379,10 +377,13 @@ def telegram(
): ):
logging.basicConfig(level=loglevel) logging.basicConfig(level=loglevel)
key, account, permission = load_account_info( _, _, tg_token = init_env_from_config()
key, account, permission)
_, _, tg_token, cfg = init_env_from_config() key, account, permission = load_account_info(
'telegram', key, account, permission)
node_url, _, ipfs_url = load_endpoint_info(
'telegram', node_url, None, None)
async def _async_main(): async def _async_main():
frontend = SkynetTelegramFrontend( frontend = SkynetTelegramFrontend(
@ -485,7 +486,7 @@ def pinner(loglevel, ipfs_rpc, hyperion_url):
async def task_pin(cid: str): async def task_pin(cid: str):
logging.info(f'pinning {cid}...') logging.info(f'pinning {cid}...')
for i in range(6): for _ in range(6):
try: try:
with trio.move_on_after(5): with trio.move_on_after(5):
resp = await ipfs_node.a_pin(cid) resp = await ipfs_node.a_pin(cid)

View File

@ -1,9 +1,11 @@
#!/usr/bin/python #!/usr/bin/python
import os import os
import json
from pathlib import Path from pathlib import Path
from configparser import ConfigParser from configparser import ConfigParser
from re import sub
from .constants import DEFAULT_CONFIG_PATH from .constants import DEFAULT_CONFIG_PATH
@ -13,45 +15,89 @@ def load_skynet_ini(
): ):
config = ConfigParser() config = ConfigParser()
config.read(file_path) config.read(file_path)
return config return config
def init_env_from_config( def init_env_from_config(
hf_token: str | None = None,
hf_home: str | None = None,
tg_token: str | None = None,
file_path=DEFAULT_CONFIG_PATH file_path=DEFAULT_CONFIG_PATH
): ):
config = load_skynet_ini() config = load_skynet_ini(file_path=file_path)
if 'HF_TOKEN' in os.environ: if 'HF_TOKEN' in os.environ:
hf_token = os.environ['HF_TOKEN'] hf_token = os.environ['HF_TOKEN']
else:
hf_token = config['skynet.dgpu']['hf_token'] elif 'skynet.dgpu' in config:
sub_config = config['skynet.dgpu']
if 'hf_token' in sub_config:
hf_token = sub_config['hf_token']
if 'HF_HOME' in os.environ: if 'HF_HOME' in os.environ:
hf_home = os.environ['HF_HOME'] hf_home = os.environ['HF_HOME']
else:
hf_home = config['skynet.dgpu']['hf_home'] elif 'skynet.dgpu' in config:
sub_config = config['skynet.dgpu']
if 'hf_home' in sub_config:
hf_home = sub_config['hf_home']
if 'TG_TOKEN' in os.environ: if 'TG_TOKEN' in os.environ:
tg_token = os.environ['TG_TOKEN'] tg_token = os.environ['TG_TOKEN']
else: elif 'skynet.telegram' in config:
tg_token = config['skynet.telegram']['token'] sub_config = config['skynet.telegram']
if 'token' in sub_config:
tg_token = sub_config['token']
return hf_home, hf_token, tg_token, config return hf_home, hf_token, tg_token
def load_account_info( def load_account_info(
key, account, permission, _type: str,
key: str | None = None,
account: str | None = None,
permission: str | None = None,
file_path=DEFAULT_CONFIG_PATH file_path=DEFAULT_CONFIG_PATH
): ):
_, _, _, config = init_env_from_config() config = load_skynet_ini(file_path=file_path)
if not key: type_key = f'skynet.{_type}'
key = config['skynet.account']['key']
if not account: if type_key in config:
account = config['skynet.account']['name'] sub_config = config[type_key]
if not key and 'key' in sub_config:
key = sub_config['key']
if not permission: if not account and 'name' in sub_config:
permission = config['skynet.account']['permission'] account = sub_config['name']
if not permission and 'permission' in sub_config:
permission = sub_config['permission']
return key, account, permission return key, account, permission
def load_endpoint_info(
    _type: str,
    node_url: str | None = None,
    hyperion_url: str | None = None,
    ipfs_url: str | None = None,
    file_path=DEFAULT_CONFIG_PATH
):
    """Fill in missing endpoint URLs from the ``skynet.<_type>`` ini section.

    Explicit (truthy) arguments always win; a falsy argument is replaced by
    the matching option of the section only when that option exists.

    Returns:
        The ``(node_url, hyperion_url, ipfs_url)`` tuple after resolution.
    """
    ini = load_skynet_ini(file_path=file_path)
    section_name = f'skynet.{_type}'

    # no section for this role: hand the caller's values back untouched
    if section_name not in ini:
        return node_url, hyperion_url, ipfs_url

    section = ini[section_name]

    def resolve(current, option):
        # only fall back to the file when the caller gave nothing usable
        if not current and option in section:
            return section[option]
        return current

    return (
        resolve(node_url, 'node_url'),
        resolve(hyperion_url, 'hyperion_url'),
        resolve(ipfs_url, 'ipfs_url')
    )

View File

@ -1,21 +1,26 @@
#!/usr/bin/python #!/usr/bin/python
VERSION = '0.1a9' VERSION = '0.1a10'
DOCKER_RUNTIME_CUDA = 'skynet:runtime-cuda' DOCKER_RUNTIME_CUDA = 'skynet:runtime-cuda'
ALGOS = { MODELS = {
'midj': 'prompthero/openjourney', 'prompthero/openjourney': { 'short': 'midj'},
'stable': 'runwayml/stable-diffusion-v1-5', 'runwayml/stable-diffusion-v1-5': { 'short': 'stable'},
'hdanime': 'Linaqruf/anything-v3.0', 'Linaqruf/anything-v3.0': { 'short': 'hdanime'},
'waifu': 'hakurei/waifu-diffusion', 'hakurei/waifu-diffusion': { 'short': 'waifu'},
'ghibli': 'nitrosocke/Ghibli-Diffusion', 'nitrosocke/Ghibli-Diffusion': { 'short': 'ghibli'},
'van-gogh': 'dallinmackay/Van-Gogh-diffusion', 'dallinmackay/Van-Gogh-diffusion': { 'short': 'van-gogh'},
'pokemon': 'lambdalabs/sd-pokemon-diffusers', 'lambdalabs/sd-pokemon-diffusers': { 'short': 'pokemon'},
'ink': 'Envvi/Inkpunk-Diffusion', 'Envvi/Inkpunk-Diffusion': { 'short': 'ink'},
'robot': 'nousr/robo-diffusion' 'nousr/robo-diffusion': { 'short': 'robot'}
} }
def get_model_by_shortname(short: str):
    """Return the first ``MODELS`` key whose ``'short'`` alias equals *short*.

    Returns ``None`` when no entry carries that alias.
    """
    return next(
        (name for name, meta in MODELS.items() if short == meta['short']),
        None
    )
N = '\n' N = '\n'
HELP_TEXT = f''' HELP_TEXT = f'''
test art bot v{VERSION} test art bot v{VERSION}
@ -36,7 +41,7 @@ config is individual to each user!
/config algo NAME - select AI to use one of: /config algo NAME - select AI to use one of:
{N.join(ALGOS.keys())} {N.join(MODELS.keys())}
/config step NUMBER - set amount of iterations /config step NUMBER - set amount of iterations
/config seed NUMBER - set the seed, deterministic results! /config seed NUMBER - set the seed, deterministic results!
@ -115,8 +120,10 @@ DEFAULT_UPSCALER = None
DEFAULT_CONFIG_PATH = 'skynet.ini' DEFAULT_CONFIG_PATH = 'skynet.ini'
DEFAULT_DGPU_MAX_TASKS = 2 DEFAULT_INITAL_MODELS = [
DEFAULT_INITAL_ALGOS = ['midj', 'stable', 'ink'] 'prompthero/openjourney',
'runwayml/stable-diffusion-v1-5'
]
DATE_FORMAT = '%B the %dth %Y, %H:%M:%S' DATE_FORMAT = '%B the %dth %Y, %H:%M:%S'

View File

@ -1,375 +0,0 @@
#!/usr/bin/python
import gc
import io
import json
import time
import logging
import traceback
from PIL import Image
from typing import List, Optional
from hashlib import sha256
import trio
import asks
import torch
from leap.cleos import CLEOS
from leap.sugar import *
from realesrgan import RealESRGANer
from basicsr.archs.rrdbnet_arch import RRDBNet
from .ipfs import open_ipfs_node, get_ipfs_file
from .utils import *
from .constants import *
def init_upscaler(model_path: str = 'weights/RealESRGAN_x4plus.pth'):
    """Build the x4 ``RealESRGANer`` upscaler backed by an ``RRDBNet`` model.

    Args:
        model_path: path of the weights file handed to ``RealESRGANer``.
    """
    network = RRDBNet(
        num_in_ch=3,
        num_out_ch=3,
        num_feat=64,
        num_block=23,
        num_grow_ch=32,
        scale=4
    )
    return RealESRGANer(
        scale=4,
        model_path=model_path,
        dni_weight=None,
        model=network,
        half=True
    )
class DGPUComputeError(BaseException):
    """Raised when a compute request cannot be fulfilled by the worker."""
async def open_dgpu_node(
    account: str,
    permission: str,
    cleos: CLEOS,
    remote_ipfs_node: str,
    key: str = None,
    initial_algos: Optional[List[str]] = None,
    auto_withdraw: bool = True
):
    """Run a dgpu worker: poll the `telos.gpu` queue, compute, publish, submit.

    Loads the initial pipelines, defines the compute / chain / ipfs helpers
    as closures over the arguments, then loops until interrupted.

    Args:
        account: worker account used in every pushed action and table query.
        permission: permission passed to `cleos.a_push_action`.
        cleos: client used for all table reads and action pushes.
        remote_ipfs_node: address handed to `ipfs_node.connect`.
        key: key passed to `cleos.a_push_action`.
        initial_algos: algos to preload; falls back to DEFAULT_INITAL_ALGOS
            when falsy.
        auto_withdraw: when true, `maybe_withdraw_all` runs every iteration.
    """
    logging.basicConfig(level=logging.INFO)
    logging.info(f'starting dgpu node!')
    logging.info(f'launching toolchain container!')
    logging.info(f'loading models...')
    upscaler = init_upscaler()
    initial_algos = (
        initial_algos
        if initial_algos else DEFAULT_INITAL_ALGOS
    )
    # algo key -> {'pipe': pipeline, 'generated': counter}
    models = {}
    for algo in initial_algos:
        models[algo] = {
            'pipe': pipeline_for(algo),
            'generated': 0
        }
        logging.info(f'loaded {algo}.')
    logging.info('memory summary:')
    logging.info('\n' + torch.cuda.memory_summary())

    def gpu_compute_one(method: str, params: dict, binext: Optional[bytes] = None):
        # Returns (sha256 hex digest of the PNG, PNG bytes) for 'diffuse';
        # any other method raises DGPUComputeError.
        match method:
            case 'diffuse':
                image = None
                algo = params['algo']
                if binext:
                    # img2img pipelines are cached under '<algo>img'
                    algo += 'img'
                    image = Image.open(io.BytesIO(binext))
                    w, h = image.size
                    logging.info(f'user sent img of size {image.size}')
                    if w > 512 or h > 512:
                        image.thumbnail((512, 512))
                        logging.info(f'resized it to {image.size}')
                if algo not in models:
                    if params['algo'] not in ALGOS:
                        raise DGPUComputeError(f'Unknown algo \"{algo}\"')
                    logging.info(f'{algo} not in loaded models, swapping...')
                    # evict the entry with the lowest 'generated' counter
                    # NOTE(review): 'generated' is never incremented in this
                    # function, so this always picks the first key
                    least_used = list(models.keys())[0]
                    for model in models:
                        if models[least_used]['generated'] > models[model]['generated']:
                            least_used = model
                    del models[least_used]
                    gc.collect()
                    models[algo] = {
                        'pipe': pipeline_for(params['algo'], image=True if binext else False),
                        'generated': 0
                    }
                    logging.info(f'swapping done.')
                # extra keyword arguments that differ between txt2img and img2img
                _params = {}
                logging.info(method)
                logging.info(json.dumps(params, indent=4))
                logging.info(f'binext: {len(binext) if binext else 0} bytes')
                if binext:
                    _params['image'] = image
                    _params['strength'] = float(Decimal(params['strength']))
                else:
                    _params['width'] = int(params['width'])
                    _params['height'] = int(params['height'])
                try:
                    image = models[algo]['pipe'](
                        params['prompt'],
                        **_params,
                        guidance_scale=float(Decimal(params['guidance'])),
                        num_inference_steps=int(params['step']),
                        generator=torch.manual_seed(int(params['seed']))
                    ).images[0]
                    if params['upscaler'] == 'x4':
                        logging.info(f'size: {len(image.tobytes())}')
                        logging.info('performing upscale...')
                        input_img = image.convert('RGB')
                        up_img, _ = upscaler.enhance(
                            convert_from_image_to_cv2(input_img), outscale=4)
                        image = convert_from_cv2_to_image(up_img)
                        logging.info('done')
                    # serialize to PNG; the digest is taken over these bytes
                    img_byte_arr = io.BytesIO()
                    image.save(img_byte_arr, format='PNG')
                    raw_img = img_byte_arr.getvalue()
                    img_sha = sha256(raw_img).hexdigest()
                    logging.info(f'final img size {len(raw_img)} bytes.')
                    logging.info(params)
                    return img_sha, raw_img
                except BaseException as e:
                    # every failure is re-raised as DGPUComputeError carrying
                    # only the message of the original exception
                    logging.error(e)
                    raise DGPUComputeError(str(e))
                finally:
                    torch.cuda.empty_cache()
            case _:
                raise DGPUComputeError('Unsupported compute method')

    async def get_work_requests_last_hour():
        # 'queue' rows with lower bound (now - 3600) on index 2; timeouts and
        # undecodable replies are reported as an empty queue
        logging.info('get_work_requests_last_hour')
        try:
            return await cleos.aget_table(
                'telos.gpu', 'telos.gpu', 'queue',
                index_position=2,
                key_type='i64',
                lower_bound=int(time.time()) - 3600
            )
        except (
            asks.errors.RequestTimeout,
            json.JSONDecodeError
        ):
            return []

    async def get_status_by_request_id(request_id: int):
        # 'status' table scoped by the request id
        logging.info('get_status_by_request_id')
        return await cleos.aget_table(
            'telos.gpu', request_id, 'status')

    async def get_global_config():
        # first row of the contract's 'config' table
        logging.info('get_global_config')
        return (await cleos.aget_table(
            'telos.gpu', 'telos.gpu', 'config'))[0]

    async def get_worker_balance():
        # balance string of this account's 'users' row, or None unless
        # exactly one row matched
        logging.info('get_worker_balance')
        rows = await cleos.aget_table(
            'telos.gpu', 'telos.gpu', 'users',
            index_position=1,
            key_type='name',
            lower_bound=account,
            upper_bound=account
        )
        if len(rows) == 1:
            return rows[0]['balance']
        else:
            return None

    async def begin_work(request_id: int):
        # push 'workbegin'; the main loop treats a 'code' key in the reply
        # as a failure
        logging.info('begin_work')
        return await cleos.a_push_action(
            'telos.gpu',
            'workbegin',
            {
                'worker': Name(account),
                'request_id': request_id,
                'max_workers': 2
            },
            account, key,
            permission=permission
        )

    async def cancel_work(request_id: int, reason: str):
        # push 'workcancel' with a free-form reason
        logging.info('cancel_work')
        return await cleos.a_push_action(
            'telos.gpu',
            'workcancel',
            {
                'worker': Name(account),
                'request_id': request_id,
                'reason': reason
            },
            account, key,
            permission=permission
        )

    async def maybe_withdraw_all():
        # push 'withdraw' for the whole balance when it is above zero
        logging.info('maybe_withdraw_all')
        balance = await get_worker_balance()
        if not balance:
            return
        # the amount is the text before the first space of the balance string
        balance_amount = float(balance.split(' ')[0])
        if balance_amount > 0:
            await cleos.a_push_action(
                'telos.gpu',
                'withdraw',
                {
                    'user': Name(account),
                    'quantity': asset_from_str(balance)
                },
                account, key,
                permission=permission
            )

    async def find_my_results():
        # 'results' rows bounded to this account on index 4
        logging.info('find_my_results')
        return await cleos.aget_table(
            'telos.gpu', 'telos.gpu', 'results',
            index_position=4,
            key_type='name',
            lower_bound=account,
            upper_bound=account
        )

    # rebound by the `with open_ipfs_node()` statement below; publish_on_ipfs
    # reads it through the closure
    ipfs_node = None

    def publish_on_ipfs(img_sha: str, raw_img: bytes):
        # img_sha is accepted but not used here
        logging.info('publish_on_ipfs')
        img = Image.open(io.BytesIO(raw_img))
        # NOTE(review): saved under ipfs-docker-staging/ but added as
        # 'image.png' — presumably that directory is mounted into the ipfs
        # container; confirm
        img.save(f'ipfs-docker-staging/image.png')
        ipfs_hash = ipfs_node.add('image.png')
        ipfs_node.pin(ipfs_hash)
        return ipfs_hash

    async def submit_work(
        request_id: int,
        request_hash: str,
        result_hash: str,
        ipfs_hash: str
    ):
        # push 'submit' with both hashes wrapped as Checksum256
        logging.info('submit_work')
        await cleos.a_push_action(
            'telos.gpu',
            'submit',
            {
                'worker': Name(account),
                'request_id': request_id,
                'request_hash': Checksum256(request_hash),
                'result_hash': Checksum256(result_hash),
                'ipfs_hash': ipfs_hash
            },
            account, key,
            permission=permission
        )

    async def get_input_data(ipfs_hash: str) -> bytes:
        # an empty hash means the request carries no binary input
        if ipfs_hash == '':
            return b''
        resp = await get_ipfs_file(f'https://ipfs.ancap.tech/ipfs/{ipfs_hash}/image.png')
        if resp.status_code != 200:
            raise DGPUComputeError('Couldn\'t gather input data from ipfs')
        return resp.raw

    # fetched once at startup; not read again in this function
    config = await get_global_config()
    with open_ipfs_node() as ipfs_node:
        ipfs_node.connect(remote_ipfs_node)
        try:
            while True:
                if auto_withdraw:
                    await maybe_withdraw_all()
                queue = await get_work_requests_last_hour()
                for req in queue:
                    rid = req['id']
                    # skip requests this worker already has a result for
                    my_results = [res['id'] for res in (await find_my_results())]
                    if rid not in my_results:
                        statuses = await get_status_by_request_id(rid)
                        if len(statuses) < req['min_verification']:
                            # parse request
                            body = json.loads(req['body'])
                            binary = await get_input_data(req['binary_data'])
                            # request hash = sha256(nonce + body + binary_data)
                            hash_str = (
                                str(req['nonce'])
                                +
                                req['body']
                                +
                                req['binary_data']
                            )
                            logging.info(f'hashing: {hash_str}')
                            request_hash = sha256(hash_str.encode('utf-8')).hexdigest()
                            # TODO: validate request
                            # perform work
                            logging.info(f'working on {body}')
                            resp = await begin_work(rid)
                            if 'code' in resp:
                                logging.info(f'probably beign worked on already... skip.')
                            else:
                                # one request per pass: both outcomes break
                                # out of the for loop to re-fetch the queue
                                try:
                                    img_sha, raw_img = gpu_compute_one(
                                        body['method'], body['params'], binext=binary)
                                    ipfs_hash = publish_on_ipfs(img_sha, raw_img)
                                    await submit_work(rid, request_hash, img_sha, ipfs_hash)
                                    break
                                except BaseException as e:
                                    traceback.print_exc()
                                    await cancel_work(rid, str(e))
                                    break
                        else:
                            logging.info(f'request {rid} already beign worked on, skip...')
                await trio.sleep(1)
        except KeyboardInterrupt:
            ...

View File

@ -0,0 +1,16 @@
#!/usr/bin/python
import trio
from skynet.dgpu.compute import SkynetMM
from skynet.dgpu.daemon import SkynetDGPUDaemon
from skynet.dgpu.network import SkynetGPUConnector
async def open_dgpu_node(config: dict):
    """Wire connector, memory manager and daemon together and serve forever.

    Both the connector and the memory manager are built from the same
    *config* mapping; the daemon runs inside the connector's ``open()``
    context.
    """
    connector = SkynetGPUConnector(config)
    memory_manager = SkynetMM(config)

    async with connector.open() as live_conn:
        daemon = SkynetDGPUDaemon(memory_manager, live_conn, config)
        await daemon.serve_forever()

View File

@ -0,0 +1,165 @@
#!/usr/bin/python
# Skynet Memory Manager
import gc
from hashlib import sha256
import json
import logging
import torch
from skynet.constants import DEFAULT_INITAL_MODELS, MODELS
from skynet.dgpu.errors import DGPUComputeError
from skynet.utils import convert_from_bytes_and_crop, convert_from_cv2_to_image, convert_from_image_to_cv2, convert_from_img_to_bytes, init_upscaler, pipeline_for
def prepare_params_for_diffuse(
params: dict,
binary: bytes | None = None
):
image = None
if binary:
image = convert_from_bytes_and_crop(binary, 512, 512)
_params = {}
if image:
_params['image'] = image
_params['strength'] = float(params['strength'])
else:
_params['width'] = int(params['width'])
_params['height'] = int(params['height'])
return (
params['prompt'],
float(params['guidance']),
int(params['step']),
torch.manual_seed(int(params['seed'])),
params['upscaler'] if 'upscaler' in params else None,
_params
)
class SkynetMM:
def __init__(self, config: dict):
self.upscaler = init_upscaler()
self.initial_models = (
config['initial_models']
if 'initial_models' in config else DEFAULT_INITAL_MODELS
)
self._models = {}
for model in self.initial_models:
self.load_model(model, False, force=True)
def log_debug_info(self):
logging.info('memory summary:')
logging.info('\n' + torch.cuda.memory_summary())
def is_model_loaded(self, model_name: str, image: bool):
for model_key, model_data in self._models.items():
if (model_key == model_name and
model_data['image'] == image):
return True
return False
def load_model(
self,
model_name: str,
image: bool,
force=False
):
logging.info(f'loading model {model_name}...')
if force or len(self._models.keys()) == 0:
pipe = pipeline_for(model_name, image=image)
self._models[model_name] = {
'pipe': pipe,
'generated': 0,
'image': image
}
else:
least_used = list(self._models.keys())[0]
for model in self._models:
if self._models[
least_used]['generated'] > self._models[model]['generated']:
least_used = model
del self._models[least_used]
logging.info(f'swapping model {least_used} for {model_name}...')
gc.collect()
torch.cuda.empty_cache()
pipe = pipeline_for(model_name, image=image)
self._models[model_name] = {
'pipe': pipe,
'generated': 0,
'image': image
}
logging.info(f'loaded model {model_name}')
return pipe
def get_model(self, model_name: str, image: bool):
if model_name not in MODELS:
raise DGPUComputeError(f'Unknown model {model_name}')
if not self.is_model_loaded(model_name, image):
pipe = self.load_model(model_name, image=image)
else:
pipe = self._models[model_name]
return pipe
def compute_one(
self,
method: str,
params: dict,
binary: bytes | None = None
):
try:
match method:
case 'diffuse':
image = None
arguments = prepare_params_for_diffuse(params, binary)
prompt, guidance, step, seed, upscaler, extra_params = arguments
model = self.get_model(params['model'], 'image' in params)
image = model['pipe'](
prompt,
guidance_scale=guidance,
num_inference_steps=step,
generator=seed,
**extra_params
).images[0]
if upscaler == 'x4':
input_img = image.convert('RGB')
up_img, _ = upscaler.enhance(
convert_from_image_to_cv2(input_img), outscale=4)
image = convert_from_cv2_to_image(up_img)
img_raw = convert_from_img_to_bytes(image)
img_sha = sha256(img_raw).hexdigest()
return img_sha, img_raw
case _:
raise DGPUComputeError('Unsupported compute method')
except BaseException as e:
logging.error(e)
raise DGPUComputeError(str(e))
finally:
torch.cuda.empty_cache()

View File

@ -0,0 +1,92 @@
#!/usr/bin/python
import json
import logging
import traceback
from hashlib import sha256
import trio
from skynet.dgpu.compute import SkynetMM
from skynet.dgpu.network import SkynetGPUConnector
class SkynetDGPUDaemon:
    """Worker loop: polls the request queue, computes and submits results."""

    def __init__(
        self,
        mm: 'SkynetMM',
        conn: 'SkynetGPUConnector',
        config: dict
    ):
        """Store collaborators and read ``auto_withdraw`` from *config*.

        Args:
            mm: memory manager whose ``compute_one`` performs the work.
            conn: connector used for every chain / ipfs operation.
            config: mapping that may carry ``auto_withdraw``; defaults to
                False when absent.
        """
        self.mm = mm
        self.conn = conn

        raw_flag = (
            config['auto_withdraw']
            if 'auto_withdraw' in config else False
        )
        # values read from an ini file arrive as text, and any non-empty
        # string (including "False") is truthy; accept the same spellings
        # configparser treats as true
        if isinstance(raw_flag, str):
            raw_flag = raw_flag.strip().lower() in ('1', 'yes', 'true', 'on')
        self.auto_withdraw = bool(raw_flag)

    async def serve_forever(self):
        """Poll the queue once per second and process at most one request
        per pass, until interrupted with KeyboardInterrupt."""
        try:
            while True:
                if self.auto_withdraw:
                    await self.conn.maybe_withdraw_all()

                queue = await self.conn.get_work_requests_last_hour()

                for req in queue:
                    rid = req['id']

                    # skip requests this worker already has a result for
                    my_results = [
                        res['id']
                        for res in (await self.conn.find_my_results())
                    ]
                    if rid in my_results:
                        continue

                    statuses = await self.conn.get_status_by_request_id(rid)
                    if len(statuses) >= req['min_verification']:
                        logging.info(f'request {rid} already beign worked on, skip...')
                        continue

                    # parse request
                    body = json.loads(req['body'])
                    binary = await self.conn.get_input_data(req['binary_data'])

                    # request hash = sha256(nonce + body + binary_data)
                    hash_str = (
                        str(req['nonce'])
                        +
                        req['body']
                        +
                        req['binary_data']
                    )
                    logging.info(f'hashing: {hash_str}')
                    request_hash = sha256(hash_str.encode('utf-8')).hexdigest()

                    # TODO: validate request

                    # perform work
                    logging.info(f'working on {body}')

                    resp = await self.conn.begin_work(rid)
                    if 'code' in resp:
                        logging.info('probably beign worked on already... skip.')
                        continue

                    try:
                        img_sha, img_raw = self.mm.compute_one(
                            body['method'], body['params'], binary=binary)

                        ipfs_hash = self.conn.publish_on_ipfs(img_raw)

                        await self.conn.submit_work(
                            rid, request_hash, img_sha, ipfs_hash)

                    except BaseException as e:
                        # DGPUComputeError derives from BaseException, so a
                        # narrower clause would miss compute failures
                        traceback.print_exc()
                        await self.conn.cancel_work(rid, str(e))

                    # success or failure: re-fetch the queue before moving on
                    break

                await trio.sleep(1)

        except KeyboardInterrupt:
            ...

View File

@ -0,0 +1,5 @@
#!/usr/bin/python
class DGPUComputeError(BaseException):
    """Raised when a dgpu compute request cannot be fulfilled."""

View File

@ -0,0 +1,191 @@
#!/usr/bin/python
import io
import json
import time
import logging
import asks
from PIL import Image
from contextlib import ExitStack
from contextlib import asynccontextmanager as acm
from leap.cleos import CLEOS
from leap.sugar import Checksum256, Name, asset_from_str
from skynet.dgpu.errors import DGPUComputeError
from skynet.ipfs import get_ipfs_file, open_ipfs_node
class SkynetGPUConnector:
    """Chain and IPFS access for a dgpu worker.

    Wraps a ``CLEOS`` client for the ``telos.gpu`` contract tables/actions
    and an IPFS node context that is entered by ``connect`` and released by
    ``disconnect``.
    """

    def __init__(self, config: dict):
        """Read account and endpoint settings from *config*.

        Requires the keys ``account``, ``permission``, ``key``, ``node_url``,
        ``hyperion_url`` and ``ipfs_url``.
        """
        self.account = Name(config['account'])
        self.permission = config['permission']
        self.key = config['key']
        self.node_url = config['node_url']
        self.hyperion_url = config['hyperion_url']
        # NOTE(review): ipfs_url is stored but never used by this class —
        # confirm whether connect() should dial this remote node
        self.ipfs_url = config['ipfs_url']

        self.cleos = CLEOS(
            None, None, self.node_url, remote=self.node_url)

        # owns every context entered by connect()
        self._exit_stack = ExitStack()

    def connect(self):
        """Enter the IPFS node context and keep it open on the exit stack."""
        self.ipfs_node = self._exit_stack.enter_context(
            open_ipfs_node())

    def disconnect(self):
        """Close everything that connect() opened."""
        self._exit_stack.close()

    @acm
    async def open(self):
        """Async context manager yielding the connected connector.

        The connection is released even when the body of the ``async with``
        raises.
        """
        self.connect()
        try:
            yield self
        finally:
            self.disconnect()

    # blockchain helpers

    async def get_work_requests_last_hour(self):
        """Return ``queue`` rows with lower bound (now - 3600) on index 2.

        Request timeouts and undecodable replies yield an empty list.
        """
        logging.info('get_work_requests_last_hour')
        try:
            return await self.cleos.aget_table(
                'telos.gpu', 'telos.gpu', 'queue',
                index_position=2,
                key_type='i64',
                lower_bound=int(time.time()) - 3600
            )

        except (
            asks.errors.RequestTimeout,
            json.JSONDecodeError
        ):
            return []

    async def get_status_by_request_id(self, request_id: int):
        """Return the ``status`` table rows scoped by *request_id*."""
        logging.info('get_status_by_request_id')
        return await self.cleos.aget_table(
            'telos.gpu', request_id, 'status')

    async def get_global_config(self):
        """Return the first row of the contract's ``config`` table."""
        logging.info('get_global_config')
        return (await self.cleos.aget_table(
            'telos.gpu', 'telos.gpu', 'config'))[0]

    async def get_worker_balance(self):
        """Return this account's balance string from the ``users`` table.

        Returns None unless exactly one row matches.
        """
        logging.info('get_worker_balance')
        rows = await self.cleos.aget_table(
            'telos.gpu', 'telos.gpu', 'users',
            index_position=1,
            key_type='name',
            lower_bound=self.account,
            upper_bound=self.account
        )
        return rows[0]['balance'] if len(rows) == 1 else None

    async def begin_work(self, request_id: int):
        """Push ``workbegin`` for *request_id* and return the response."""
        logging.info('begin_work')
        return await self.cleos.a_push_action(
            'telos.gpu',
            'workbegin',
            {
                'worker': self.account,
                'request_id': request_id,
                'max_workers': 2
            },
            self.account, self.key,
            permission=self.permission
        )

    async def cancel_work(self, request_id: int, reason: str):
        """Push ``workcancel`` for *request_id* with a free-form *reason*."""
        logging.info('cancel_work')
        return await self.cleos.a_push_action(
            'telos.gpu',
            'workcancel',
            {
                'worker': self.account,
                'request_id': request_id,
                'reason': reason
            },
            self.account, self.key,
            permission=self.permission
        )

    async def maybe_withdraw_all(self):
        """Push ``withdraw`` for the whole balance when it is above zero."""
        logging.info('maybe_withdraw_all')
        balance = await self.get_worker_balance()
        if not balance:
            return

        # the amount is the text before the first space of the balance
        balance_amount = float(balance.split(' ')[0])
        if balance_amount > 0:
            await self.cleos.a_push_action(
                'telos.gpu',
                'withdraw',
                {
                    'user': self.account,
                    'quantity': asset_from_str(balance)
                },
                self.account, self.key,
                permission=self.permission
            )

    async def find_my_results(self):
        """Return ``results`` rows bounded to this account on index 4."""
        logging.info('find_my_results')
        return await self.cleos.aget_table(
            'telos.gpu', 'telos.gpu', 'results',
            index_position=4,
            key_type='name',
            lower_bound=self.account,
            upper_bound=self.account
        )

    async def submit_work(
        self,
        request_id: int,
        request_hash: str,
        result_hash: str,
        ipfs_hash: str
    ):
        """Push ``submit`` with both hashes wrapped as ``Checksum256``."""
        logging.info('submit_work')
        await self.cleos.a_push_action(
            'telos.gpu',
            'submit',
            {
                'worker': self.account,
                'request_id': request_id,
                'request_hash': Checksum256(request_hash),
                'result_hash': Checksum256(result_hash),
                'ipfs_hash': ipfs_hash
            },
            self.account, self.key,
            permission=self.permission
        )

    # IPFS helpers

    def publish_on_ipfs(self, raw_img: bytes):
        """Write *raw_img* to the staging dir, add and pin it on IPFS.

        Returns:
            The hash returned by the IPFS node's ``add``.
        """
        logging.info('publish_on_ipfs')
        img = Image.open(io.BytesIO(raw_img))
        # NOTE(review): saved under ipfs-docker-staging/ but added as
        # 'image.png' — presumably that directory is mounted into the ipfs
        # container; confirm
        img.save('ipfs-docker-staging/image.png')

        ipfs_hash = self.ipfs_node.add('image.png')

        self.ipfs_node.pin(ipfs_hash)

        return ipfs_hash

    async def get_input_data(self, ipfs_hash: str) -> bytes:
        """Fetch the request's input image bytes through the IPFS gateway.

        An empty *ipfs_hash* means there is no binary input and yields
        ``b''``.

        Raises:
            DGPUComputeError: when the gateway does not answer with 200.
        """
        if ipfs_hash == '':
            return b''

        resp = await get_ipfs_file(f'https://ipfs.ancap.tech/ipfs/{ipfs_hash}/image.png')
        if resp.status_code != 200:
            raise DGPUComputeError('Couldn\'t gather input data from ipfs')

        return resp.raw

View File

@ -214,7 +214,7 @@ class SkynetTelegramFrontend:
if not ipfs_hash: if not ipfs_hash:
await self.update_status_message( await self.update_status_message(
status_msg, status_msg,
'\n[{timestamp_pretty()}] <b>timeout processing request</b>', f'\n[{timestamp_pretty()}] <b>timeout processing request</b>',
parse_mode='HTML' parse_mode='HTML'
) )
return return

View File

@ -151,6 +151,9 @@ def create_handler_context(frontend: 'SkynetTelegramFrontend'):
**user_config **user_config
} }
params['model'] = get_model_by_shortname(params['algo'])
del params['algo']
await db_call( await db_call(
'update_user_stats', user.id, 'txt2img', last_prompt=prompt) 'update_user_stats', user.id, 'txt2img', last_prompt=prompt)
@ -227,6 +230,8 @@ def create_handler_context(frontend: 'SkynetTelegramFrontend'):
'prompt': prompt, 'prompt': prompt,
**user_config **user_config
} }
params['model'] = get_model_by_shortname(params['algo'])
del params['algo']
await db_call( await db_call(
'update_user_stats', 'update_user_stats',
@ -297,6 +302,8 @@ def create_handler_context(frontend: 'SkynetTelegramFrontend'):
'prompt': prompt, 'prompt': prompt,
**user_config **user_config
} }
params['model'] = get_model_by_shortname(params['algo'])
del params['algo']
await work_request( await work_request(
user, status_msg, 'redo', params, user, status_msg, 'redo', params,

View File

@ -53,7 +53,7 @@ def prepare_metainfo_caption(tguser, worker: str, reward: str, meta: dict) -> st
meta_str += f'<code>guidance: {meta["guidance"]}</code>\n' meta_str += f'<code>guidance: {meta["guidance"]}</code>\n'
if meta['strength']: if meta['strength']:
meta_str += f'<code>strength: {meta["strength"]}</code>\n' meta_str += f'<code>strength: {meta["strength"]}</code>\n'
meta_str += f'<code>algo: {meta["algo"]}</code>\n' meta_str += f'<code>algo: {meta["model"]}</code>\n'
if meta['upscaler']: if meta['upscaler']:
meta_str += f'<code>upscaler: {meta["upscaler"]}</code>\n' meta_str += f'<code>upscaler: {meta["upscaler"]}</code>\n'

View File

@ -116,12 +116,14 @@ def open_nodeos(cleanup: bool = True):
priv, pub = cleos.create_key_pair() priv, pub = cleos.create_key_pair()
cleos.import_key(priv) cleos.import_key(priv)
cleos.private_keys['telos.gpu'] = priv
logging.info(f'GPU KEYS: {(priv, pub)}') logging.info(f'GPU KEYS: {(priv, pub)}')
cleos.new_account('telos.gpu', ram=4200000, key=pub) cleos.new_account('telos.gpu', ram=4200000, key=pub)
for i in range(1, 4): for i in range(1, 4):
priv, pub = cleos.create_key_pair() priv, pub = cleos.create_key_pair()
cleos.import_key(priv) cleos.import_key(priv)
cleos.private_keys[f'testworker{i}'] = priv
logging.info(f'testworker{i} KEYS: {(priv, pub)}') logging.info(f'testworker{i} KEYS: {(priv, pub)}')
cleos.create_account_staked( cleos.create_account_staked(
'eosio', f'testworker{i}', key=pub) 'eosio', f'testworker{i}', key=pub)

View File

@ -1,5 +1,6 @@
#!/usr/bin/python #!/usr/bin/python
import io
import os import os
import time import time
import random import random
@ -13,6 +14,7 @@ import numpy as np
from PIL import Image from PIL import Image
from basicsr.archs.rrdbnet_arch import RRDBNet from basicsr.archs.rrdbnet_arch import RRDBNet
from diffusers import ( from diffusers import (
DiffusionPipeline,
StableDiffusionPipeline, StableDiffusionPipeline,
StableDiffusionImg2ImgPipeline, StableDiffusionImg2ImgPipeline,
EulerAncestralDiscreteScheduler EulerAncestralDiscreteScheduler
@ -20,7 +22,7 @@ from diffusers import (
from realesrgan import RealESRGANer from realesrgan import RealESRGANer
from huggingface_hub import login from huggingface_hub import login
from .constants import ALGOS from .constants import MODELS
def time_ms(): def time_ms():
@ -37,7 +39,24 @@ def convert_from_image_to_cv2(img: Image) -> np.ndarray:
return np.asarray(img) return np.asarray(img)
def pipeline_for(algo: str, mem_fraction: float = 1.0, image=False): def convert_from_bytes_to_img(raw: bytes) -> Image:
return Image.open(io.BytesIO(raw))
def convert_from_img_to_bytes(image: Image, fmt='PNG') -> bytes:
    """Encode an image into bytes using the given format.

    Args:
        image: object whose `save(fp, format=...)` writes the encoded image.
        fmt: format name handed to `image.save`; defaults to `'PNG'`.

    Returns:
        The bytes written by `image.save`.
    """
    with io.BytesIO() as buffer:
        image.save(buffer, format=fmt)
        return buffer.getvalue()
def convert_from_bytes_and_crop(raw: bytes, max_w: int, max_h: int) -> Image:
    """Decode raw bytes into an image and shrink it to fit within a bound.

    Args:
        raw: encoded image bytes.
        max_w: maximum allowed width in pixels.
        max_h: maximum allowed height in pixels.

    Returns:
        The decoded image, downsized in place when either dimension exceeds
        its maximum.
    """
    image = convert_from_bytes_to_img(raw)
    w, h = image.size
    if w > max_w or h > max_h:
        # Use the caller's bound rather than a fixed 512x512 box; thumbnail()
        # resizes in place and keeps the aspect ratio.
        image.thumbnail((max_w, max_h))

    # The function is annotated as returning the image, so return it instead
    # of falling off the end with None.
    return image
def pipeline_for(model: str, mem_fraction: float = 1.0, image=False) -> DiffusionPipeline:
assert torch.cuda.is_available() assert torch.cuda.is_available()
torch.cuda.empty_cache() torch.cuda.empty_cache()
torch.cuda.set_per_process_memory_fraction(mem_fraction) torch.cuda.set_per_process_memory_fraction(mem_fraction)
@ -56,7 +75,7 @@ def pipeline_for(algo: str, mem_fraction: float = 1.0, image=False):
'safety_checker': None 'safety_checker': None
} }
if algo == 'stable': if model == 'runwayml/stable-diffusion-v1-5':
params['revision'] = 'fp16' params['revision'] = 'fp16'
if image: if image:
@ -65,7 +84,7 @@ def pipeline_for(algo: str, mem_fraction: float = 1.0, image=False):
pipe_class = StableDiffusionPipeline pipe_class = StableDiffusionPipeline
pipe = pipe_class.from_pretrained( pipe = pipe_class.from_pretrained(
ALGOS[algo], **params) model, **params)
pipe.scheduler = EulerAncestralDiscreteScheduler.from_config( pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(
pipe.scheduler.config) pipe.scheduler.config)
@ -78,7 +97,7 @@ def pipeline_for(algo: str, mem_fraction: float = 1.0, image=False):
def txt2img( def txt2img(
hf_token: str, hf_token: str,
model: str = 'midj', model: str = 'prompthero/openjourney',
prompt: str = 'a red old tractor in a sunny wheat field', prompt: str = 'a red old tractor in a sunny wheat field',
output: str = 'output.png', output: str = 'output.png',
width: int = 512, height: int = 512, width: int = 512, height: int = 512,
@ -110,7 +129,7 @@ def txt2img(
def img2img( def img2img(
hf_token: str, hf_token: str,
model: str = 'midj', model: str = 'prompthero/openjourney',
prompt: str = 'a red old tractor in a sunny wheat field', prompt: str = 'a red old tractor in a sunny wheat field',
img_path: str = 'input.png', img_path: str = 'input.png',
output: str = 'output.png', output: str = 'output.png',
@ -143,6 +162,23 @@ def img2img(
image.save(output) image.save(output)
def init_upscaler(model_path: str = 'weights/RealESRGAN_x4plus.pth'):
    """Build a 4x `RealESRGANer` backed by an `RRDBNet` network.

    Args:
        model_path: path of the weights file passed to `RealESRGANer`.

    Returns:
        A `RealESRGANer` configured with scale 4 and `half=True`.
    """
    network = RRDBNet(
        num_in_ch=3,
        num_out_ch=3,
        num_feat=64,
        num_block=23,
        num_grow_ch=32,
        scale=4
    )
    upscaler = RealESRGANer(
        scale=4,
        model_path=model_path,
        dni_weight=None,
        model=network,
        half=True
    )
    return upscaler
def upscale( def upscale(
img_path: str = 'input.png', img_path: str = 'input.png',
output: str = 'output.png', output: str = 'output.png',
@ -156,19 +192,7 @@ def upscale(
input_img = Image.open(img_path).convert('RGB') input_img = Image.open(img_path).convert('RGB')
upscaler = RealESRGANer( upscaler = init_upscaler(model_path=model_path)
scale=4,
model_path=model_path,
dni_weight=None,
model=RRDBNet(
num_in_ch=3,
num_out_ch=3,
num_feat=64,
num_block=23,
num_grow_ch=32,
scale=4
),
half=True)
up_img, _ = upscaler.enhance( up_img, _ = upscaler.enhance(
convert_from_image_to_cv2(input_img), outscale=4) convert_from_image_to_cv2(input_img), outscale=4)
@ -183,7 +207,8 @@ def download_all_models(hf_token: str):
assert torch.cuda.is_available() assert torch.cuda.is_available()
login(token=hf_token) login(token=hf_token)
for model in ALGOS: for model in MODELS:
print(f'DOWNLOADING {model.upper()}') print(f'DOWNLOADING {model.upper()}')
pipeline_for(model) pipeline_for(model)
print(f'DOWNLOADING IMAGE {model.upper()}')
pipeline_for(model, image=True)

View File

@ -8,6 +8,7 @@ from functools import partial
import trio import trio
import requests import requests
from skynet.constants import DEFAULT_IPFS_REMOTE
from skynet.dgpu import open_dgpu_node from skynet.dgpu import open_dgpu_node
@ -32,7 +33,7 @@ def test_enqueue_work(cleos):
binary = '' binary = ''
ec, out = cleos.push_action( ec, out = cleos.push_action(
'telos.gpu', 'enqueue', [user, req, binary, '20.0000 GPU'], f'{user}@active' 'telos.gpu', 'enqueue', [user, req, binary, '20.0000 GPU', 1], f'{user}@active'
) )
assert ec == 0 assert ec == 0
@ -53,6 +54,8 @@ def test_enqueue_work(cleos):
f'testworker1', f'testworker1',
'active', 'active',
cleos, cleos,
DEFAULT_IPFS_REMOTE,
cleos.private_keys['testworker1'],
initial_algos=['midj'] initial_algos=['midj']
) )
) )
@ -80,12 +83,13 @@ def test_enqueue_dequeue(cleos):
binary = '' binary = ''
ec, out = cleos.push_action( ec, out = cleos.push_action(
'telos.gpu', 'enqueue', [user, req, binary, '20.0000 GPU'], f'{user}@active' 'telos.gpu', 'enqueue', [user, req, binary, '20.0000 GPU', 1], f'{user}@active'
) )
assert ec == 0 assert ec == 0
request_id = int(collect_stdout(out)) request_id, _ = collect_stdout(out).split(':')
request_id = int(request_id)
queue = cleos.get_table('telos.gpu', 'telos.gpu', 'queue') queue = cleos.get_table('telos.gpu', 'telos.gpu', 'queue')