Merge pull request #26 from guilledk/worker_upgrade_reloaded

Worker upgrade reloaded
master v0.1a12
Guillermo Rodriguez 2023-10-13 17:22:22 -03:00 committed by GitHub
commit 8a415b450f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 5081 additions and 714 deletions

View File

@ -7,3 +7,4 @@ outputs
*.egg-info
**/*.key
**/*.cert
.venv

4
.gitignore vendored
View File

@ -1,4 +1,4 @@
skynet.ini
skynet.toml
.python-version
hf_home
outputs
@ -9,5 +9,5 @@ secrets
**/*.cert
docs
ipfs-docker-data
ipfs-docker-staging
ipfs-staging
weights

104
README.md
View File

@ -1,30 +1,104 @@
# skynet
### decentralized compute platform
<div align="center">
<img src="https://explorer.skygpu.net/v2/explore/assets/logo.png" width=512 height=512>
</div>
## decentralized compute platform
### native install
system dependencies:
- `cuda` 11.8
- `llvm` 10
- `python` 3.10+
- `docker` (for ipfs node)
To launch a worker:
```
# create and edit config from template
cp skynet.ini.example skynet.ini
cp skynet.toml.example skynet.toml
# create python virtual envoirment 3.10+
python3 -m venv venv
# install poetry package manager
curl -sSL https://install.python-poetry.org | python3 -
# enable envoirment
source venv/bin/activate
# install
poetry install
# install requirements
pip install -r requirements.txt
pip install -r requirements.cuda.0.txt
pip install -r requirements.cuda.1.txt
pip install -r requirements.cuda.2.txt
# install skynet
pip install -e .
# enable environment
poetry shell
# test you can run this command
skynet --help
# launch ipfs node
skynet run ipfs
# to launch worker
skynet run dgpu
```
### dockerized install
## frontend
system dependencies:
- `docker`
```
# create and edit config from template
cp skynet.toml.example skynet.toml
# pull runtime container
docker pull guilledk/skynet:runtime-frontend
# run telegram bot
docker run \
-it \
--rm \
--network host \
--name skynet-telegram \
--mount type=bind,source="$(pwd)",target=/root/target \
guilledk/skynet:runtime-frontend \
skynet run telegram --db-pass PASSWORD --db-user USER --db-host HOST
```
## worker
system dependencies:
- `docker` with gpu enabled
```
# create and edit config from template
cp skynet.toml.example skynet.toml
# pull runtime container
docker pull guilledk/skynet:runtime-cuda
# or build it (takes a bit of time)
./build_docker.sh
# launch simple ipfs node
./launch_ipfs.sh
# run worker with all gpus
docker run \
-it \
--rm \
--gpus all \
--network host \
--name skynet-worker \
--mount type=bind,source="$(pwd)",target=/root/target \
guilledk/skynet:runtime-cuda \
skynet run dgpu
# run worker with specific gpu
docker run \
-it \
--rm \
--gpus '"device=1"' \
--network host \
--name skynet-worker-1 \
--mount type=bind,source="$(pwd)",target=/root/target \
guilledk/skynet:runtime-cuda \
skynet run dgpu
```

View File

@ -1,7 +0,0 @@
docker build \
-t skynet:runtime-cuda \
-f docker/Dockerfile.runtime+cuda .
docker build \
-t skynet:runtime \
-f docker/Dockerfile.runtime .

View File

@ -2,15 +2,24 @@ from python:3.11
env DEBIAN_FRONTEND=noninteractive
run apt-get update && apt-get install -y \
git
run curl -sSL https://install.python-poetry.org | python3 -
env PATH "/root/.local/bin:$PATH"
copy . /skynet
workdir /skynet
copy requirements.txt requirements.txt
copy pytest.ini ./
copy setup.py ./
copy skynet ./skynet
env POETRY_VIRTUALENVS_PATH /skynet/.venv
run pip install \
-e . \
-r requirements.txt
run poetry install
copy tests ./
workdir /root/target
copy docker/entrypoint.sh /entrypoint.sh
entrypoint ["/entrypoint.sh"]
cmd ["skynet", "--help"]

View File

@ -1,29 +0,0 @@
from nvidia/cuda:11.7.0-devel-ubuntu20.04
from python:3.11
env DEBIAN_FRONTEND=noninteractive
run apt-get update && \
apt-get install -y ffmpeg libsm6 libxext6
workdir /skynet
copy requirements.cuda* ./
run pip install -U pip ninja
run pip install -v -r requirements.cuda.0.txt
run pip install -v -r requirements.cuda.1.txt
run pip install -v -r requirements.cuda.2.txt
copy requirements.txt requirements.txt
copy pytest.ini pytest.ini
copy setup.py setup.py
copy skynet skynet
run pip install -e . -r requirements.txt
env PYTORCH_CUDA_ALLOC_CONF max_split_size_mb:128
env NVIDIA_VISIBLE_DEVICES=all
env HF_HOME /hf_home
copy tests tests

View File

@ -0,0 +1,46 @@
from nvidia/cuda:11.8.0-devel-ubuntu20.04
from python:3.10
env DEBIAN_FRONTEND=noninteractive
run apt-get update && apt-get install -y \
git \
clang \
cmake \
ffmpeg \
libsm6 \
libxext6 \
ninja-build
env CC /usr/bin/clang
env CXX /usr/bin/clang++
# install llvm10 as required by llvm-lite
run git clone https://github.com/llvm/llvm-project.git -b llvmorg-10.0.1
workdir /llvm-project
# this adds a commit from 12.0.0 that fixes build on newer compilers
run git cherry-pick -n b498303066a63a203d24f739b2d2e0e56dca70d1
run cmake -S llvm -B build -G Ninja -DCMAKE_BUILD_TYPE=Release
run ninja -C build install # -j8
run curl -sSL https://install.python-poetry.org | python3 -
env PATH "/root/.local/bin:$PATH"
copy . /skynet
workdir /skynet
env POETRY_VIRTUALENVS_PATH /skynet/.venv
run poetry install --with=cuda -v
workdir /root/target
env PYTORCH_CUDA_ALLOC_CONF max_split_size_mb:128
env NVIDIA_VISIBLE_DEVICES=all
copy docker/entrypoint.sh /entrypoint.sh
entrypoint ["/entrypoint.sh"]
cmd ["skynet", "--help"]

View File

@ -0,0 +1,46 @@
from nvidia/cuda:11.8.0-devel-ubuntu20.04
from python:3.11
env DEBIAN_FRONTEND=noninteractive
run apt-get update && apt-get install -y \
git \
clang \
cmake \
ffmpeg \
libsm6 \
libxext6 \
ninja-build
env CC /usr/bin/clang
env CXX /usr/bin/clang++
# install llvm10 as required by llvm-lite
run git clone https://github.com/llvm/llvm-project.git -b llvmorg-10.0.1
workdir /llvm-project
# this adds a commit from 12.0.0 that fixes build on newer compilers
run git cherry-pick -n b498303066a63a203d24f739b2d2e0e56dca70d1
run cmake -S llvm -B build -G Ninja -DCMAKE_BUILD_TYPE=Release
run ninja -C build install # -j8
run curl -sSL https://install.python-poetry.org | python3 -
env PATH "/root/.local/bin:$PATH"
copy . /skynet
workdir /skynet
env POETRY_VIRTUALENVS_PATH /skynet/.venv
run poetry install --with=cuda -v
workdir /root/target
env PYTORCH_CUDA_ALLOC_CONF max_split_size_mb:128
env NVIDIA_VISIBLE_DEVICES=all
copy docker/entrypoint.sh /entrypoint.sh
entrypoint ["/entrypoint.sh"]
cmd ["skynet", "--help"]

View File

@ -0,0 +1,25 @@
from python:3.11
env DEBIAN_FRONTEND=noninteractive
run apt-get update && apt-get install -y \
git
run curl -sSL https://install.python-poetry.org | python3 -
env PATH "/root/.local/bin:$PATH"
copy . /skynet
workdir /skynet
env POETRY_VIRTUALENVS_PATH /skynet/.venv
run poetry install --with=frontend -v
workdir /root/target
copy docker/entrypoint.sh /entrypoint.sh
entrypoint ["/entrypoint.sh"]
cmd ["skynet", "--help"]

View File

@ -0,0 +1,20 @@
docker build \
-t guilledk/skynet:runtime \
-f Dockerfile.runtime .
docker build \
-t guilledk/skynet:runtime-frontend \
-f Dockerfile.runtime+frontend .
docker build \
-t guilledk/skynet:runtime-cuda-py311 \
-f Dockerfile.runtime+cuda-py311 .
docker build \
-t guilledk/skynet:runtime-cuda \
-f Dockerfile.runtime+cuda-py311 .
docker build \
-t guilledk/skynet:runtime-cuda-py310 \
-f Dockerfile.runtime+cuda-py310 .

View File

@ -0,0 +1,8 @@
#!/bin/sh
export VIRTUAL_ENV='/skynet/.venv'
poetry env use $VIRTUAL_ENV/bin/python
poetry install
exec poetry run "$@"

View File

@ -0,0 +1,5 @@
docker push guilledk/skynet:runtime
docker push guilledk/skynet:runtime-frontend
docker push guilledk/skynet:runtime-cuda
docker push guilledk/skynet:runtime-cuda-py311
docker push guilledk/skynet:runtime-cuda-py310

36
launch_ipfs.sh 100755
View File

@ -0,0 +1,36 @@
#!/bin/bash
name='skynet-ipfs'
peers=("$@")
data_dir="$(pwd)/ipfs-docker-data"
data_target='/data/ipfs'
# Create data directory if it doesn't exist
mkdir -p "$data_dir"
# Run the container
docker run -d \
--name "$name" \
-p 8080:8080/tcp \
-p 4001:4001/tcp \
-p 127.0.0.1:5001:5001/tcp \
--mount type=bind,source="$data_dir",target="$data_target" \
--rm \
ipfs/go-ipfs:latest
# Change ownership
docker exec "$name" chown 1000:1000 -R "$data_target"
# Wait for Daemon to be ready
while read -r log; do
echo "$log"
if [[ "$log" == *"Daemon is ready"* ]]; then
break
fi
done < <(docker logs -f "$name")
# Connect to peers
for peer in "${peers[@]}"; do
docker exec "$name" ipfs swarm connect "$peer" || echo "Error connecting to peer: $peer"
done

3835
poetry.lock generated 100644

File diff suppressed because it is too large Load Diff

2
poetry.toml 100644
View File

@ -0,0 +1,2 @@
[virtualenvs]
in-project = true

67
pyproject.toml 100644
View File

@ -0,0 +1,67 @@
[tool.poetry]
name = 'skynet'
version = '0.1a12'
description = 'Decentralized compute platform'
authors = ['Guillermo Rodriguez <guillermo@telos.net>']
license = 'AGPL'
readme = 'README.md'
[tool.poetry.dependencies]
python = '>=3.10,<3.12'
pytz = '^2023.3.post1'
trio = '^0.22.2'
asks = '^3.0.0'
Pillow = '^10.0.1'
docker = '^6.1.3'
py-leap = {git = 'https://github.com/guilledk/py-leap.git', rev = 'v0.1a14'}
toml = '^0.10.2'
[tool.poetry.group.frontend]
optional = true
[tool.poetry.group.frontend.dependencies]
triopg = {version = '^0.6.0'}
aiohttp = {version = '^3.8.5'}
psycopg2-binary = {version = '^2.9.7'}
pyTelegramBotAPI = {version = '^4.14.0'}
'discord.py' = {version = '^2.3.2'}
[tool.poetry.group.dev]
optional = true
[tool.poetry.group.dev.dependencies]
pdbpp = {version = '^0.10.3'}
pytest = {version = '^7.4.2'}
[tool.poetry.group.cuda]
optional = true
[tool.poetry.group.cuda.dependencies]
torch = {version = '2.0.1+cu118', source = 'torch'}
scipy = {version = '^1.11.2'}
numba = {version = '0.57.0'}
quart = {version = '^0.19.3'}
triton = {version = '2.0.0', source = 'torch'}
basicsr = {version = '^1.4.2'}
xformers = {version = '^0.0.22'}
hypercorn = {version = '^0.14.4'}
diffusers = {version = '^0.21.2'}
realesrgan = {version = '^0.3.0'}
quart-trio = {version = '^0.11.0'}
torchvision = {version = '0.15.2+cu118', source = 'torch'}
accelerate = {version = '^0.23.0'}
transformers = {version = '^4.33.2'}
huggingface-hub = {version = '^0.17.3'}
invisible-watermark = {version = '^0.2.0'}
[[tool.poetry.source]]
name = 'torch'
url = 'https://download.pytorch.org/whl/cu118'
priority = 'explicit'
[build-system]
requires = ['poetry-core', 'cython']
build-backend = 'poetry.core.masonry.api'
[tool.poetry.scripts]
skynet = 'skynet.cli:skynet'

View File

@ -1,9 +0,0 @@
scipy
triton
accelerate
transformers
huggingface_hub
diffusers[torch]>=0.18.0
invisible-watermark
torch==1.13.0+cu117
--extra-index-url https://download.pytorch.org/whl/cu117

View File

@ -1 +0,0 @@
git+https://github.com/facebookresearch/xformers.git@main#egg=xformers

View File

@ -1,2 +0,0 @@
basicsr
realesrgan

View File

@ -1,15 +0,0 @@
pytz
trio
asks
numpy
pdbpp
Pillow
triopg
pytest
docker
aiohttp
psycopg2-binary
pyTelegramBotAPI
discord.py
py-leap@git+https://github.com/guilledk/py-leap.git@v0.1a14

View File

@ -1,21 +0,0 @@
from setuptools import setup, find_packages
from skynet.constants import VERSION
setup(
name='skynet',
version=VERSION,
description='Decentralized compute platform',
author='Guillermo Rodriguez',
author_email='guillermo@telos.net',
packages=find_packages(),
entry_points={
'console_scripts': [
'skynet = skynet.cli:skynet',
'txt2img = skynet.cli:txt2img',
'img2img = skynet.cli:img2img',
'upscale = skynet.cli:upscale'
]
},
install_requires=['click']
)

View File

@ -1,38 +0,0 @@
[skynet.dgpu]
account = testworkerX
permission = active
key = 5Xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
node_url = https://testnet.skygpu.net
hyperion_url = https://testnet.skygpu.net
ipfs_gateway_url = /ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv
ipfs_url = http://127.0.0.1:5001
hf_home = hf_home
hf_token = hf_XxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXx
auto_withdraw = True
[skynet.telegram]
account = telegram
permission = active
key = 5Xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
node_url = https://testnet.skygpu.net
hyperion_url = https://testnet.skygpu.net
ipfs_gateway_url = /ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv
ipfs_url = http://127.0.0.1:5001
token = XXXXXXXXXX:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
[skynet.discord]
account = discord
permission = active
key = 5Xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
node_url = https://testnet.skygpu.net
hyperion_url = https://testnet.skygpu.net
ipfs_gateway_url = /ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv
ipfs_url = http://127.0.0.1:5001
token = XXXXXXXXXX:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

View File

@ -0,0 +1,47 @@
# config sections are optional, depending on which services
# you wish to run
[skynet.dgpu]
account = 'testworkerX'
permission = 'active'
key = '5Xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
node_url = 'https://testnet.skygpu.net'
hyperion_url = 'https://testnet.skygpu.net'
ipfs_gateway_url = '/ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv'
ipfs_url = 'http://127.0.0.1:5001'
hf_home = 'hf_home'
hf_token = 'hf_XxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXx'
auto_withdraw = true
non_compete = []
api_bind = '127.0.0.1:42690'
[skynet.telegram]
account = 'telegram'
permission = 'active'
key = '5Xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
node_url = 'https://testnet.skygpu.net'
hyperion_url = 'https://testnet.skygpu.net'
ipfs_gateway_url = '/ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv'
ipfs_url = 'http://127.0.0.1:5001'
token = 'XXXXXXXXXX:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
[skynet.discord]
account = 'discord'
permission = 'active'
key = '5Xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
node_url = 'https://testnet.skygpu.net'
hyperion_url = 'https://testnet.skygpu.net'
ipfs_gateway_url = '/ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv'
ipfs_url = 'http://127.0.0.1:5001'
token = 'XXXXXXXXXX:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
[skynet.pinner]
hyperion_url = 'https://testnet.skygpu.net'
ipfs_url = 'http://127.0.0.1:5001'
[skynet.user]
account = 'testuser'
permission = 'active'
key = '5Xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
node_url = 'https://testnet.skygpu.net'

View File

@ -6,25 +6,12 @@ import random
from functools import partial
import trio
import asks
import click
import asyncio
import requests
from leap.cleos import CLEOS
from leap.sugar import collect_stdout, Name, asset_from_str
from leap.hyperion import HyperionAPI
from leap.sugar import Name, asset_from_str
from skynet.ipfs import AsyncIPFSHTTP
from .db import open_new_database
from .config import *
from .nodeos import open_cleos, open_nodeos
from .constants import *
from .frontend.telegram import SkynetTelegramFrontend
from .frontend.discord import SkynetDiscordFrontend
@click.group()
@ -44,7 +31,11 @@ def skynet(*args, **kwargs):
@click.option('--seed', '-S', default=None)
def txt2img(*args, **kwargs):
from . import utils
_, hf_token, _, _ = init_env_from_config()
config = load_skynet_toml()
hf_token = load_key(config, 'skynet.dgpu.hf_token')
hf_home = load_key(config, 'skynet.dgpu.hf_home')
set_hf_vars(hf_token, hf_home)
utils.txt2img(hf_token, **kwargs)
@click.command()
@ -59,7 +50,10 @@ def txt2img(*args, **kwargs):
@click.option('--seed', '-S', default=None)
def img2img(model, prompt, input, output, strength, guidance, steps, seed):
from . import utils
_, hf_token, _, _ = init_env_from_config()
config = load_skynet_toml()
hf_token = load_key(config, 'skynet.dgpu.hf_token')
hf_home = load_key(config, 'skynet.dgpu.hf_home')
set_hf_vars(hf_token, hf_home)
utils.img2img(
hf_token,
model=model,
@ -87,95 +81,95 @@ def upscale(input, output, model):
@skynet.command()
def download():
from . import utils
_, hf_token, _, _ = init_env_from_config()
utils.download_all_models(hf_token)
config = load_skynet_toml()
hf_token = load_key(config, 'skynet.dgpu.hf_token')
hf_home = load_key(config, 'skynet.dgpu.hf_home')
set_hf_vars(hf_token, hf_home)
utils.download_all_models(hf_token, hf_home)
@skynet.command()
@click.option(
'--account', '-A', default=None)
@click.option(
'--permission', '-P', default=None)
@click.option(
'--key', '-k', default=None)
@click.option(
'--node-url', '-n', default='https://skynet.ancap.tech')
@click.option(
'--reward', '-r', default='20.0000 GPU')
@click.option('--jobs', '-j', default=1)
@click.option('--model', '-m', default='prompthero/openjourney')
@click.option('--model', '-m', default='stabilityai/stable-diffusion-xl-base-1.0')
@click.option(
'--prompt', '-p', default='a red old tractor in a sunny wheat field')
@click.option('--output', '-o', default='output.png')
@click.option('--width', '-w', default=512)
@click.option('--height', '-h', default=512)
@click.option('--width', '-w', default=1024)
@click.option('--height', '-h', default=1024)
@click.option('--guidance', '-g', default=10)
@click.option('--step', '-s', default=26)
@click.option('--seed', '-S', default=None)
@click.option('--upscaler', '-U', default='x4')
@click.option('--binary_data', '-b', default='')
@click.option('--strength', '-Z', default=None)
def enqueue(
account: str,
permission: str,
key: str | None,
node_url: str,
reward: str,
jobs: int,
**kwargs
):
key, account, permission = load_account_info(
'user', key, account, permission)
import trio
from leap.cleos import CLEOS
node_url, _, _, _ = load_endpoint_info('user', node_url=node_url)
config = load_skynet_toml()
with open_cleos(node_url, key=key) as cleos:
async def enqueue_n_jobs():
for i in range(jobs):
if not kwargs['seed']:
kwargs['seed'] = random.randint(0, 10e9)
key = load_key(config, 'skynet.user.key')
account = load_key(config, 'skynet.user.account')
permission = load_key(config, 'skynet.user.permission')
node_url = load_key(config, 'skynet.user.node_url')
req = json.dumps({
'method': 'diffuse',
'params': kwargs
})
binary = kwargs['binary_data']
cleos = CLEOS(None, None, url=node_url, remote=node_url)
res = await cleos.a_push_action(
'telos.gpu',
'enqueue',
{
'user': Name(account),
'request_body': req,
'binary_data': binary,
'reward': asset_from_str(reward),
'min_verification': 1
},
account, key, permission,
)
print(res)
trio.run(enqueue_n_jobs)
binary = kwargs['binary_data']
if not kwargs['strength']:
if binary:
raise ValueError('strength -Z param required if binary data passed')
del kwargs['strength']
else:
kwargs['strength'] = float(kwargs['strength'])
async def enqueue_n_jobs():
for i in range(jobs):
if not kwargs['seed']:
kwargs['seed'] = random.randint(0, 10e9)
req = json.dumps({
'method': 'diffuse',
'params': kwargs
})
res = await cleos.a_push_action(
'telos.gpu',
'enqueue',
{
'user': Name(account),
'request_body': req,
'binary_data': binary,
'reward': asset_from_str(reward),
'min_verification': 1
},
account, key, permission,
)
print(res)
trio.run(enqueue_n_jobs)
@skynet.command()
@click.option('--loglevel', '-l', default='INFO', help='Logging level')
@click.option(
'--account', '-A', default='telos.gpu')
@click.option(
'--permission', '-P', default='active')
@click.option(
'--key', '-k', default=None)
@click.option(
'--node-url', '-n', default='https://skynet.ancap.tech')
def clean(
loglevel: str,
account: str,
permission: str,
key: str | None,
node_url: str,
):
key, account, permission = load_account_info(
'user', key, account, permission)
import trio
from leap.cleos import CLEOS
node_url, _, _, _ = load_endpoint_info('user', node_url=node_url)
config = load_skynet_toml()
key = load_key(config, 'skynet.user.key')
account = load_key(config, 'skynet.user.account')
permission = load_key(config, 'skynet.user.permission')
node_url = load_key(config, 'skynet.user.node_url')
logging.basicConfig(level=loglevel)
cleos = CLEOS(None, None, url=node_url, remote=node_url)
@ -190,10 +184,10 @@ def clean(
)
@skynet.command()
@click.option(
'--node-url', '-n', default='https://skynet.ancap.tech')
def queue(node_url: str):
node_url, _, _, _ = load_endpoint_info('user', node_url=node_url)
def queue():
import requests
config = load_skynet_toml()
node_url = load_key(config, 'skynet.user.node_url')
resp = requests.post(
f'{node_url}/v1/chain/get_table_rows',
json={
@ -206,11 +200,11 @@ def queue(node_url: str):
print(json.dumps(resp.json(), indent=4))
@skynet.command()
@click.option(
'--node-url', '-n', default='https://skynet.ancap.tech')
@click.argument('request-id')
def status(node_url: str, request_id: int):
node_url, _, _, _ = load_endpoint_info('user', node_url=node_url)
def status(request_id: int):
import requests
config = load_skynet_toml()
node_url = load_key(config, 'skynet.user.node_url')
resp = requests.post(
f'{node_url}/v1/chain/get_table_rows',
json={
@ -223,26 +217,16 @@ def status(node_url: str, request_id: int):
print(json.dumps(resp.json(), indent=4))
@skynet.command()
@click.option(
'--account', '-a', default='telegram')
@click.option(
'--permission', '-p', default='active')
@click.option(
'--key', '-k', default=None)
@click.option(
'--node-url', '-n', default='https://skynet.ancap.tech')
@click.argument('request-id')
def dequeue(
account: str,
permission: str,
key: str | None,
node_url: str,
request_id: int
):
key, account, permission = load_account_info(
'user', key, account, permission)
def dequeue(request_id: int):
import trio
from leap.cleos import CLEOS
node_url, _, _, _ = load_endpoint_info('user', node_url=node_url)
config = load_skynet_toml()
key = load_key(config, 'skynet.user.key')
account = load_key(config, 'skynet.user.account')
permission = load_key(config, 'skynet.user.permission')
node_url = load_key(config, 'skynet.user.node_url')
cleos = CLEOS(None, None, url=node_url, remote=node_url)
res = trio.run(
@ -261,29 +245,24 @@ def dequeue(
@skynet.command()
@click.option(
'--account', '-a', default='telos.gpu')
@click.option(
'--permission', '-p', default='active')
@click.option(
'--key', '-k', default=None)
@click.option(
'--node-url', '-n', default='https://skynet.ancap.tech')
@click.option(
'--token-contract', '-c', default='eosio.token')
@click.option(
'--token-symbol', '-S', default='4,GPU')
def config(
account: str,
permission: str,
key: str | None,
node_url: str,
token_contract: str,
token_symbol: str
):
key, account, permission = load_account_info(
'user', key, account, permission)
node_url, _, _, _ = load_endpoint_info('user', node_url=node_url)
import trio
from leap.cleos import CLEOS
config = load_skynet_toml()
key = load_key(config, 'skynet.user.key')
account = load_key(config, 'skynet.user.account')
permission = load_key(config, 'skynet.user.permission')
node_url = load_key(config, 'skynet.user.node_url')
cleos = CLEOS(None, None, url=node_url, remote=node_url)
res = trio.run(
partial(
@ -301,26 +280,18 @@ def config(
@skynet.command()
@click.option(
'--account', '-a', default='telegram')
@click.option(
'--permission', '-p', default='active')
@click.option(
'--key', '-k', default=None)
@click.option(
'--node-url', '-n', default='https://skynet.ancap.tech')
@click.argument('quantity')
def deposit(
account: str,
permission: str,
key: str | None,
node_url: str,
quantity: str
):
key, account, permission = load_account_info(
'user', key, account, permission)
def deposit(quantity: str):
import trio
from leap.cleos import CLEOS
node_url, _, _, _ = load_endpoint_info('user', node_url=node_url)
config = load_skynet_toml()
key = load_key(config, 'skynet.user.key')
account = load_key(config, 'skynet.user.account')
permission = load_key(config, 'skynet.user.permission')
node_url = load_key(config, 'skynet.user.node_url')
cleos = CLEOS(None, None, url=node_url, remote=node_url)
res = trio.run(
partial(
@ -345,6 +316,8 @@ def run(*args, **kwargs):
@run.command()
def db():
from .db import open_new_database
logging.basicConfig(level=logging.INFO)
with open_new_database(cleanup=False) as db_params:
container, passwd, host = db_params
@ -352,6 +325,8 @@ def db():
@run.command()
def nodeos():
from .nodeos import open_nodeos
logging.basicConfig(filename='skynet-nodeos.log', level=logging.INFO)
with open_nodeos(cleanup=False):
...
@ -359,38 +334,29 @@ def nodeos():
@run.command()
@click.option('--loglevel', '-l', default='INFO', help='Logging level')
@click.option(
'--config-path', '-c', default='skynet.ini')
'--config-path', '-c', default=DEFAULT_CONFIG_PATH)
def dgpu(
loglevel: str,
config_path: str
):
import trio
from .dgpu import open_dgpu_node
logging.basicConfig(level=loglevel)
config = load_skynet_ini(file_path=config_path)
config = load_skynet_toml(file_path=config_path)
hf_token = load_key(config, 'skynet.dgpu.hf_token')
hf_home = load_key(config, 'skynet.dgpu.hf_home')
set_hf_vars(hf_token, hf_home)
assert 'skynet.dgpu' in config
assert 'skynet' in config
assert 'dgpu' in config['skynet']
trio.run(open_dgpu_node, config['skynet.dgpu'])
trio.run(open_dgpu_node, config['skynet']['dgpu'])
@run.command()
@click.option('--loglevel', '-l', default='INFO', help='logging level')
@click.option(
'--account', '-a', default='telegram')
@click.option(
'--permission', '-p', default='active')
@click.option(
'--key', '-k', default=None)
@click.option(
'--hyperion-url', '-y', default=f'https://testnet.{DEFAULT_DOMAIN}')
@click.option(
'--node-url', '-n', default=f'https://testnet.{DEFAULT_DOMAIN}')
@click.option(
'--ipfs-url', '-i', default=DEFAULT_IPFS_LOCAL)
@click.option(
'--ipfs-gateway-url', '-I', default=None)
@click.option(
'--db-host', '-h', default='localhost:5432')
@click.option(
@ -399,26 +365,43 @@ def dgpu(
'--db-pass', '-u', default='password')
def telegram(
loglevel: str,
account: str,
permission: str,
key: str | None,
hyperion_url: str,
ipfs_url: str,
ipfs_gateway_url: str,
node_url: str,
db_host: str,
db_user: str,
db_pass: str
):
import asyncio
from .frontend.telegram import SkynetTelegramFrontend
logging.basicConfig(level=loglevel)
_, _, tg_token, _ = init_env_from_config()
config = load_skynet_toml()
tg_token = load_key(config, 'skynet.telegram.tg_token')
key, account, permission = load_account_info(
'telegram', key, account, permission)
key = load_key(config, 'skynet.telegram.key')
account = load_key(config, 'skynet.telegram.account')
permission = load_key(config, 'skynet.telegram.permission')
node_url = load_key(config, 'skynet.telegram.node_url')
hyperion_url = load_key(config, 'skynet.telegram.hyperion_url')
node_url, _, ipfs_gateway_url, ipfs_url = load_endpoint_info(
'telegram', node_url=node_url, ipfs_gateway_url=ipfs_gateway_url)
try:
ipfs_gateway_url = load_key(config, 'skynet.telegram.ipfs_gateway_url')
except ConfigParsingError:
ipfs_gateway_url = None
ipfs_url = load_key(config, 'skynet.telegram.ipfs_url')
try:
explorer_domain = load_key(config, 'skynet.telegram.explorer_domain')
except ConfigParsingError:
explorer_domain = DEFAULT_EXPLORER_DOMAIN
try:
ipfs_domain = load_key(config, 'skynet.telegram.ipfs_domain')
except ConfigParsingError:
ipfs_domain = DEFAULT_IPFS_DOMAIN
async def _async_main():
frontend = SkynetTelegramFrontend(
@ -430,7 +413,9 @@ def telegram(
db_host, db_user, db_pass,
ipfs_url,
remote_ipfs_node=ipfs_gateway_url,
key=key
key=key,
explorer_domain=explorer_domain,
ipfs_domain=ipfs_domain
)
async with frontend.open():
@ -442,20 +427,6 @@ def telegram(
@run.command()
@click.option('--loglevel', '-l', default='INFO', help='logging level')
@click.option(
'--account', '-a', default='discord')
@click.option(
'--permission', '-p', default='active')
@click.option(
'--key', '-k', default=None)
@click.option(
'--hyperion-url', '-y', default=f'https://testnet.{DEFAULT_DOMAIN}')
@click.option(
'--node-url', '-n', default=f'https://testnet.{DEFAULT_DOMAIN}')
@click.option(
'--ipfs-url', '-i', default=DEFAULT_IPFS_LOCAL)
@click.option(
'--ipfs-gateway-url', '-I', default=DEFAULT_IPFS_REMOTE)
@click.option(
'--db-host', '-h', default='localhost:5432')
@click.option(
@ -464,26 +435,38 @@ def telegram(
'--db-pass', '-u', default='password')
def discord(
loglevel: str,
account: str,
permission: str,
key: str | None,
hyperion_url: str,
ipfs_url: str,
ipfs_gateway_url: str,
node_url: str,
db_host: str,
db_user: str,
db_pass: str
):
import asyncio
from .frontend.discord import SkynetDiscordFrontend
logging.basicConfig(level=loglevel)
_, _, _, dc_token = init_env_from_config()
config = load_skynet_toml()
dc_token = load_key(config, 'skynet.discord.dc_token')
key, account, permission = load_account_info(
'discord', key, account, permission)
key = load_key(config, 'skynet.discord.key')
account = load_key(config, 'skynet.discord.account')
permission = load_key(config, 'skynet.discord.permission')
node_url = load_key(config, 'skynet.discord.node_url')
hyperion_url = load_key(config, 'skynet.discord.hyperion_url')
node_url, _, ipfs_gateway_url, ipfs_url = load_endpoint_info(
'discord', node_url=node_url, ipfs_gateway_url=ipfs_gateway_url)
ipfs_gateway_url = load_key(config, 'skynet.discord.ipfs_gateway_url')
ipfs_url = load_key(config, 'skynet.discord.ipfs_url')
try:
explorer_domain = load_key(config, 'skynet.discord.explorer_domain')
except ConfigParsingError:
explorer_domain = DEFAULT_EXPLORER_DOMAIN
try:
ipfs_domain = load_key(config, 'skynet.discord.ipfs_domain')
except ConfigParsingError:
ipfs_domain = DEFAULT_IPFS_DOMAIN
async def _async_main():
frontend = SkynetDiscordFrontend(
@ -495,7 +478,9 @@ def discord(
db_host, db_user, db_pass,
ipfs_url,
remote_ipfs_node=ipfs_gateway_url,
key=key
key=key,
explorer_domain=explorer_domain,
ipfs_domain=ipfs_domain
)
async with frontend.open():
@ -507,24 +492,28 @@ def discord(
@run.command()
@click.option('--loglevel', '-l', default='INFO', help='logging level')
@click.option('--name', '-n', default='skynet-ipfs', help='container name')
def ipfs(loglevel, name):
@click.option('--peer', '-p', default=(), help='connect to peer', multiple=True, type=str)
def ipfs(loglevel, name, peer):
from skynet.ipfs.docker import open_ipfs_node
logging.basicConfig(level=loglevel)
with open_ipfs_node(name=name):
with open_ipfs_node(name=name, peers=peer):
...
@run.command()
@click.option('--loglevel', '-l', default='INFO', help='logging level')
@click.option(
'--ipfs-rpc', '-i', default='http://127.0.0.1:5001')
@click.option(
'--hyperion-url', '-y', default='http://127.0.0.1:42001')
def pinner(loglevel, ipfs_rpc, hyperion_url):
def pinner(loglevel):
import trio
from leap.hyperion import HyperionAPI
from .ipfs import AsyncIPFSHTTP
from .ipfs.pinner import SkynetPinner
config = load_skynet_toml()
hyperion_url = load_key(config, 'skynet.pinner.hyperion_url')
ipfs_url = load_key(config, 'skynet.pinner.ipfs_url')
logging.basicConfig(level=loglevel)
ipfs_node = AsyncIPFSHTTP(ipfs_rpc)
ipfs_node = AsyncIPFSHTTP(ipfs_url)
hyperion = HyperionAPI(hyperion_url)
pinner = SkynetPinner(hyperion, ipfs_node)

View File

@ -1,117 +1,33 @@
#!/usr/bin/python
import os
import json
import toml
from pathlib import Path
from configparser import ConfigParser
from re import sub
from .constants import DEFAULT_CONFIG_PATH
def load_skynet_ini(
file_path=DEFAULT_CONFIG_PATH
):
config = ConfigParser()
config.read(file_path)
class ConfigParsingError(BaseException):
...
def load_skynet_toml(file_path=DEFAULT_CONFIG_PATH) -> dict:
config = toml.load(file_path)
return config
def load_key(config: dict, key: str) -> str:
for skey in key.split('.'):
if skey not in config:
conf_keys = [k for k in config]
raise ConfigParsingError(f'key \"{skey}\" not in {conf_keys}')
config = config[skey]
return config
def init_env_from_config(
hf_token: str | None = None,
hf_home: str | None = None,
tg_token: str | None = None,
dc_token: str | None = None,
file_path=DEFAULT_CONFIG_PATH
):
config = load_skynet_ini(file_path=file_path)
if 'HF_TOKEN' in os.environ:
hf_token = os.environ['HF_TOKEN']
elif 'skynet.dgpu' in config:
sub_config = config['skynet.dgpu']
if 'hf_token' in sub_config:
hf_token = sub_config['hf_token']
os.environ['HF_TOKEN'] = hf_token
if 'HF_HOME' in os.environ:
hf_home = os.environ['HF_HOME']
elif 'skynet.dgpu' in config:
sub_config = config['skynet.dgpu']
if 'hf_home' in sub_config:
hf_home = sub_config['hf_home']
os.environ['HF_HOME'] = hf_home
if 'TG_TOKEN' in os.environ:
tg_token = os.environ['TG_TOKEN']
elif 'skynet.telegram' in config:
sub_config = config['skynet.telegram']
if 'token' in sub_config:
tg_token = sub_config['token']
if 'DC_TOKEN' in os.environ:
dc_token = os.environ['DC_TOKEN']
elif 'skynet.discord' in config:
sub_config = config['skynet.discord']
if 'token' in sub_config:
dc_token = sub_config['token']
return hf_home, hf_token, tg_token, dc_token
def load_account_info(
_type: str,
key: str | None = None,
account: str | None = None,
permission: str | None = None,
file_path=DEFAULT_CONFIG_PATH
):
config = load_skynet_ini(file_path=file_path)
type_key = f'skynet.{_type}'
if type_key in config:
sub_config = config[type_key]
if not key and 'key' in sub_config:
key = sub_config['key']
if not account and 'account' in sub_config:
account = sub_config['account']
if not permission and 'permission' in sub_config:
permission = sub_config['permission']
return key, account, permission
def load_endpoint_info(
_type: str,
node_url: str | None = None,
hyperion_url: str | None = None,
ipfs_url: str | None = None,
ipfs_gateway_url: str | None = None,
file_path=DEFAULT_CONFIG_PATH
):
config = load_skynet_ini(file_path=file_path)
type_key = f'skynet.{_type}'
if type_key in config:
sub_config = config[type_key]
if not node_url and 'node_url' in sub_config:
node_url = sub_config['node_url']
if not hyperion_url and 'hyperion_url' in sub_config:
hyperion_url = sub_config['hyperion_url']
if not ipfs_url and 'ipfs_url' in sub_config:
ipfs_url = sub_config['ipfs_url']
if not ipfs_gateway_url and 'ipfs_gateway_url' in sub_config:
ipfs_gateway_url = sub_config['ipfs_gateway_url']
return node_url, hyperion_url, ipfs_gateway_url, ipfs_url
def set_hf_vars(hf_token: str, hf_home: str):
os.environ['HF_TOKEN'] = hf_token
os.environ['HF_HOME'] = hf_home

View File

@ -1,22 +1,24 @@
#!/usr/bin/python
VERSION = '0.1a11'
VERSION = '0.1a12'
DOCKER_RUNTIME_CUDA = 'skynet:runtime-cuda'
MODELS = {
'prompthero/openjourney': { 'short': 'midj'},
'runwayml/stable-diffusion-v1-5': { 'short': 'stable'},
'stabilityai/stable-diffusion-2-1-base': { 'short': 'stable2'},
'snowkidy/stable-diffusion-xl-base-0.9': { 'short': 'stablexl0.9'},
'stabilityai/stable-diffusion-xl-base-1.0': { 'short': 'stablexl'},
'Linaqruf/anything-v3.0': { 'short': 'hdanime'},
'hakurei/waifu-diffusion': { 'short': 'waifu'},
'nitrosocke/Ghibli-Diffusion': { 'short': 'ghibli'},
'dallinmackay/Van-Gogh-diffusion': { 'short': 'van-gogh'},
'lambdalabs/sd-pokemon-diffusers': { 'short': 'pokemon'},
'Envvi/Inkpunk-Diffusion': { 'short': 'ink'},
'nousr/robo-diffusion': { 'short': 'robot'}
'prompthero/openjourney': {'short': 'midj', 'mem': 6},
'runwayml/stable-diffusion-v1-5': {'short': 'stable', 'mem': 6},
'stabilityai/stable-diffusion-2-1-base': {'short': 'stable2', 'mem': 6},
'snowkidy/stable-diffusion-xl-base-0.9': {'short': 'stablexl0.9', 'mem': 8.3},
'Linaqruf/anything-v3.0': {'short': 'hdanime', 'mem': 6},
'hakurei/waifu-diffusion': {'short': 'waifu', 'mem': 6},
'nitrosocke/Ghibli-Diffusion': {'short': 'ghibli', 'mem': 6},
'dallinmackay/Van-Gogh-diffusion': {'short': 'van-gogh', 'mem': 6},
'lambdalabs/sd-pokemon-diffusers': {'short': 'pokemon', 'mem': 6},
'Envvi/Inkpunk-Diffusion': {'short': 'ink', 'mem': 6},
'nousr/robo-diffusion': {'short': 'robot', 'mem': 6},
# default is always last
'stabilityai/stable-diffusion-xl-base-1.0': {'short': 'stablexl', 'mem': 8.3},
}
SHORT_NAMES = [
@ -158,15 +160,14 @@ DEFAULT_GUIDANCE = 7.5
DEFAULT_STRENGTH = 0.5
DEFAULT_STEP = 28
DEFAULT_CREDITS = 10
DEFAULT_MODEL = list(MODELS.keys())[4]
DEFAULT_MODEL = list(MODELS.keys())[-1]
DEFAULT_ROLE = 'pleb'
DEFAULT_UPSCALER = None
DEFAULT_CONFIG_PATH = 'skynet.ini'
DEFAULT_CONFIG_PATH = 'skynet.toml'
DEFAULT_INITAL_MODELS = [
'prompthero/openjourney',
'runwayml/stable-diffusion-v1-5'
'stabilityai/stable-diffusion-xl-base-1.0'
]
DATE_FORMAT = '%B the %dth %Y, %H:%M:%S'
@ -182,10 +183,13 @@ CONFIG_ATTRS = [
'upscaler'
]
DEFAULT_DOMAIN = 'skygpu.net'
DEFAULT_EXPLORER_DOMAIN = 'explorer.skygpu.net'
DEFAULT_IPFS_DOMAIN = 'ipfs.skygpu.net'
DEFAULT_IPFS_REMOTE = '/ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv'
DEFAULT_IPFS_LOCAL = 'http://127.0.0.1:5001'
TG_MAX_WIDTH = 1280
TG_MAX_HEIGHT = 1280
DEFAULT_SINGLE_CARD_MAP = 'cuda:0'

View File

@ -43,6 +43,7 @@ CREATE TABLE IF NOT EXISTS skynet.user_config(
guidance DECIMAL NOT NULL,
strength DECIMAL NOT NULL,
upscaler VARCHAR(128),
autoconf BOOLEAN DEFAULT TRUE,
CONSTRAINT fk_config
FOREIGN KEY(id)
REFERENCES skynet.user(id)
@ -165,6 +166,15 @@ async def open_database_connection(
else:
await conn.execute(DB_INIT_SQL)
col_check = await conn.fetch(f'''
select column_name
from information_schema.columns
where table_name = 'user_config' and column_name = 'autoconf';
''')
if not col_check:
await conn.execute('alter table skynet.user_config add column autoconf boolean default true;')
async def _db_call(method: str, *args, **kwargs):
method = getattr(db, method)

View File

@ -2,6 +2,9 @@
import trio
from hypercorn.config import Config
from hypercorn.trio import serve
from skynet.dgpu.compute import SkynetMM
from skynet.dgpu.daemon import SkynetDGPUDaemon
from skynet.dgpu.network import SkynetGPUConnector
@ -10,7 +13,18 @@ from skynet.dgpu.network import SkynetGPUConnector
async def open_dgpu_node(config: dict):
conn = SkynetGPUConnector(config)
mm = SkynetMM(config)
daemon = SkynetDGPUDaemon(mm, conn, config)
async with conn.open() as conn:
await (SkynetDGPUDaemon(mm, conn, config)
.serve_forever())
api = None
if 'api_bind' in config:
api_conf = Config()
api_conf.bind = [config['api_bind']]
api = await daemon.generate_api()
async with trio.open_nursery() as n:
n.start_soon(daemon.snap_updater_task)
if api:
n.start_soon(serve, api, api_conf)
await daemon.serve_forever()

View File

@ -3,30 +3,42 @@
# Skynet Memory Manager
import gc
from hashlib import sha256
import json
import logging
from hashlib import sha256
import zipfile
from PIL import Image
from diffusers import DiffusionPipeline
import trio
import torch
from skynet.constants import DEFAULT_INITAL_MODELS, MODELS
from skynet.dgpu.errors import DGPUComputeError
from skynet.utils import convert_from_bytes_and_crop, convert_from_cv2_to_image, convert_from_image_to_cv2, convert_from_img_to_bytes, init_upscaler, pipeline_for
from skynet.constants import DEFAULT_INITAL_MODELS, MODELS
from skynet.dgpu.errors import DGPUComputeError, DGPUInferenceCancelled
from skynet.utils import crop_image, convert_from_cv2_to_image, convert_from_image_to_cv2, convert_from_img_to_bytes, init_upscaler, pipeline_for
def prepare_params_for_diffuse(
params: dict,
binary: bytes | None = None
input_type: str,
binary = None
):
image = None
if binary:
image = convert_from_bytes_and_crop(binary, 512, 512)
_params = {}
if image:
_params['image'] = image
_params['strength'] = float(params['strength'])
if binary != None:
match input_type:
case 'png':
image = crop_image(
binary, params['width'], params['height'])
_params['image'] = image
_params['strength'] = float(params['strength'])
case 'none':
...
case _:
raise DGPUComputeError(f'Unknown input_type {input_type}')
else:
_params['width'] = int(params['width'])
@ -51,6 +63,10 @@ class SkynetMM:
if 'initial_models' in config else DEFAULT_INITAL_MODELS
)
self.cache_dir = None
if 'hf_home' in config:
self.cache_dir = config['hf_home']
self._models = {}
for model in self.initial_models:
self.load_model(model, False, force=True)
@ -75,7 +91,9 @@ class SkynetMM:
):
logging.info(f'loading model {model_name}...')
if force or len(self._models.keys()) == 0:
pipe = pipeline_for(model_name, image=image)
pipe = pipeline_for(
model_name, image=image, cache_dir=self.cache_dir)
self._models[model_name] = {
'pipe': pipe,
'generated': 0,
@ -97,7 +115,8 @@ class SkynetMM:
gc.collect()
torch.cuda.empty_cache()
pipe = pipeline_for(model_name, image=image)
pipe = pipeline_for(
model_name, image=image, cache_dir=self.cache_dir)
self._models[model_name] = {
'pipe': pipe,
@ -122,38 +141,61 @@ class SkynetMM:
def compute_one(
self,
request_id: int,
method: str,
params: dict,
input_type: str = 'png',
binary: bytes | None = None
):
def maybe_cancel_work(step, *args, **kwargs):
if self._should_cancel:
should_raise = trio.from_thread.run(self._should_cancel, request_id)
if should_raise:
logging.warn(f'cancelling work at step {step}')
raise DGPUInferenceCancelled()
maybe_cancel_work(0)
output_type = 'png'
if 'output_type' in params:
output_type = params['output_type']
output = None
output_hash = None
try:
match method:
case 'diffuse':
image = None
arguments = prepare_params_for_diffuse(params, binary)
arguments = prepare_params_for_diffuse(
params, input_type, binary=binary)
prompt, guidance, step, seed, upscaler, extra_params = arguments
model = self.get_model(params['model'], 'image' in extra_params)
image = model(
output = model(
prompt,
guidance_scale=guidance,
num_inference_steps=step,
generator=seed,
callback=maybe_cancel_work,
callback_steps=1,
**extra_params
).images[0]
if upscaler == 'x4':
input_img = image.convert('RGB')
up_img, _ = self.upscaler.enhance(
convert_from_image_to_cv2(input_img), outscale=4)
output_binary = b''
match output_type:
case 'png':
if upscaler == 'x4':
input_img = output.convert('RGB')
up_img, _ = self.upscaler.enhance(
convert_from_image_to_cv2(input_img), outscale=4)
image = convert_from_cv2_to_image(up_img)
output = convert_from_cv2_to_image(up_img)
img_raw = convert_from_img_to_bytes(image)
img_sha = sha256(img_raw).hexdigest()
output_binary = convert_from_img_to_bytes(output)
return img_sha, img_raw
case _:
raise DGPUComputeError(f'Unsupported output type: {output_type}')
output_hash = sha256(output_binary).hexdigest()
case _:
raise DGPUComputeError('Unsupported compute method')
@ -164,3 +206,5 @@ class SkynetMM:
finally:
torch.cuda.empty_cache()
return output_hash, output

View File

@ -1,17 +1,35 @@
#!/usr/bin/python
import json
import random
import logging
import time
import traceback
from hashlib import sha256
from datetime import datetime
from functools import partial
import trio
from quart import jsonify
from quart_trio import QuartTrio as Quart
from skynet.constants import MODELS, VERSION
from skynet.dgpu.errors import *
from skynet.dgpu.compute import SkynetMM
from skynet.dgpu.network import SkynetGPUConnector
def convert_reward_to_int(reward_str):
int_part, decimal_part = (
reward_str.split('.')[0],
reward_str.split('.')[1].split(' ')[0]
)
return int(int_part + decimal_part)
class SkynetDGPUDaemon:
def __init__(
@ -27,27 +45,120 @@ class SkynetDGPUDaemon:
if 'auto_withdraw' in config else False
)
self.account = config['account']
self.non_compete = set()
if 'non_compete' in config:
self.non_compete = set(config['non_compete'])
self.model_whitelist = set()
if 'model_whitelist' in config:
self.model_whitelist = set(config['model_whitelist'])
self.model_blacklist = set()
if 'model_blacklist' in config:
self.model_blacklist = set(config['model_blacklist'])
self.backend = 'sync-on-thread'
if 'backend' in config:
self.backend = config['backend']
self._snap = {
'queue': [],
'requests': {},
'my_results': []
}
self._benchmark = []
self._last_benchmark = None
self._last_generation_ts = None
def _get_benchmark_speed(self) -> float:
if not self._last_benchmark:
return 0
start = self._last_benchmark[0]
end = self._last_benchmark[-1]
elapsed = end - start
its = len(self._last_benchmark)
speed = its / elapsed
logging.info(f'{elapsed} s total its: {its}, at {speed} it/s ')
return speed
async def should_cancel_work(self, request_id: int):
self._benchmark.append(time.time())
competitors = set([
status['worker']
for status in self._snap['requests'][request_id]
if status['worker'] != self.account
])
return bool(self.non_compete & competitors)
async def snap_updater_task(self):
while True:
self._snap = await self.conn.get_full_queue_snapshot()
await trio.sleep(1)
async def generate_api(self):
app = Quart(__name__)
@app.route('/')
async def health():
return jsonify(
account=self.account,
version=VERSION,
last_generation_ts=self._last_generation_ts,
last_generation_speed=self._get_benchmark_speed()
)
return app
async def serve_forever(self):
try:
while True:
if self.auto_withdraw:
await self.conn.maybe_withdraw_all()
queue = await self.conn.get_work_requests_last_hour()
queue = self._snap['queue']
random.shuffle(queue)
queue = sorted(
queue,
key=lambda req: convert_reward_to_int(req['reward']),
reverse=True
)
for req in queue:
rid = req['id']
my_results = [res['id'] for res in (await self.conn.find_my_results())]
if rid not in my_results:
statuses = await self.conn.get_status_by_request_id(rid)
# parse request
body = json.loads(req['body'])
model = body['params']['model']
# if model not known
if model not in MODELS:
logging.warning(f'Unknown model {model}')
continue
# if whitelist enabled and model not in it continue
if (len(self.model_whitelist) > 0 and
not model in self.model_whitelist):
continue
# if blacklist contains model skip
if model in self.model_blacklist:
continue
my_results = [res['id'] for res in self._snap['my_results']]
if rid not in my_results and rid in self._snap['requests']:
statuses = self._snap['requests'][rid]
if len(statuses) == 0:
# parse request
body = json.loads(req['body'])
binary = await self.conn.get_input_data(req['binary_data'])
binary, input_type = await self.conn.get_input_data(req['binary_data'])
hash_str = (
str(req['nonce'])
@ -70,17 +181,40 @@ class SkynetDGPUDaemon:
else:
try:
img_sha, img_raw = self.mm.compute_one(
body['method'], body['params'], binary=binary)
output_type = 'png'
if 'output_type' in body['params']:
output_type = body['params']['output_type']
ipfs_hash = await self.conn.publish_on_ipfs(img_raw)
output = None
output_hash = None
match self.backend:
case 'sync-on-thread':
self.mm._should_cancel = self.should_cancel_work
output_hash, output = await trio.to_thread.run_sync(
partial(
self.mm.compute_one,
rid,
body['method'], body['params'],
input_type=input_type,
binary=binary
)
)
await self.conn.submit_work(rid, request_hash, img_sha, ipfs_hash)
break
case _:
raise DGPUComputeError(f'Unsupported backend {self.backend}')
self._last_generation_ts = datetime.now().isoformat()
self._last_benchmark = self._benchmark
self._benchmark = []
ipfs_hash = await self.conn.publish_on_ipfs(output, typ=output_type)
await self.conn.submit_work(rid, request_hash, output_hash, ipfs_hash)
except BaseException as e:
traceback.print_exc()
await self.conn.cancel_work(rid, str(e))
finally:
break
else:

View File

@ -3,3 +3,6 @@
class DGPUComputeError(BaseException):
...
class DGPUInferenceCancelled(BaseException):
...

View File

@ -1,23 +1,28 @@
#!/usr/bin/python
from functools import partial
import io
import json
from pathlib import Path
import time
import logging
import asks
from PIL import Image
from pathlib import Path
from functools import partial
from contextlib import asynccontextmanager as acm
import asks
import trio
import anyio
from PIL import Image, UnidentifiedImageError
from leap.cleos import CLEOS
from leap.sugar import Checksum256, Name, asset_from_str
from skynet.constants import DEFAULT_DOMAIN
from skynet.constants import DEFAULT_IPFS_DOMAIN
from skynet.dgpu.errors import DGPUComputeError
from skynet.ipfs import AsyncIPFSHTTP, get_ipfs_file
from skynet.dgpu.errors import DGPUComputeError
REQUEST_UPDATE_TIME = 3
async def failable(fn: partial, ret_fail=None):
@ -25,8 +30,11 @@ async def failable(fn: partial, ret_fail=None):
return await fn()
except (
OSError,
json.JSONDecodeError,
asks.errors.RequestTimeout,
json.JSONDecodeError
asks.errors.BadHttpResponse,
anyio.BrokenResourceError
):
return ret_fail
@ -44,11 +52,19 @@ class SkynetGPUConnector:
self.cleos = CLEOS(
None, None, self.node_url, remote=self.node_url)
self.ipfs_gateway_url = config['ipfs_gateway_url']
self.ipfs_gateway_url = None
if 'ipfs_gateway_url' in config:
self.ipfs_gateway_url = config['ipfs_gateway_url']
self.ipfs_url = config['ipfs_url']
self.ipfs_client = AsyncIPFSHTTP(self.ipfs_url)
self.ipfs_domain = DEFAULT_IPFS_DOMAIN
if 'ipfs_domain' in config:
self.ipfs_domain = config['ipfs_domain']
self._wip_requests = {}
# blockchain helpers
async def get_work_requests_last_hour(self):
@ -98,6 +114,36 @@ class SkynetGPUConnector:
else:
return None
async def get_competitors_for_req(self, request_id: int) -> set:
competitors = [
status['worker']
for status in
(await self.get_status_by_request_id(request_id))
if status['worker'] != self.account
]
logging.info(f'competitors: {competitors}')
return set(competitors)
async def get_full_queue_snapshot(self):
snap = {
'requests': {},
'my_results': []
}
snap['queue'] = await self.get_work_requests_last_hour()
async def _run_and_save(d, key: str, fn, *args, **kwargs):
d[key] = await fn(*args, **kwargs)
async with trio.open_nursery() as n:
n.start_soon(_run_and_save, snap, 'my_results', self.find_my_results)
for req in snap['queue']:
n.start_soon(
_run_and_save, snap['requests'], req['id'], self.get_status_by_request_id, req['id'])
return snap
async def begin_work(self, request_id: int):
logging.info('begin_work')
return await failable(
@ -193,31 +239,74 @@ class SkynetGPUConnector:
)
# IPFS helpers
async def publish_on_ipfs(self, raw_img: bytes):
async def publish_on_ipfs(self, raw, typ: str = 'png'):
Path('ipfs-staging').mkdir(exist_ok=True)
logging.info('publish_on_ipfs')
img = Image.open(io.BytesIO(raw_img))
img.save('ipfs-docker-staging/image.png')
# check peer connections, reconnect to skynet gateway if not
gateway_id = Path(self.ipfs_gateway_url).name
peers = await self.ipfs_client.peers()
if gateway_id not in [p['Peer'] for p in peers]:
await self.ipfs_client.connect(self.ipfs_gateway_url)
target_file = ''
match typ:
case 'png':
raw: Image
target_file = 'ipfs-staging/image.png'
raw.save(target_file)
file_info = await self.ipfs_client.add(Path('ipfs-docker-staging/image.png'))
case _:
raise ValueError(f'Unsupported output type: {typ}')
if self.ipfs_gateway_url:
# check peer connections, reconnect to skynet gateway if not
gateway_id = Path(self.ipfs_gateway_url).name
peers = await self.ipfs_client.peers()
if gateway_id not in [p['Peer'] for p in peers]:
await self.ipfs_client.connect(self.ipfs_gateway_url)
file_info = await self.ipfs_client.add(Path(target_file))
file_cid = file_info['Hash']
await self.ipfs_client.pin(file_cid)
return file_cid
async def get_input_data(self, ipfs_hash: str) -> bytes:
if ipfs_hash == '':
return b''
async def get_input_data(self, ipfs_hash: str) -> tuple[bytes, str]:
input_type = 'none'
resp = await get_ipfs_file(f'https://ipfs.{DEFAULT_DOMAIN}/ipfs/{ipfs_hash}/image.png')
if not resp:
if ipfs_hash == '':
return b'', input_type
results = {}
ipfs_link = f'https://{self.ipfs_domain}/ipfs/{ipfs_hash}'
ipfs_link_legacy = ipfs_link + '/image.png'
async with trio.open_nursery() as n:
async def get_and_set_results(link: str):
res = await get_ipfs_file(link, timeout=1)
logging.info(f'got response from {link}')
if not res or res.status_code != 200:
logging.warning(f'couldn\'t get ipfs binary data at {link}!')
else:
try:
# attempt to decode as image
results[link] = Image.open(io.BytesIO(res.raw))
input_type = 'png'
n.cancel_scope.cancel()
except UnidentifiedImageError:
logging.warning(f'couldn\'t get ipfs binary data at {link}!')
n.start_soon(
get_and_set_results, ipfs_link)
n.start_soon(
get_and_set_results, ipfs_link_legacy)
input_data = None
if ipfs_link_legacy in results:
input_data = results[ipfs_link_legacy]
if ipfs_link in results:
input_data = results[ipfs_link]
if input_data == None:
raise DGPUComputeError('Couldn\'t gather input data from ipfs')
return resp.raw
return input_data, input_type

View File

@ -1,5 +1,7 @@
#!/usr/bin/python
import random
from ..constants import *
@ -15,10 +17,14 @@ class ConfigUnknownAlgorithm(BaseException):
class ConfigUnknownUpscaler(BaseException):
...
class ConfigUnknownAutoConfSetting(BaseException):
...
class ConfigSizeDivisionByEight(BaseException):
...
def validate_user_config_request(req: str):
params = req.split(' ')
@ -78,6 +84,18 @@ def validate_user_config_request(req: str):
raise ConfigUnknownUpscaler(
f'\"{val}\" is not a valid upscaler')
case 'autoconf':
val = params[2]
if val == 'on':
val = True
elif val == 'off':
val = False
else:
raise ConfigUnknownAutoConfSetting(
f'\"{val}\" not a valid setting for autoconf')
case _:
raise ConfigUnknownAttribute(
f'\"{attr}\" not a configurable parameter')
@ -92,3 +110,22 @@ def validate_user_config_request(req: str):
except ValueError:
raise ValueError(f'\"{val}\" is not a number silly')
def perform_auto_conf(config: dict) -> dict:
model = config['model']
prefered_size_w = 512
prefered_size_h = 512
if 'xl' in model:
prefered_size_w = 1024
prefered_size_h = 1024
else:
prefered_size_w = 512
prefered_size_h = 512
config['step'] = random.randint(20, 35)
config['width'] = prefered_size_w
config['height'] = prefered_size_h
return config

View File

@ -45,7 +45,9 @@ class SkynetDiscordFrontend:
db_pass: str,
ipfs_url: str,
remote_ipfs_node: str,
key: str
key: str,
explorer_domain: str,
ipfs_domain: str
):
# self.token = token
self.account = account
@ -58,6 +60,8 @@ class SkynetDiscordFrontend:
self.ipfs_url = ipfs_url
self.remote_ipfs_node = remote_ipfs_node
self.key = key
self.explorer_domain = explorer_domain
self.ipfs_domain = ipfs_domain
self.bot = DiscordBot(self)
self.cleos = CLEOS(None, None, url=node_url, remote=node_url)
@ -169,7 +173,7 @@ class SkynetDiscordFrontend:
return False
enqueue_tx_id = res['transaction_id']
enqueue_tx_link = f'[**Your request on Skynet Explorer**](https://explorer.{DEFAULT_DOMAIN}/v2/explore/transaction/{enqueue_tx_id})'
enqueue_tx_link = f'[**Your request on Skynet Explorer**](https://{self.explorer_domain}/v2/explore/transaction/{enqueue_tx_id})'
msg_text += f'**broadcasted!** \n{enqueue_tx_link}\n[{timestamp_pretty()}] *workers are processing request...* '
embed = discord.Embed(
@ -241,8 +245,48 @@ class SkynetDiscordFrontend:
await message.edit(embed=embed)
# attempt to get the image and send it
ipfs_link = f'https://ipfs.{DEFAULT_DOMAIN}/ipfs/{ipfs_hash}/image.png'
resp = await get_ipfs_file(ipfs_link)
results = {}
ipfs_link = f'https://{self.ipfs_domain}/ipfs/{ipfs_hash}'
ipfs_link_legacy = ipfs_link + '/image.png'
async def get_and_set_results(link: str):
res = await get_ipfs_file(link)
logging.info(f'got response from {link}')
if not res or res.status_code != 200:
logging.warning(f'couldn\'t get ipfs binary data at {link}!')
else:
try:
with Image.open(io.BytesIO(res.raw)) as image:
tmp_buf = io.BytesIO()
image.save(tmp_buf, format='PNG')
png_img = tmp_buf.getvalue()
results[link] = png_img
except UnidentifiedImageError:
logging.warning(f'couldn\'t get ipfs binary data at {link}!')
tasks = [
get_and_set_results(ipfs_link),
get_and_set_results(ipfs_link_legacy)
]
await asyncio.gather(*tasks)
png_img = None
if ipfs_link_legacy in results:
png_img = results[ipfs_link_legacy]
if ipfs_link in results:
png_img = results[ipfs_link]
if not png_img:
await self.update_status_message(
status_msg,
caption,
reply_markup=build_redo_menu(),
parse_mode='HTML'
)
return True
# reword this function, may not need caption
caption, embed = generate_reply_caption(

View File

@ -81,11 +81,12 @@ def generate_reply_caption(
params: dict,
tx_hash: str,
worker: str,
reward: str
reward: str,
explorer_domain: str
):
explorer_link = discord.Embed(
title='[SKYNET Transaction Explorer]',
url=f'https://explorer.{DEFAULT_DOMAIN}/v2/explore/transaction/{tx_hash}',
url=f'https://{explorer_domain}/v2/explore/transaction/{tx_hash}',
color=discord.Color.blue())
meta_info = prepare_metainfo_caption(user, worker, reward, params, explorer_link)

View File

@ -5,22 +5,22 @@ import random
import logging
import asyncio
from PIL import Image
from PIL import Image, UnidentifiedImageError
from json import JSONDecodeError
from decimal import Decimal
from hashlib import sha256
from datetime import datetime
from contextlib import ExitStack, AsyncExitStack
from contextlib import AsyncExitStack
from contextlib import asynccontextmanager as acm
from leap.cleos import CLEOS
from leap.sugar import Name, asset_from_str, collect_stdout
from leap.hyperion import HyperionAPI
from telebot.types import InputMediaPhoto
from telebot.types import InputMediaPhoto
from telebot.async_telebot import AsyncTeleBot
from skynet.db import open_new_database, open_database_connection
from skynet.db import open_database_connection
from skynet.ipfs import get_ipfs_file, AsyncIPFSHTTP
from skynet.constants import *
@ -44,7 +44,9 @@ class SkynetTelegramFrontend:
db_pass: str,
ipfs_node: str,
remote_ipfs_node: str | None,
key: str
key: str,
explorer_domain: str,
ipfs_domain: str
):
self.token = token
self.account = account
@ -56,6 +58,8 @@ class SkynetTelegramFrontend:
self.db_pass = db_pass
self.remote_ipfs_node = remote_ipfs_node
self.key = key
self.explorer_domain = explorer_domain
self.ipfs_domain = ipfs_domain
self.bot = AsyncTeleBot(token, exception_handler=SKYExceptionHandler)
self.cleos = CLEOS(None, None, url=node_url, remote=node_url)
@ -161,7 +165,7 @@ class SkynetTelegramFrontend:
enqueue_tx_id = res['transaction_id']
enqueue_tx_link = hlink(
'Your request on Skynet Explorer',
f'https://explorer.{DEFAULT_DOMAIN}/v2/explore/transaction/{enqueue_tx_id}'
f'https://{self.explorer_domain}/v2/explore/transaction/{enqueue_tx_id}'
)
await self.append_status_message(
@ -222,7 +226,7 @@ class SkynetTelegramFrontend:
tx_link = hlink(
'Your result on Skynet Explorer',
f'https://explorer.{DEFAULT_DOMAIN}/v2/explore/transaction/{tx_hash}'
f'https://{self.explorer_domain}/v2/explore/transaction/{tx_hash}'
)
await self.append_status_message(
@ -234,14 +238,51 @@ class SkynetTelegramFrontend:
)
caption = generate_reply_caption(
user, params, tx_hash, worker, reward)
user, params, tx_hash, worker, reward, self.explorer_domain)
# attempt to get the image and send it
ipfs_link = f'https://ipfs.{DEFAULT_DOMAIN}/ipfs/{ipfs_hash}/image.png'
resp = await get_ipfs_file(ipfs_link)
results = {}
ipfs_link = f'https://{self.ipfs_domain}/ipfs/{ipfs_hash}'
ipfs_link_legacy = ipfs_link + '/image.png'
if not resp or resp.status_code != 200:
logging.error(f'couldn\'t get ipfs hosted image at {ipfs_link}!')
async def get_and_set_results(link: str):
res = await get_ipfs_file(link)
logging.info(f'got response from {link}')
if not res or res.status_code != 200:
logging.warning(f'couldn\'t get ipfs binary data at {link}!')
else:
try:
with Image.open(io.BytesIO(res.raw)) as image:
w, h = image.size
if w > TG_MAX_WIDTH or h > TG_MAX_HEIGHT:
logging.warning(f'result is of size {image.size}')
image.thumbnail((TG_MAX_WIDTH, TG_MAX_HEIGHT))
tmp_buf = io.BytesIO()
image.save(tmp_buf, format='PNG')
png_img = tmp_buf.getvalue()
results[link] = png_img
except UnidentifiedImageError:
logging.warning(f'couldn\'t get ipfs binary data at {link}!')
tasks = [
get_and_set_results(ipfs_link),
get_and_set_results(ipfs_link_legacy)
]
await asyncio.gather(*tasks)
png_img = None
if ipfs_link_legacy in results:
png_img = results[ipfs_link_legacy]
if ipfs_link in results:
png_img = results[ipfs_link]
if not png_img:
await self.update_status_message(
status_msg,
caption,
@ -250,17 +291,6 @@ class SkynetTelegramFrontend:
)
return True
png_img = resp.raw
with Image.open(io.BytesIO(resp.raw)) as image:
w, h = image.size
if w > TG_MAX_WIDTH or h > TG_MAX_HEIGHT:
logging.warning(f'result is of size {image.size}')
image.thumbnail((TG_MAX_WIDTH, TG_MAX_HEIGHT))
tmp_buf = io.BytesIO()
image.save(tmp_buf, format='PNG')
png_img = tmp_buf.getvalue()
logging.info(f'success! sending generated image')
await self.bot.delete_message(
chat_id=status_msg.chat.id, message_id=status_msg.id)

View File

@ -9,7 +9,7 @@ from datetime import datetime, timedelta
from PIL import Image
from telebot.types import CallbackQuery, Message
from skynet.frontend import validate_user_config_request
from skynet.frontend import validate_user_config_request, perform_auto_conf
from skynet.constants import *
@ -149,6 +149,9 @@ def create_handler_context(frontend: 'SkynetTelegramFrontend'):
user_config = {**user_row}
del user_config['id']
if user_config['autoconf']:
user_config = perform_auto_conf(user_config)
params = {
'prompt': prompt,
**user_config
@ -209,12 +212,18 @@ def create_handler_context(frontend: 'SkynetTelegramFrontend'):
file_path = (await bot.get_file(file_id)).file_path
image_raw = await bot.download_file(file_path)
user_config = {**user_row}
del user_config['id']
if user_config['autoconf']:
user_config = perform_auto_conf(user_config)
with Image.open(io.BytesIO(image_raw)) as image:
w, h = image.size
if w > 512 or h > 512:
if w > user_config['width'] or h > user_config['height']:
logging.warning(f'user sent img of size {image.size}')
image.thumbnail((512, 512))
image.thumbnail(
(user_config['width'], user_config['height']))
logging.warning(f'resized it to {image.size}')
image_loc = 'ipfs-staging/image.png'
@ -228,9 +237,6 @@ def create_handler_context(frontend: 'SkynetTelegramFrontend'):
logging.info(f'mid: {message.id}')
user_config = {**user_row}
del user_config['id']
params = {
'prompt': prompt,
**user_config
@ -303,6 +309,8 @@ def create_handler_context(frontend: 'SkynetTelegramFrontend'):
'new_user_request', user.id, message.id, status_msg.id, status=init_msg)
user_config = {**user_row}
del user_config['id']
if user_config['autoconf']:
user_config = perform_auto_conf(user_config)
params = {
'prompt': prompt,

View File

@ -67,11 +67,12 @@ def generate_reply_caption(
params: dict,
tx_hash: str,
worker: str,
reward: str
reward: str,
explorer_domain: str
):
explorer_link = hlink(
'SKYNET Transaction Explorer',
f'https://explorer.{DEFAULT_DOMAIN}/v2/explore/transaction/{tx_hash}'
f'https://explorer.{explorer_domain}/v2/explore/transaction/{tx_hash}'
)
meta_info = prepare_metainfo_caption(tguser, worker, reward, params)

View File

@ -1,6 +1,5 @@
#!/usr/bin/python
import os
import sys
import logging
@ -10,48 +9,14 @@ from contextlib import contextmanager as cm
import docker
from docker.types import Mount
from docker.models.containers import Container
class IPFSDocker:
def __init__(self, container: Container):
self._container = container
def add(self, file: str) -> str:
ec, out = self._container.exec_run(
['ipfs', 'add', '-w', f'/export/{file}', '-Q'])
if ec != 0:
logging.error(out)
assert ec == 0
return out.decode().rstrip()
def pin(self, ipfs_hash: str):
ec, _ = self._container.exec_run(
['ipfs', 'pin', 'add', ipfs_hash])
assert ec == 0
def connect(self, remote_node: str):
ec, out = self._container.exec_run(
['ipfs', 'swarm', 'connect', remote_node])
if ec != 0:
logging.error(out)
assert ec == 0
def check_connect(self):
ec, out = self._container.exec_run(
['ipfs', 'swarm', 'peers'])
if ec != 0:
logging.error(out)
assert ec == 0
return out.splitlines()
@cm
def open_ipfs_node(name='skynet-ipfs', teardown=False):
def open_ipfs_node(
name: str = 'skynet-ipfs',
teardown: bool = False,
peers: list[str] = []
):
dclient = docker.from_env()
container = None
@ -59,13 +24,9 @@ def open_ipfs_node(name='skynet-ipfs', teardown=False):
container = dclient.containers.get(name)
except docker.errors.NotFound:
staging_dir = Path().resolve() / 'ipfs-docker-staging'
staging_dir.mkdir(parents=True, exist_ok=True)
data_dir = Path().resolve() / 'ipfs-docker-data'
data_dir.mkdir(parents=True, exist_ok=True)
export_target = '/export'
data_target = '/data/ipfs'
container = dclient.containers.run(
@ -77,19 +38,15 @@ def open_ipfs_node(name='skynet-ipfs', teardown=False):
'5001/tcp': ('127.0.0.1', 5001)
},
mounts=[
Mount(export_target, str(staging_dir), 'bind'),
Mount(data_target, str(data_dir), 'bind')
],
detach=True,
remove=True
)
uid, gid = 1000, 1000
if sys.platform != 'win32':
uid = os.getuid()
gid = os.getgid()
ec, out = container.exec_run(['chown', f'{uid}:{gid}', '-R', export_target])
logging.info(out)
assert ec == 0
ec, out = container.exec_run(['chown', f'{uid}:{gid}', '-R', data_target])
logging.info(out)
assert ec == 0
@ -100,7 +57,13 @@ def open_ipfs_node(name='skynet-ipfs', teardown=False):
if 'Daemon is ready' in log:
break
yield IPFSDocker(container)
for peer in peers:
ec, out = container.exec_run(
['ipfs', 'swarm', 'connect', peer])
if ec != 0:
logging.error(out)
yield
if teardown and container:
container.stop()

View File

@ -85,9 +85,9 @@ class SkynetPinner:
for _ in range(6):
try:
with trio.move_on_after(5):
resp = await self.ipfs_http.pin(cid)
if resp.status_code != 200:
logging.error(f'error pinning {cid}:\n{resp.text}')
pins = await self.ipfs_http.pin(cid)
if cid not in pins:
logging.error(f'error pinning {cid}')
del self._pinned[cid]
else:

View File

@ -4,44 +4,12 @@ import json
import time
import logging
from datetime import datetime
from contextlib import contextmanager as cm
import docker
from pytz import timezone
from leap.cleos import CLEOS, default_nodeos_image
from leap.sugar import get_container, Symbol, random_string
@cm
def open_cleos(
node_url: str,
key: str | None
):
vtestnet = None
try:
dclient = docker.from_env()
vtestnet = get_container(
dclient,
default_nodeos_image(),
name=f'skynet-wallet-{random_string(size=8)}',
force_unique=True,
detach=True,
network='host',
remove=True)
cleos = CLEOS(dclient, vtestnet, url=node_url, remote=node_url)
if key:
cleos.setup_wallet(key)
yield cleos
finally:
if vtestnet:
vtestnet.stop()
from leap.cleos import CLEOS
from leap.sugar import get_container, Symbol
@cm

View File

@ -2,11 +2,14 @@
import io
import os
import sys
import time
import random
import logging
from typing import Optional
from pathlib import Path
import asks
import torch
import numpy as np
@ -15,14 +18,11 @@ from PIL import Image
from basicsr.archs.rrdbnet_arch import RRDBNet
from diffusers import (
DiffusionPipeline,
StableDiffusionXLPipeline,
StableDiffusionXLImg2ImgPipeline,
StableDiffusionPipeline,
StableDiffusionImg2ImgPipeline,
EulerAncestralDiscreteScheduler
)
from realesrgan import RealESRGANer
from huggingface_hub import login
import trio
from .constants import MODELS
@ -51,19 +51,23 @@ def convert_from_img_to_bytes(image: Image, fmt='PNG') -> bytes:
return byte_arr.getvalue()
def convert_from_bytes_and_crop(raw: bytes, max_w: int, max_h: int) -> Image:
image = convert_from_bytes_to_img(raw)
def crop_image(image: Image, max_w: int, max_h: int) -> Image:
w, h = image.size
if w > max_w or h > max_h:
image.thumbnail((512, 512))
image.thumbnail((max_w, max_h))
return image.convert('RGB')
def pipeline_for(model: str, mem_fraction: float = 1.0, image=False) -> DiffusionPipeline:
def pipeline_for(
model: str,
mem_fraction: float = 1.0,
image: bool = False,
cache_dir: str | None = None
) -> DiffusionPipeline:
assert torch.cuda.is_available()
torch.cuda.empty_cache()
torch.cuda.set_per_process_memory_fraction(mem_fraction)
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
@ -74,36 +78,54 @@ def pipeline_for(model: str, mem_fraction: float = 1.0, image=False) -> Diffusio
torch.backends.cudnn.benchmark = False
torch.use_deterministic_algorithms(True)
model_info = MODELS[model]
req_mem = model_info['mem']
mem_gb = torch.cuda.mem_get_info()[1] / (10**9)
mem_gb *= mem_fraction
over_mem = mem_gb < req_mem
if over_mem:
logging.warn(f'model requires {req_mem} but card has {mem_gb}, model will run slower..')
shortname = model_info['short']
params = {
'safety_checker': None,
'torch_dtype': torch.float16,
'safety_checker': None
'cache_dir': cache_dir,
'variant': 'fp16'
}
if model == 'runwayml/stable-diffusion-v1-5':
params['revision'] = 'fp16'
match shortname:
case 'stable':
params['revision'] = 'fp16'
if (model == 'stabilityai/stable-diffusion-xl-base-1.0' or
model == 'snowkidy/stable-diffusion-xl-base-0.9'):
if image:
pipe_class = StableDiffusionXLImg2ImgPipeline
else:
pipe_class = StableDiffusionXLPipeline
else:
if image:
pipe_class = StableDiffusionImg2ImgPipeline
else:
pipe_class = StableDiffusionPipeline
torch.cuda.set_per_process_memory_fraction(mem_fraction)
pipe = pipe_class.from_pretrained(
pipe = DiffusionPipeline.from_pretrained(
model, **params)
pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(
pipe.scheduler.config)
if not image:
pipe.enable_vae_slicing()
pipe.enable_xformers_memory_efficient_attention()
return pipe.to('cuda')
if over_mem:
if not image:
pipe.enable_vae_slicing()
pipe.enable_vae_tiling()
pipe.enable_model_cpu_offload()
else:
if sys.version_info[1] < 11:
# torch.compile only supported on python < 3.11
pipe.unet = torch.compile(
pipe.unet, mode='reduce-overhead', fullgraph=True)
pipe = pipe.to('cuda')
return pipe
def txt2img(
@ -116,12 +138,6 @@ def txt2img(
steps: int = 28,
seed: Optional[int] = None
):
assert torch.cuda.is_available()
torch.cuda.empty_cache()
torch.cuda.set_per_process_memory_fraction(1.0)
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
login(token=hf_token)
pipe = pipeline_for(model)
@ -149,12 +165,6 @@ def img2img(
steps: int = 28,
seed: Optional[int] = None
):
assert torch.cuda.is_available()
torch.cuda.empty_cache()
torch.cuda.set_per_process_memory_fraction(1.0)
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
login(token=hf_token)
pipe = pipeline_for(model, image=True)
@ -195,12 +205,6 @@ def upscale(
output: str = 'output.png',
model_path: str = 'weights/RealESRGAN_x4plus.pth'
):
assert torch.cuda.is_available()
torch.cuda.empty_cache()
torch.cuda.set_per_process_memory_fraction(1.0)
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
input_img = Image.open(img_path).convert('RGB')
upscaler = init_upscaler(model_path=model_path)
@ -209,17 +213,26 @@ def upscale(
convert_from_image_to_cv2(input_img), outscale=4)
image = convert_from_cv2_to_image(up_img)
image.save(output)
def download_all_models(hf_token: str):
async def download_upscaler():
print('downloading upscaler...')
weights_path = Path('weights')
weights_path.mkdir(exist_ok=True)
upscaler_url = 'https://github.com/xinntao/Real-ESRGAN/releases/download/v0.1.0/RealESRGAN_x4plus.pth'
save_path = weights_path / 'RealESRGAN_x4plus.pth'
response = await asks.get(upscaler_url)
with open(save_path, 'wb') as f:
f.write(response.content)
print('done')
def download_all_models(hf_token: str, hf_home: str):
assert torch.cuda.is_available()
trio.run(download_upscaler)
login(token=hf_token)
for model in MODELS:
print(f'DOWNLOADING {model.upper()}')
pipeline_for(model)
print(f'DOWNLOADING IMAGE {model.upper()}')
pipeline_for(model, image=True)
pipeline_for(model, cache_dir=hf_home)