skynet/skynet/utils.py

219 lines
5.6 KiB
Python
Executable File

#!/usr/bin/python
import io
import os
import time
import random
from typing import Optional
from pathlib import Path
import torch
import numpy as np
from PIL import Image
from basicsr.archs.rrdbnet_arch import RRDBNet
from diffusers import (
DiffusionPipeline,
StableDiffusionPipeline,
StableDiffusionImg2ImgPipeline,
EulerAncestralDiscreteScheduler
)
from realesrgan import RealESRGANer
from huggingface_hub import login
from .constants import MODELS
def time_ms():
return int(time.time() * 1000)
def convert_from_cv2_to_image(img: np.ndarray) -> Image:
# return Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
return Image.fromarray(img)
def convert_from_image_to_cv2(img: Image) -> np.ndarray:
# return cv2.cvtColor(numpy.array(img), cv2.COLOR_RGB2BGR)
return np.asarray(img)
def convert_from_bytes_to_img(raw: bytes) -> Image:
return Image.open(io.BytesIO(raw))
def convert_from_img_to_bytes(image: Image, fmt='PNG') -> bytes:
byte_arr = io.BytesIO()
image.save(byte_arr, format=fmt)
return byte_arr.getvalue()
def convert_from_bytes_and_crop(raw: bytes, max_w: int, max_h: int) -> Image:
image = convert_from_bytes_to_img(raw)
w, h = image.size
if w > max_w or h > max_h:
image.thumbnail((512, 512))
return image.convert('RGB')
def pipeline_for(model: str, mem_fraction: float = 1.0, image=False) -> DiffusionPipeline:
assert torch.cuda.is_available()
torch.cuda.empty_cache()
torch.cuda.set_per_process_memory_fraction(mem_fraction)
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
# full determinism
# https://huggingface.co/docs/diffusers/using-diffusers/reproducibility#deterministic-algorithms
os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":16:8"
torch.backends.cudnn.benchmark = False
torch.use_deterministic_algorithms(True)
params = {
'torch_dtype': torch.float16,
'safety_checker': None
}
if model == 'runwayml/stable-diffusion-v1-5':
params['revision'] = 'fp16'
if image:
pipe_class = StableDiffusionImg2ImgPipeline
elif model == 'snowkidy/stable-diffusion-xl-base-0.9':
pipe_class = DiffusionPipeline
else:
pipe_class = StableDiffusionPipeline
pipe = pipe_class.from_pretrained(
model, **params)
pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(
pipe.scheduler.config)
if not image:
pipe.enable_vae_slicing()
return pipe.to('cuda')
def txt2img(
hf_token: str,
model: str = 'prompthero/openjourney',
prompt: str = 'a red old tractor in a sunny wheat field',
output: str = 'output.png',
width: int = 512, height: int = 512,
guidance: float = 10,
steps: int = 28,
seed: Optional[int] = None
):
assert torch.cuda.is_available()
torch.cuda.empty_cache()
torch.cuda.set_per_process_memory_fraction(1.0)
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
login(token=hf_token)
pipe = pipeline_for(model)
seed = seed if seed else random.randint(0, 2 ** 64)
prompt = prompt
image = pipe(
prompt,
width=width,
height=height,
guidance_scale=guidance, num_inference_steps=steps,
generator=torch.Generator("cuda").manual_seed(seed)
).images[0]
image.save(output)
def img2img(
hf_token: str,
model: str = 'prompthero/openjourney',
prompt: str = 'a red old tractor in a sunny wheat field',
img_path: str = 'input.png',
output: str = 'output.png',
strength: float = 1.0,
guidance: float = 10,
steps: int = 28,
seed: Optional[int] = None
):
assert torch.cuda.is_available()
torch.cuda.empty_cache()
torch.cuda.set_per_process_memory_fraction(1.0)
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
login(token=hf_token)
pipe = pipeline_for(model, image=True)
with open(img_path, 'rb') as img_file:
input_img = convert_from_bytes_and_crop(img_file.read(), 512, 512)
seed = seed if seed else random.randint(0, 2 ** 64)
prompt = prompt
image = pipe(
prompt,
image=input_img,
strength=strength,
guidance_scale=guidance, num_inference_steps=steps,
generator=torch.Generator("cuda").manual_seed(seed)
).images[0]
image.save(output)
def init_upscaler(model_path: str = 'weights/RealESRGAN_x4plus.pth'):
return RealESRGANer(
scale=4,
model_path=model_path,
dni_weight=None,
model=RRDBNet(
num_in_ch=3,
num_out_ch=3,
num_feat=64,
num_block=23,
num_grow_ch=32,
scale=4
),
half=True
)
def upscale(
img_path: str = 'input.png',
output: str = 'output.png',
model_path: str = 'weights/RealESRGAN_x4plus.pth'
):
assert torch.cuda.is_available()
torch.cuda.empty_cache()
torch.cuda.set_per_process_memory_fraction(1.0)
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
input_img = Image.open(img_path).convert('RGB')
upscaler = init_upscaler(model_path=model_path)
up_img, _ = upscaler.enhance(
convert_from_image_to_cv2(input_img), outscale=4)
image = convert_from_cv2_to_image(up_img)
image.save(output)
def download_all_models(hf_token: str):
assert torch.cuda.is_available()
login(token=hf_token)
for model in MODELS:
print(f'DOWNLOADING {model.upper()}')
pipeline_for(model)
print(f'DOWNLOADING IMAGE {model.upper()}')
pipeline_for(model, image=True)