diff --git a/requirements.cuda.0.txt b/requirements.cuda.0.txt index e31de88..f796537 100644 --- a/requirements.cuda.0.txt +++ b/requirements.cuda.0.txt @@ -3,6 +3,7 @@ triton accelerate transformers huggingface_hub -diffusers[torch] +diffusers[torch]>=0.18.0 +invisible-watermark torch==1.13.0+cu117 --extra-index-url https://download.pytorch.org/whl/cu117 diff --git a/requirements.txt b/requirements.txt index a30a623..6a1a32e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,5 +10,6 @@ docker aiohttp psycopg2-binary pyTelegramBotAPI +discord.py py-leap@git+https://github.com/guilledk/py-leap.git@v0.1a14 diff --git a/skynet.ini.example b/skynet.ini.example index b498dd8..1f9eab0 100644 --- a/skynet.ini.example +++ b/skynet.ini.example @@ -22,3 +22,14 @@ hyperion_url = https://skynet.ancap.tech ipfs_url = /ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv token = XXXXXXXXXX:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + +[skynet.discord] +account = discord +permission = active +key = 5Xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + +node_url = https://skynet.ancap.tech +hyperion_url = https://skynet.ancap.tech +ipfs_url = /ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv + +token = XXXXXXXXXX:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx diff --git a/skynet/cli.py b/skynet/cli.py index 1f7d24c..385b056 100755 --- a/skynet/cli.py +++ b/skynet/cli.py @@ -24,6 +24,7 @@ from .config import * from .nodeos import open_cleos, open_nodeos from .constants import * from .frontend.telegram import SkynetTelegramFrontend +from .frontend.discord import SkynetDiscordFrontend @click.group() @@ -43,7 +44,7 @@ def skynet(*args, **kwargs): @click.option('--seed', '-S', default=None) def txt2img(*args, **kwargs): from . import utils - _, hf_token, _ = init_env_from_config() + _, hf_token, _, _ = init_env_from_config() utils.txt2img(hf_token, **kwargs) @click.command() @@ -58,7 +59,7 @@ def txt2img(*args, **kwargs): @click.option('--seed', '-S', default=None) def img2img(model, prompt, input, output, strength, guidance, steps, seed): from . import utils - _, hf_token, _ = init_env_from_config() + _, hf_token, _, _ = init_env_from_config() utils.img2img( hf_token, model=model, @@ -86,7 +87,7 @@ def upscale(input, output, model): @skynet.command() def download(): from . import utils - _, hf_token, _ = init_env_from_config() + _, hf_token, _, _ = init_env_from_config() utils.download_all_models(hf_token) @skynet.command() @@ -408,7 +409,7 @@ def telegram( ): logging.basicConfig(level=loglevel) - _, _, tg_token = init_env_from_config() + _, _, tg_token, _ = init_env_from_config() key, account, permission = load_account_info( 'telegram', key, account, permission) @@ -435,6 +436,66 @@ def telegram( asyncio.run(_async_main()) +@run.command() +@click.option('--loglevel', '-l', default='INFO', help='logging level') +@click.option( + '--account', '-a', default='discord') +@click.option( + '--permission', '-p', default='active') +@click.option( + '--key', '-k', default=None) +@click.option( + '--hyperion-url', '-y', default=f'https://{DEFAULT_DOMAIN}') +@click.option( + '--node-url', '-n', default=f'https://{DEFAULT_DOMAIN}') +@click.option( + '--ipfs-url', '-i', default=DEFAULT_IPFS_REMOTE) +@click.option( + '--db-host', '-h', default='localhost:5432') +@click.option( + '--db-user', '-u', default='skynet') +@click.option( + '--db-pass', '-u', default='password') +def discord( + loglevel: str, + account: str, + permission: str, + key: str | None, + hyperion_url: str, + ipfs_url: str, + node_url: str, + db_host: str, + db_user: str, + db_pass: str +): + logging.basicConfig(level=loglevel) + + _, _, _, dc_token = init_env_from_config() + + key, account, permission = load_account_info( + 'discord', key, account, permission) + + node_url, _, ipfs_url = load_endpoint_info( + 'discord', node_url, None, None) + + async def _async_main(): + frontend = SkynetDiscordFrontend( + # dc_token, + account, + permission, + node_url, + hyperion_url, + db_host, db_user, db_pass, + remote_ipfs_node=ipfs_url, + key=key + ) + + async with frontend.open(): + await frontend.bot.start(dc_token) + + asyncio.run(_async_main()) + + @run.command() @click.option('--loglevel', '-l', default='INFO', help='logging level') @click.option('--name', '-n', default='skynet-ipfs', help='container name') diff --git a/skynet/config.py b/skynet/config.py index b025feb..d668a41 100755 --- a/skynet/config.py +++ b/skynet/config.py @@ -23,6 +23,7 @@ def init_env_from_config( hf_token: str | None = None, hf_home: str | None = None, tg_token: str | None = None, + dc_token: str | None = None, file_path=DEFAULT_CONFIG_PATH ): config = load_skynet_ini(file_path=file_path) @@ -52,7 +53,14 @@ def init_env_from_config( if 'token' in sub_config: tg_token = sub_config['token'] - return hf_home, hf_token, tg_token + if 'DC_TOKEN' in os.environ: + dc_token = os.environ['DC_TOKEN'] + elif 'skynet.discord' in config: + sub_config = config['skynet.discord'] + if 'token' in sub_config: + dc_token = sub_config['token'] + + return hf_home, hf_token, tg_token, dc_token def load_account_info( diff --git a/skynet/constants.py b/skynet/constants.py old mode 100644 new mode 100755 index d3e319d..4dc1c48 --- a/skynet/constants.py +++ b/skynet/constants.py @@ -5,15 +5,17 @@ VERSION = '0.1a10' DOCKER_RUNTIME_CUDA = 'skynet:runtime-cuda' MODELS = { - 'prompthero/openjourney': { 'short': 'midj'}, - 'runwayml/stable-diffusion-v1-5': { 'short': 'stable'}, - 'Linaqruf/anything-v3.0': { 'short': 'hdanime'}, - 'hakurei/waifu-diffusion': { 'short': 'waifu'}, - 'nitrosocke/Ghibli-Diffusion': { 'short': 'ghibli'}, - 'dallinmackay/Van-Gogh-diffusion': { 'short': 'van-gogh'}, - 'lambdalabs/sd-pokemon-diffusers': { 'short': 'pokemon'}, - 'Envvi/Inkpunk-Diffusion': { 'short': 'ink'}, - 'nousr/robo-diffusion': { 'short': 'robot'} + 'prompthero/openjourney': { 'short': 'midj'}, + 'runwayml/stable-diffusion-v1-5': { 'short': 'stable'}, + 'stabilityai/stable-diffusion-2-1-base': { 'short': 'stable2'}, + 'snowkidy/stable-diffusion-xl-base-0.9': { 'short': 'stablexl'}, + 'Linaqruf/anything-v3.0': { 'short': 'hdanime'}, + 'hakurei/waifu-diffusion': { 'short': 'waifu'}, + 'nitrosocke/Ghibli-Diffusion': { 'short': 'ghibli'}, + 'dallinmackay/Van-Gogh-diffusion': { 'short': 'van-gogh'}, + 'lambdalabs/sd-pokemon-diffusers': { 'short': 'pokemon'}, + 'Envvi/Inkpunk-Diffusion': { 'short': 'ink'}, + 'nousr/robo-diffusion': { 'short': 'robot'} } SHORT_NAMES = [ @@ -34,6 +36,7 @@ commands work on a user per user basis! config is individual to each user! /txt2img TEXT - request an image based on a prompt +/img2img TEXT - request an image base on an image and a promtp /redo - redo last command (only works for txt2img for now!) @@ -82,6 +85,28 @@ COOL_WORDS = [ 'michelangelo' ] +CLEAN_COOL_WORDS = [ + 'cyberpunk', + 'soviet propaganda poster', + 'rastafari', + 'cannabis', + 'art deco', + 'H R Giger Necronom IV', + 'dimethyltryptamine', + 'lysergic', + 'psilocybin', + 'trippy', + 'lucy in the sky with diamonds', + 'fractal', + 'da vinci', + 'pencil illustration', + 'blueprint', + 'internal diagram', + 'baroque', + 'the last judgment', + 'michelangelo' +] + HELP_TOPICS = { 'step': ''' Diffusion models are iterative processes – a repeated cycle that starts with a\ @@ -109,8 +134,8 @@ MP_ENABLED_ROLES = ['god'] MIN_STEP = 1 MAX_STEP = 100 -MAX_WIDTH = 512 -MAX_HEIGHT = 656 +MAX_WIDTH = 1024 +MAX_HEIGHT = 1024 MAX_GUIDANCE = 20 DEFAULT_SEED = None diff --git a/skynet/db/functions.py b/skynet/db/functions.py index d98b099..f52703e 100644 --- a/skynet/db/functions.py +++ b/skynet/db/functions.py @@ -96,7 +96,8 @@ def open_new_database(cleanup=True): 'POSTGRES_PASSWORD': rpassword }, detach=True, - remove=True + # could remove this if we ant the dockers to be persistent. + # remove=True ) try: diff --git a/skynet/frontend/discord/__init__.py b/skynet/frontend/discord/__init__.py new file mode 100644 index 0000000..7687c3b --- /dev/null +++ b/skynet/frontend/discord/__init__.py @@ -0,0 +1,267 @@ +#!/usr/bin/python + +from json import JSONDecodeError +import random +import logging +import asyncio + +from decimal import Decimal +from hashlib import sha256 +from datetime import datetime +from contextlib import ExitStack, AsyncExitStack +from contextlib import asynccontextmanager as acm + +from leap.cleos import CLEOS +from leap.sugar import Name, asset_from_str, collect_stdout +from leap.hyperion import HyperionAPI +# from telebot.types import InputMediaPhoto + +import discord +import io + +from skynet.db import open_new_database, open_database_connection +from skynet.ipfs import get_ipfs_file +from skynet.ipfs.docker import open_ipfs_node +from skynet.constants import * + +from . import * +from .bot import DiscordBot + +from .utils import * +from .handlers import create_handler_context +from .ui import SkynetView + + +class SkynetDiscordFrontend: + + def __init__( + self, + # token: str, + account: str, + permission: str, + node_url: str, + hyperion_url: str, + db_host: str, + db_user: str, + db_pass: str, + remote_ipfs_node: str, + key: str + ): + # self.token = token + self.account = account + self.permission = permission + self.node_url = node_url + self.hyperion_url = hyperion_url + self.db_host = db_host + self.db_user = db_user + self.db_pass = db_pass + self.remote_ipfs_node = remote_ipfs_node + self.key = key + + self.bot = DiscordBot(self) + self.cleos = CLEOS(None, None, url=node_url, remote=node_url) + self.hyperion = HyperionAPI(hyperion_url) + + self._exit_stack = ExitStack() + self._async_exit_stack = AsyncExitStack() + + async def start(self): + self.ipfs_node = self._exit_stack.enter_context( + open_ipfs_node()) + + self.ipfs_node.connect(self.remote_ipfs_node) + logging.info( + f'connected to remote ipfs node: {self.remote_ipfs_node}') + + self.db_call = await self._async_exit_stack.enter_async_context( + open_database_connection( + self.db_user, self.db_pass, self.db_host)) + + create_handler_context(self) + + async def stop(self): + await self._async_exit_stack.aclose() + self._exit_stack.close() + + @acm + async def open(self): + await self.start() + yield self + await self.stop() + + # maybe do this? + # async def update_status_message( + # self, status_msg, new_text: str, **kwargs + # ): + # await self.db_call( + # 'update_user_request_by_sid', status_msg.id, new_text) + # return await self.bot.edit_message_text( + # new_text, + # chat_id=status_msg.chat.id, + # message_id=status_msg.id, + # **kwargs + # ) + + # async def append_status_message( + # self, status_msg, add_text: str, **kwargs + # ): + # request = await self.db_call('get_user_request_by_sid', status_msg.id) + # await self.update_status_message( + # status_msg, + # request['status'] + add_text, + # **kwargs + # ) + + async def work_request( + self, + user, + status_msg, + method: str, + params: dict, + ctx: discord.ext.commands.context.Context | discord.Message, + file_id: str | None = None, + binary_data: str = '' + ): + send = ctx.channel.send + + if params['seed'] == None: + params['seed'] = random.randint(0, 0xFFFFFFFF) + + sanitized_params = {} + for key, val in params.items(): + if isinstance(val, Decimal): + val = str(val) + + sanitized_params[key] = val + + body = json.dumps({ + 'method': 'diffuse', + 'params': sanitized_params + }) + request_time = datetime.now().isoformat() + + await status_msg.delete() + msg_text = f'processing a \'{method}\' request by {user.name}\n[{timestamp_pretty()}] *broadcasting transaction to chain...* ' + embed = discord.Embed( + title='live updates', + description=msg_text, + color=discord.Color.blue()) + + message = await send(embed=embed) + + reward = '20.0000 GPU' + res = await self.cleos.a_push_action( + 'telos.gpu', + 'enqueue', + { + 'user': Name(self.account), + 'request_body': body, + 'binary_data': binary_data, + 'reward': asset_from_str(reward), + 'min_verification': 1 + }, + self.account, self.key, permission=self.permission + ) + + if 'code' in res or 'statusCode' in res: + logging.error(json.dumps(res, indent=4)) + await self.bot.channel.send( + status_msg, + 'skynet has suffered an internal error trying to fill this request') + return + + enqueue_tx_id = res['transaction_id'] + enqueue_tx_link = f'[**Your request on Skynet Explorer**](https://explorer.{DEFAULT_DOMAIN}/v2/explore/transaction/{enqueue_tx_id})' + + msg_text += f'**broadcasted!** \n{enqueue_tx_link}\n[{timestamp_pretty()}] *workers are processing request...* ' + embed = discord.Embed( + title='live updates', + description=msg_text, + color=discord.Color.blue()) + + await message.edit(embed=embed) + + out = collect_stdout(res) + + request_id, nonce = out.split(':') + + request_hash = sha256( + (nonce + body + binary_data).encode('utf-8')).hexdigest().upper() + + request_id = int(request_id) + + logging.info(f'{request_id} enqueued.') + + tx_hash = None + ipfs_hash = None + for i in range(60): + try: + submits = await self.hyperion.aget_actions( + account=self.account, + filter='telos.gpu:submit', + sort='desc', + after=request_time + ) + actions = [ + action + for action in submits['actions'] + if action[ + 'act']['data']['request_hash'] == request_hash + ] + if len(actions) > 0: + tx_hash = actions[0]['trx_id'] + data = actions[0]['act']['data'] + ipfs_hash = data['ipfs_hash'] + worker = data['worker'] + logging.info('Found matching submit!') + break + + except JSONDecodeError: + logging.error(f'network error while getting actions, retry..') + + await asyncio.sleep(1) + + if not ipfs_hash: + + timeout_text = f'\n[{timestamp_pretty()}] **timeout processing request**' + embed = discord.Embed( + title='live updates', + description=timeout_text, + color=discord.Color.blue()) + + await message.edit(embed=embed) + return + + tx_link = f'[**Your result on Skynet Explorer**](https://explorer.{DEFAULT_DOMAIN}/v2/explore/transaction/{tx_hash})' + + msg_text += f'**request processed!**\n{tx_link}\n[{timestamp_pretty()}] *trying to download image...*\n ' + embed = discord.Embed( + title='live updates', + description=msg_text, + color=discord.Color.blue()) + + await message.edit(embed=embed) + + # attempt to get the image and send it + ipfs_link = f'https://ipfs.{DEFAULT_DOMAIN}/ipfs/{ipfs_hash}/image.png' + resp = await get_ipfs_file(ipfs_link) + + # reword this function, may not need caption + caption, embed = generate_reply_caption( + user, params, tx_hash, worker, reward) + + if not resp or resp.status_code != 200: + logging.error(f'couldn\'t get ipfs hosted image at {ipfs_link}!') + embed.add_field(name='Error', value=f'couldn\'t get ipfs hosted image [**here**]({ipfs_link})!') + await message.edit(embed=embed, view=SkynetView(self)) + else: + logging.info(f'success! sending generated image') + await message.delete() + if file_id: # img2img + embed.set_thumbnail( + url='https://ipfs.skygpu.net/ipfs/' + binary_data + '/image.png') + embed.set_image(url=ipfs_link) + await send(embed=embed, view=SkynetView(self)) + else: # txt2img + embed.set_image(url=ipfs_link) + await send(embed=embed, view=SkynetView(self)) diff --git a/skynet/frontend/discord/bot.py b/skynet/frontend/discord/bot.py new file mode 100644 index 0000000..accc926 --- /dev/null +++ b/skynet/frontend/discord/bot.py @@ -0,0 +1,89 @@ +# import os +import discord +import asyncio +# from dotenv import load_dotenv +# from pathlib import Path +from discord.ext import commands +from .ui import SkynetView + + +# # Auth +# current_dir = Path(__file__).resolve().parent +# # parent_dir = current_dir.parent +# env_file_path = current_dir / ".env" +# load_dotenv(dotenv_path=env_file_path) +# +# discordToken = os.getenv("DISCORD_TOKEN") + + +# Actual Discord bot. +class DiscordBot(commands.Bot): + + def __init__(self, bot, *args, **kwargs): + self.bot = bot + intents = discord.Intents( + messages=True, + guilds=True, + typing=True, + members=True, + presences=True, + reactions=True, + message_content=True, + voice_states=True + ) + super().__init__(command_prefix='/', intents=intents, *args, **kwargs) + + # async def setup_hook(self): + # db.poll_db.start() + + async def on_ready(self): + print(f'{self.user.name} has connected to Discord!') + for guild in self.guilds: + for channel in guild.channels: + if channel.name == "skynet": + await channel.send('Skynet bot online', view=SkynetView(self.bot)) + # intro_msg = await channel.send('Welcome to the Skynet discord bot.\nSkynet is a decentralized compute layer, focused on supporting AI paradigms. Skynet leverages blockchain technology to manage work requests and fills. We are currently featuring image generation and support 11 different models. Get started with the /help command, or just click on some buttons. Here is an example command to generate an image:\n/txt2img a big red tractor in a giant field of corn') + intro_msg = await channel.send("Welcome to Skynet's Discord Bot,\n\nSkynet operates as a decentralized compute layer, offering a wide array of support for diverse AI paradigms through the use of blockchain technology. Our present focus is image generation, powered by 11 distinct models.\n\nTo begin exploring, use the '/help' command or directly interact with the provided buttons. Here is an example command to generate an image:\n\n'/txt2img a big red tractor in a giant field of corn'") + await intro_msg.pin() + + print("\n==============") + print("Logged in as") + print(self.user.name) + print(self.user.id) + print("==============") + + async def on_message(self, message): + if isinstance(message.channel, discord.DMChannel): + return + elif message.channel.name != 'skynet': + return + elif message.author == self.user: + return + await self.process_commands(message) + # await asyncio.sleep(3) + # await message.channel.send('', view=SkynetView(self.bot)) + + async def on_command_error(self, ctx, error): + if isinstance(error, commands.MissingRequiredArgument): + await ctx.send('You missed a required argument, please try again.') + + # async def on_message(self, message): + # print(f"message from {message.author} what he said {message.content}") + # await message.channel.send(message.content) + +# bot=DiscordBot() +# @bot.command(name='config', help='Responds with the configuration') +# async def config(ctx): +# response = "This is the bot configuration" # Put your bot configuration here +# await ctx.send(response) +# +# @bot.command(name='helper', help='Responds with a help') +# async def helper(ctx): +# response = "This is help information" # Put your help response here +# await ctx.send(response) +# +# @bot.command(name='txt2img', help='Responds with an image') +# async def txt2img(ctx, *, arg): +# response = f"This is your prompt: {arg}" +# await ctx.send(response) +# bot.run(discordToken) diff --git a/skynet/frontend/discord/handlers.py b/skynet/frontend/discord/handlers.py new file mode 100644 index 0000000..c6f2735 --- /dev/null +++ b/skynet/frontend/discord/handlers.py @@ -0,0 +1,601 @@ +#!/usr/bin/python + +import io +import json +import logging + +from datetime import datetime, timedelta + +from PIL import Image +# from telebot.types import CallbackQuery, Message + +from skynet.frontend import validate_user_config_request +from skynet.constants import * +from .ui import SkynetView + + +def create_handler_context(frontend: 'SkynetDiscordFrontend'): + + bot = frontend.bot + cleos = frontend.cleos + db_call = frontend.db_call + work_request = frontend.work_request + + ipfs_node = frontend.ipfs_node + + @bot.command(name='config', help='Responds with the configuration') + async def set_config(ctx): + + user = ctx.author + try: + attr, val, reply_txt = validate_user_config_request( + ctx.message.content) + + logging.info(f'user config update: {attr} to {val}') + await db_call('update_user_config', user.id, attr, val) + logging.info('done') + + except BaseException as e: + reply_txt = str(e) + + finally: + await ctx.reply(content=reply_txt, view=SkynetView(frontend)) + + bot.remove_command('help') + @bot.command(name='help', help='Responds with a help') + async def help(ctx): + splt_msg = ctx.message.content.split(' ') + + if len(splt_msg) == 1: + await ctx.send(content=f'```{HELP_TEXT}```', view=SkynetView(frontend)) + + else: + param = splt_msg[1] + if param in HELP_TOPICS: + await ctx.send(content=f'```{HELP_TOPICS[param]}```', view=SkynetView(frontend)) + + else: + await ctx.send(content=f'```{HELP_UNKWNOWN_PARAM}```', view=SkynetView(frontend)) + + @bot.command(name='cool', help='Display a list of cool prompt words') + async def send_cool_words(ctx): + clean_cool_word = '\n'.join(CLEAN_COOL_WORDS) + await ctx.send(content=f'```{clean_cool_word}```', view=SkynetView(frontend)) + + @bot.command(name='stats', help='See user statistics' ) + async def user_stats(ctx): + user = ctx.author + + await db_call('get_or_create_user', user.id) + generated, joined, role = await db_call('get_user_stats', user.id) + + stats_str = f'```generated: {generated}\n' + stats_str += f'joined: {joined}\n' + stats_str += f'role: {role}\n```' + + await ctx.reply(stats_str, view=SkynetView(frontend)) + + @bot.command(name='donate', help='See donate info') + async def donation_info(ctx): + await ctx.reply( + f'```\n{DONATION_INFO}```', view=SkynetView(frontend)) + + @bot.command(name='txt2img', help='Responds with an image') + async def send_txt2img(ctx): + + # grab user from ctx + user = ctx.author + user_row = await db_call('get_or_create_user', user.id) + + # init new msg + init_msg = 'started processing txt2img request...' + status_msg = await ctx.send(init_msg) + await db_call( + 'new_user_request', user.id, ctx.message.id, status_msg.id, status=init_msg) + + prompt = ' '.join(ctx.message.content.split(' ')[1:]) + + if len(prompt) == 0: + await status_msg.edit(content= + 'Empty text prompt ignored.' + ) + await db_call('update_user_request', status_msg.id, 'Empty text prompt ignored.') + return + + logging.info(f'mid: {ctx.message.id}') + + user_config = {**user_row} + del user_config['id'] + + params = { + 'prompt': prompt, + **user_config + } + + await db_call( + 'update_user_stats', user.id, 'txt2img', last_prompt=prompt) + + ec = await work_request(user, status_msg, 'txt2img', params, ctx) + + if ec == None: + await db_call('increment_generated', user.id) + + @bot.command(name='redo', help='Redo last request') + async def redo(ctx): + init_msg = 'started processing redo request...' + status_msg = await ctx.send(init_msg) + user = ctx.author + + method = await db_call('get_last_method_of', user.id) + prompt = await db_call('get_last_prompt_of', user.id) + + file_id = None + binary = '' + if method == 'img2img': + file_id = await db_call('get_last_file_of', user.id) + binary = await db_call('get_last_binary_of', user.id) + + if not prompt: + await status_msg.edit( + content='no last prompt found, do a txt2img cmd first!', + view=SkynetView(frontend) + ) + return + + user_row = await db_call('get_or_create_user', user.id) + await db_call( + 'new_user_request', user.id, ctx.message.id, status_msg.id, status=init_msg) + user_config = {**user_row} + del user_config['id'] + + params = { + 'prompt': prompt, + **user_config + } + + ec = await work_request( + user, status_msg, 'redo', params, ctx, + file_id=file_id, + binary_data=binary + ) + + if ec == None: + await db_call('increment_generated', user.id) + + @bot.command(name='img2img', help='Responds with an image') + async def send_img2img(ctx): + # if isinstance(message_or_query, CallbackQuery): + # query = message_or_query + # message = query.message + # user = query.from_user + # chat = query.message.chat + # + # else: + # message = message_or_query + # user = message.from_user + # chat = message.chat + + # reply_id = None + # if chat.type == 'group' and chat.id == GROUP_ID: + # reply_id = message.message_id + # + user = ctx.author + user_row = await db_call('get_or_create_user', user.id) + + # init new msg + init_msg = 'started processing img2img request...' + status_msg = await ctx.send(init_msg) + await db_call( + 'new_user_request', user.id, ctx.message.id, status_msg.id, status=init_msg) + + if not ctx.message.content.startswith('/img2img'): + await ctx.reply( + 'For image to image you need to add /img2img to the beggining of your caption' + ) + return + + prompt = ' '.join(ctx.message.content.split(' ')[1:]) + + if len(prompt) == 0: + await ctx.reply('Empty text prompt ignored.') + return + + # file_id = message.photo[-1].file_id + # file_path = (await bot.get_file(file_id)).file_path + # image_raw = await bot.download_file(file_path) + # + + file = ctx.message.attachments[-1] + file_id = str(file.id) + # file bytes + image_raw = await file.read() + with Image.open(io.BytesIO(image_raw)) as image: + w, h = image.size + + if w > 512 or h > 512: + logging.warning(f'user sent img of size {image.size}') + image.thumbnail((512, 512)) + logging.warning(f'resized it to {image.size}') + + image.save(f'ipfs-docker-staging/image.png', format='PNG') + + ipfs_hash = ipfs_node.add('image.png') + ipfs_node.pin(ipfs_hash) + + logging.info(f'published input image {ipfs_hash} on ipfs') + + logging.info(f'mid: {ctx.message.id}') + + user_config = {**user_row} + del user_config['id'] + + params = { + 'prompt': prompt, + **user_config + } + + await db_call( + 'update_user_stats', + user.id, + 'img2img', + last_file=file_id, + last_prompt=prompt, + last_binary=ipfs_hash + ) + + ec = await work_request( + user, status_msg, 'img2img', params, ctx, + file_id=file_id, + binary_data=ipfs_hash + ) + + if ec == None: + await db_call('increment_generated', user.id) + + + + # TODO: DELETE BELOW + # user = 'testworker3' + # status_msg = 'status' + # params = { + # 'prompt': arg, + # 'seed': None, + # 'step': 35, + # 'guidance': 7.5, + # 'strength': 0.5, + # 'width': 512, + # 'height': 512, + # 'upscaler': None, + # 'model': 'prompthero/openjourney', + # } + # + # ec = await work_request(user, status_msg, 'txt2img', params, ctx) + # print(ec) + + # if ec == 0: + # await db_call('increment_generated', user.id) + + # response = f"This is your prompt: {arg}" + # await ctx.send(response) + + # generic / simple handlers + + # @bot.message_handler(commands=['help']) + # async def send_help(message): + # splt_msg = message.text.split(' ') + # + # if len(splt_msg) == 1: + # await bot.reply_to(message, HELP_TEXT) + # + # else: + # param = splt_msg[1] + # if param in HELP_TOPICS: + # await bot.reply_to(message, HELP_TOPICS[param]) + # + # else: + # await bot.reply_to(message, HELP_UNKWNOWN_PARAM) + # + # @bot.message_handler(commands=['cool']) + # async def send_cool_words(message): + # await bot.reply_to(message, '\n'.join(COOL_WORDS)) + # + # @bot.message_handler(commands=['queue']) + # async def queue(message): + # an_hour_ago = datetime.now() - timedelta(hours=1) + # queue = await cleos.aget_table( + # 'telos.gpu', 'telos.gpu', 'queue', + # index_position=2, + # key_type='i64', + # sort='desc', + # lower_bound=int(an_hour_ago.timestamp()) + # ) + # await bot.reply_to( + # message, f'Total requests on skynet queue: {len(queue)}') + + + # @bot.message_handler(commands=['config']) + # async def set_config(message): + # user = message.from_user.id + # try: + # attr, val, reply_txt = validate_user_config_request( + # message.text) + # + # logging.info(f'user config update: {attr} to {val}') + # await db_call('update_user_config', user, attr, val) + # logging.info('done') + # + # except BaseException as e: + # reply_txt = str(e) + # + # finally: + # await bot.reply_to(message, reply_txt) + # + # @bot.message_handler(commands=['stats']) + # async def user_stats(message): + # user = message.from_user.id + # + # await db_call('get_or_create_user', user) + # generated, joined, role = await db_call('get_user_stats', user) + # + # stats_str = f'generated: {generated}\n' + # stats_str += f'joined: {joined}\n' + # stats_str += f'role: {role}\n' + # + # await bot.reply_to( + # message, stats_str) + # + # @bot.message_handler(commands=['donate']) + # async def donation_info(message): + # await bot.reply_to( + # message, DONATION_INFO) + # + # @bot.message_handler(commands=['say']) + # async def say(message): + # chat = message.chat + # user = message.from_user + # + # if (chat.type == 'group') or (user.id != 383385940): + # return + # + # await bot.send_message(GROUP_ID, message.text[4:]) + + + # generic txt2img handler + + # async def _generic_txt2img(message_or_query): + # if isinstance(message_or_query, CallbackQuery): + # query = message_or_query + # message = query.message + # user = query.from_user + # chat = query.message.chat + # + # else: + # message = message_or_query + # user = message.from_user + # chat = message.chat + # + # reply_id = None + # if chat.type == 'group' and chat.id == GROUP_ID: + # reply_id = message.message_id + # + # user_row = await db_call('get_or_create_user', user.id) + # + # # init new msg + # init_msg = 'started processing txt2img request...' + # status_msg = await bot.reply_to(message, init_msg) + # await db_call( + # 'new_user_request', user.id, message.id, status_msg.id, status=init_msg) + # + # prompt = ' '.join(message.text.split(' ')[1:]) + # + # if len(prompt) == 0: + # await bot.edit_message_text( + # 'Empty text prompt ignored.', + # chat_id=status_msg.chat.id, + # message_id=status_msg.id + # ) + # await db_call('update_user_request', status_msg.id, 'Empty text prompt ignored.') + # return + # + # logging.info(f'mid: {message.id}') + # + # user_config = {**user_row} + # del user_config['id'] + # + # params = { + # 'prompt': prompt, + # **user_config + # } + # + # await db_call( + # 'update_user_stats', user.id, 'txt2img', last_prompt=prompt) + # + # ec = await work_request(user, status_msg, 'txt2img', params) + + # if ec == 0: + # await db_call('increment_generated', user.id) + # + # + # # generic img2img handler + # + # async def _generic_img2img(message_or_query): + # if isinstance(message_or_query, CallbackQuery): + # query = message_or_query + # message = query.message + # user = query.from_user + # chat = query.message.chat + # + # else: + # message = message_or_query + # user = message.from_user + # chat = message.chat + # + # reply_id = None + # if chat.type == 'group' and chat.id == GROUP_ID: + # reply_id = message.message_id + # + # user_row = await db_call('get_or_create_user', user.id) + # + # # init new msg + # init_msg = 'started processing txt2img request...' + # status_msg = await bot.reply_to(message, init_msg) + # await db_call( + # 'new_user_request', user.id, message.id, status_msg.id, status=init_msg) + # + # if not message.caption.startswith('/img2img'): + # await bot.reply_to( + # message, + # 'For image to image you need to add /img2img to the beggining of your caption' + # ) + # return + # + # prompt = ' '.join(message.caption.split(' ')[1:]) + # + # if len(prompt) == 0: + # await bot.reply_to(message, 'Empty text prompt ignored.') + # return + # + # file_id = message.photo[-1].file_id + # file_path = (await bot.get_file(file_id)).file_path + # image_raw = await bot.download_file(file_path) + + # with Image.open(io.BytesIO(image_raw)) as image: + # w, h = image.size + # + # if w > 512 or h > 512: + # logging.warning(f'user sent img of size {image.size}') + # image.thumbnail((512, 512)) + # logging.warning(f'resized it to {image.size}') + # + # image.save(f'ipfs-docker-staging/image.png', format='PNG') + # + # ipfs_hash = ipfs_node.add('image.png') + # ipfs_node.pin(ipfs_hash) + # + # logging.info(f'published input image {ipfs_hash} on ipfs') + # + # logging.info(f'mid: {message.id}') + # + # user_config = {**user_row} + # del user_config['id'] + # + # params = { + # 'prompt': prompt, + # **user_config + # } + # + # await db_call( + # 'update_user_stats', + # user.id, + # 'img2img', + # last_file=file_id, + # last_prompt=prompt, + # last_binary=ipfs_hash + # ) + # + # ec = await work_request( + # user, status_msg, 'img2img', params, + # file_id=file_id, + # binary_data=ipfs_hash + # ) + # + # if ec == 0: + # await db_call('increment_generated', user.id) + # + + # generic redo handler + + # async def _redo(message_or_query): + # is_query = False + # if isinstance(message_or_query, CallbackQuery): + # is_query = True + # query = message_or_query + # message = query.message + # user = query.from_user + # chat = query.message.chat + # + # elif isinstance(message_or_query, Message): + # message = message_or_query + # user = message.from_user + # chat = message.chat + # + # init_msg = 'started processing redo request...' + # if is_query: + # status_msg = await bot.send_message(chat.id, init_msg) + # + # else: + # status_msg = await bot.reply_to(message, init_msg) + # + # method = await db_call('get_last_method_of', user.id) + # prompt = await db_call('get_last_prompt_of', user.id) + # + # file_id = None + # binary = '' + # if method == 'img2img': + # file_id = await db_call('get_last_file_of', user.id) + # binary = await db_call('get_last_binary_of', user.id) + # + # if not prompt: + # await bot.reply_to( + # message, + # 'no last prompt found, do a txt2img cmd first!' + # ) + # return + # + # + # user_row = await db_call('get_or_create_user', user.id) + # await db_call( + # 'new_user_request', user.id, message.id, status_msg.id, status=init_msg) + # user_config = {**user_row} + # del user_config['id'] + # + # params = { + # 'prompt': prompt, + # **user_config + # } + # + # await work_request( + # user, status_msg, 'redo', params, + # file_id=file_id, + # binary_data=binary + # ) + + + # "proxy" handlers just request routers + + # @bot.message_handler(commands=['txt2img']) + # async def send_txt2img(message): + # await _generic_txt2img(message) + # + # @bot.message_handler(func=lambda message: True, content_types=[ + # 'photo', 'document']) + # async def send_img2img(message): + # await _generic_img2img(message) + # + # @bot.message_handler(commands=['img2img']) + # async def img2img_missing_image(message): + # await bot.reply_to( + # message, + # 'seems you tried to do an img2img command without sending image' + # ) + # + # @bot.message_handler(commands=['redo']) + # async def redo(message): + # await _redo(message) + # + # @bot.callback_query_handler(func=lambda call: True) + # async def callback_query(call): + # msg = json.loads(call.data) + # logging.info(call.data) + # method = msg.get('method') + # match method: + # case 'redo': + # await _redo(call) + + + # catch all handler for things we dont support + + # @bot.message_handler(func=lambda message: True) + # async def echo_message(message): + # if message.text[0] == '/': + # await bot.reply_to(message, UNKNOWN_CMD_TEXT) diff --git a/skynet/frontend/discord/ui.py b/skynet/frontend/discord/ui.py new file mode 100644 index 0000000..95d71b1 --- /dev/null +++ b/skynet/frontend/discord/ui.py @@ -0,0 +1,311 @@ +import io +import discord +from PIL import Image +import logging +from skynet.constants import * +from skynet.frontend import validate_user_config_request + + +class SkynetView(discord.ui.View): + + def __init__(self, bot): + self.bot = bot + super().__init__(timeout=None) + self.add_item(RedoButton('redo', discord.ButtonStyle.primary, self.bot)) + self.add_item(Txt2ImgButton('txt2img', discord.ButtonStyle.primary, self.bot)) + self.add_item(Img2ImgButton('img2img', discord.ButtonStyle.primary, self.bot)) + self.add_item(StatsButton('stats', discord.ButtonStyle.secondary, self.bot)) + self.add_item(DonateButton('donate', discord.ButtonStyle.secondary, self.bot)) + self.add_item(ConfigButton('config', discord.ButtonStyle.secondary, self.bot)) + self.add_item(HelpButton('help', discord.ButtonStyle.secondary, self.bot)) + self.add_item(CoolButton('cool', discord.ButtonStyle.secondary, self.bot)) + + +class Txt2ImgButton(discord.ui.Button): + + def __init__(self, label: str, style: discord.ButtonStyle, bot): + self.bot = bot + super().__init__(label=label, style=style) + + async def callback(self, interaction): + db_call = self.bot.db_call + work_request = self.bot.work_request + msg = await grab('Enter your prompt:', interaction) + # grab user from msg + user = msg.author + user_row = await db_call('get_or_create_user', user.id) + + # init new msg + init_msg = 'started processing txt2img request...' + status_msg = await msg.channel.send(init_msg) + await db_call( + 'new_user_request', user.id, msg.id, status_msg.id, status=init_msg) + + prompt = msg.content + + if len(prompt) == 0: + await status_msg.edit(content= + 'Empty text prompt ignored.' + ) + await db_call('update_user_request', status_msg.id, 'Empty text prompt ignored.') + return + + logging.info(f'mid: {msg.id}') + + user_config = {**user_row} + del user_config['id'] + + params = { + 'prompt': prompt, + **user_config + } + + await db_call( + 'update_user_stats', user.id, 'txt2img', last_prompt=prompt) + + ec = await work_request(user, status_msg, 'txt2img', params, msg) + + if ec == None: + await db_call('increment_generated', user.id) + + +class Img2ImgButton(discord.ui.Button): + + def __init__(self, label: str, style: discord.ButtonStyle, bot): + self.bot = bot + super().__init__(label=label, style=style) + + async def callback(self, interaction): + db_call = self.bot.db_call + work_request = self.bot.work_request + ipfs_node = self.bot.ipfs_node + msg = await grab('Attach an Image. Enter your prompt:', interaction) + + user = msg.author + user_row = await db_call('get_or_create_user', user.id) + + # init new msg + init_msg = 'started processing img2img request...' + status_msg = await msg.channel.send(init_msg) + await db_call( + 'new_user_request', user.id, msg.id, status_msg.id, status=init_msg) + + # if not msg.content.startswith('/img2img'): + # await msg.reply( + # 'For image to image you need to add /img2img to the beggining of your caption' + # ) + # return + + prompt = msg.content + + if len(prompt) == 0: + await msg.reply('Empty text prompt ignored.') + return + + # file_id = message.photo[-1].file_id + # file_path = (await bot.get_file(file_id)).file_path + # image_raw = await bot.download_file(file_path) + # + + file = msg.attachments[-1] + file_id = str(file.id) + # file bytes + image_raw = await file.read() + with Image.open(io.BytesIO(image_raw)) as image: + w, h = image.size + + if w > 512 or h > 512: + logging.warning(f'user sent img of size {image.size}') + image.thumbnail((512, 512)) + logging.warning(f'resized it to {image.size}') + + image.save(f'ipfs-docker-staging/image.png', format='PNG') + + ipfs_hash = ipfs_node.add('image.png') + ipfs_node.pin(ipfs_hash) + + logging.info(f'published input image {ipfs_hash} on ipfs') + + logging.info(f'mid: {msg.id}') + + user_config = {**user_row} + del user_config['id'] + + params = { + 'prompt': prompt, + **user_config + } + + await db_call( + 'update_user_stats', + user.id, + 'img2img', + last_file=file_id, + last_prompt=prompt, + last_binary=ipfs_hash + ) + + ec = await work_request( + user, status_msg, 'img2img', params, msg, + file_id=file_id, + binary_data=ipfs_hash + ) + + if ec == None: + await db_call('increment_generated', user.id) + + +class RedoButton(discord.ui.Button): + + def __init__(self, label: str, style: discord.ButtonStyle, bot): + self.bot = bot + super().__init__(label=label, style=style) + + async def callback(self, interaction): + db_call = self.bot.db_call + work_request = self.bot.work_request + init_msg = 'started processing redo request...' + await interaction.response.send_message(init_msg) + status_msg = await interaction.original_response() + user = interaction.user + + method = await db_call('get_last_method_of', user.id) + prompt = await db_call('get_last_prompt_of', user.id) + + file_id = None + binary = '' + if method == 'img2img': + file_id = await db_call('get_last_file_of', user.id) + binary = await db_call('get_last_binary_of', user.id) + + if not prompt: + await status_msg.edit( + content='no last prompt found, do a txt2img cmd first!', + view=SkynetView(self.bot) + ) + return + + user_row = await db_call('get_or_create_user', user.id) + await db_call( + 'new_user_request', user.id, interaction.id, status_msg.id, status=init_msg) + user_config = {**user_row} + del user_config['id'] + + params = { + 'prompt': prompt, + **user_config + } + ec = await work_request( + user, status_msg, 'redo', params, interaction, + file_id=file_id, + binary_data=binary + ) + + if ec == None: + await db_call('increment_generated', user.id) + + +class ConfigButton(discord.ui.Button): + + def __init__(self, label: str, style: discord.ButtonStyle, bot): + self.bot = bot + super().__init__(label=label, style=style) + + async def callback(self, interaction): + db_call = self.bot.db_call + msg = await grab('What params do you want to change? (format: )', interaction) + + user = interaction.user + try: + attr, val, reply_txt = validate_user_config_request( + '/config ' + msg.content) + + logging.info(f'user config update: {attr} to {val}') + await db_call('update_user_config', user.id, attr, val) + logging.info('done') + + except BaseException as e: + reply_txt = str(e) + + finally: + await msg.reply(content=reply_txt, view=SkynetView(self.bot)) + + +class StatsButton(discord.ui.Button): + + def __init__(self, label: str, style: discord.ButtonStyle, bot): + self.bot = bot + super().__init__(label=label, style=style) + + async def callback(self, interaction): + db_call = self.bot.db_call + + user = interaction.user + + await db_call('get_or_create_user', user.id) + generated, joined, role = await db_call('get_user_stats', user.id) + + stats_str = f'```generated: {generated}\n' + stats_str += f'joined: {joined}\n' + stats_str += f'role: {role}\n```' + + await interaction.response.send_message( + content=stats_str, view=SkynetView(self.bot)) + + +class DonateButton(discord.ui.Button): + + def __init__(self, label: str, style: discord.ButtonStyle, bot): + self.bot = bot + super().__init__(label=label, style=style) + + async def callback(self, interaction): + await interaction.response.send_message( + content=f'```\n{DONATION_INFO}```', + view=SkynetView(self.bot)) + + +class CoolButton(discord.ui.Button): + + def __init__(self, label: str, style: discord.ButtonStyle, bot): + self.bot = bot + super().__init__(label=label, style=style) + + async def callback(self, interaction): + clean_cool_word = '\n'.join(CLEAN_COOL_WORDS) + await interaction.response.send_message( + content=f'```{clean_cool_word}```', + view=SkynetView(self.bot)) + + +class HelpButton(discord.ui.Button): + + def __init__(self, label: str, style: discord.ButtonStyle, bot): + self.bot = bot + super().__init__(label=label, style=style) + + async def callback(self, interaction): + msg = await grab('What would you like help with? (a for all)', interaction) + + param = msg.content + + if param == 'a': + await msg.reply(content=f'```{HELP_TEXT}```', view=SkynetView(self.bot)) + + else: + if param in HELP_TOPICS: + await msg.reply(content=f'```{HELP_TOPICS[param]}```', view=SkynetView(self.bot)) + + else: + await msg.reply(content=f'```{HELP_UNKWNOWN_PARAM}```', view=SkynetView(self.bot)) + + +async def grab(prompt, interaction): + def vet(m): + return m.author == interaction.user and m.channel == interaction.channel + + await interaction.response.send_message(prompt, ephemeral=True) + message = await interaction.client.wait_for('message', check=vet) + return message + + diff --git a/skynet/frontend/discord/utils.py b/skynet/frontend/discord/utils.py new file mode 100644 index 0000000..5e6522d --- /dev/null +++ b/skynet/frontend/discord/utils.py @@ -0,0 +1,121 @@ +#!/usr/bin/python + +import json +import logging +import traceback + +from datetime import datetime, timezone + +from telebot.types import InlineKeyboardButton, InlineKeyboardMarkup +from telebot.async_telebot import ExceptionHandler +from telebot.formatting import hlink +import discord + +from skynet.constants import * + + +def timestamp_pretty(): + return datetime.now(timezone.utc).strftime('%H:%M:%S') + + +def tg_user_pretty(tguser): + if tguser.username: + return f'@{tguser.username}' + else: + return f'{tguser.first_name} id: {tguser.id}' + + +class SKYExceptionHandler(ExceptionHandler): + + def handle(exception): + traceback.print_exc() + + +def build_redo_menu(): + btn_redo = InlineKeyboardButton("Redo", callback_data=json.dumps({'method': 'redo'})) + inline_keyboard = InlineKeyboardMarkup() + inline_keyboard.add(btn_redo) + return inline_keyboard + + +def prepare_metainfo_caption(user, worker: str, reward: str, meta: dict, embed) -> str: + prompt = meta["prompt"] + if len(prompt) > 256: + prompt = prompt[:256] + + gen_str = f'generated by {user.name}\n' + gen_str += f'performed by {worker}\n' + gen_str += f'reward: {reward}\n' + + embed.add_field( + name='General Info', value=f'```{gen_str}```', inline=False) + # meta_str = f'__by {user.name}__\n' + # meta_str += f'*performed by {worker}*\n' + # meta_str += f'__**reward: {reward}**__\n' + embed.add_field(name='Prompt', value=f'```{prompt}\n```', inline=False) + + # meta_str = f'`prompt:` {prompt}\n' + + meta_str = f'seed: {meta["seed"]}\n' + meta_str += f'step: {meta["step"]}\n' + meta_str += f'guidance: {meta["guidance"]}\n' + + if meta['strength']: + meta_str += f'strength: {meta["strength"]}\n' + meta_str += f'algo: {meta["model"]}\n' + if meta['upscaler']: + meta_str += f'upscaler: {meta["upscaler"]}\n' + + embed.add_field(name='Parameters', value=f'```{meta_str}```', inline=False) + + foot_str = f'Made with Skynet v{VERSION}\n' + foot_str += f'JOIN THE SWARM: https://discord.gg/JYM4YPMgK' + + embed.set_footer(text=foot_str) + + return meta_str + + +def generate_reply_caption( + user, # discord user + params: dict, + tx_hash: str, + worker: str, + reward: str +): + explorer_link = discord.Embed( + title='[SKYNET Transaction Explorer]', + url=f'https://explorer.{DEFAULT_DOMAIN}/v2/explore/transaction/{tx_hash}', + color=discord.Color.blue()) + + meta_info = prepare_metainfo_caption(user, worker, reward, params, explorer_link) + + # why do we have this? + final_msg = '\n'.join([ + 'Worker finished your task!', + # explorer_link, + f'PARAMETER INFO:\n{meta_info}' + ]) + + final_msg = '\n'.join([ + # f'***{explorer_link}***', + f'{meta_info}' + ]) + + logging.info(final_msg) + + return final_msg, explorer_link + + +async def get_global_config(cleos): + return (await cleos.aget_table( + 'telos.gpu', 'telos.gpu', 'config'))[0] + +async def get_user_nonce(cleos, user: str): + return (await cleos.aget_table( + 'telos.gpu', 'telos.gpu', 'users', + index_position=1, + key_type='name', + lower_bound=user, + upper_bound=user + ))[0]['nonce'] diff --git a/skynet/ipfs/__init__.py b/skynet/ipfs/__init__.py index bb4e0fe..1a0d4ef 100644 --- a/skynet/ipfs/__init__.py +++ b/skynet/ipfs/__init__.py @@ -27,7 +27,7 @@ class IPFSHTTP: async def get_ipfs_file(ipfs_link: str): logging.info(f'attempting to get image at {ipfs_link}') resp = None - for i in range(10): + for i in range(20): try: resp = await asks.get(ipfs_link, timeout=3) diff --git a/skynet/utils.py b/skynet/utils.py old mode 100644 new mode 100755 index 2837118..79932a5 --- a/skynet/utils.py +++ b/skynet/utils.py @@ -82,6 +82,8 @@ def pipeline_for(model: str, mem_fraction: float = 1.0, image=False) -> Diffusio if image: pipe_class = StableDiffusionImg2ImgPipeline + elif model == 'snowkidy/stable-diffusion-xl-base-0.9': + pipe_class = DiffusionPipeline else: pipe_class = StableDiffusionPipeline