From 5e017ffac01ac64a9b4c15465861d8516280e997 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Sun, 28 May 2023 18:23:51 -0300 Subject: [PATCH] Telegram frontend fixes and create pinner --- requirements.txt | 2 +- skynet/cli.py | 70 +++++++++- skynet/config.py | 4 +- skynet/db/functions.py | 65 ++------- skynet/dgpu.py | 4 +- skynet/frontend/telegram.py | 272 +++++++++++++++++++++--------------- skynet/ipfs.py | 9 ++ 7 files changed, 253 insertions(+), 173 deletions(-) diff --git a/requirements.txt b/requirements.txt index b6f2e30..7a8ec39 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,4 @@ aiohttp psycopg2-binary pyTelegramBotAPI -py-leap@git+https://github.com/guilledk/py-leap.git@v0.1a11 +py-leap@git+https://github.com/guilledk/py-leap.git@v0.1a13 diff --git a/skynet/cli.py b/skynet/cli.py index 4e956f2..9c30c15 100644 --- a/skynet/cli.py +++ b/skynet/cli.py @@ -6,6 +6,7 @@ import logging import random from typing import Optional +from datetime import datetime, timedelta from functools import partial import trio @@ -16,8 +17,10 @@ import requests from leap.cleos import CLEOS, default_nodeos_image from leap.sugar import get_container, collect_stdout +from leap.hyperion import HyperionAPI from .db import open_new_database +from .ipfs import IPFSDocker from .config import * from .nodeos import open_cleos, open_nodeos from .constants import ALGOS @@ -91,7 +94,7 @@ def download(): @click.option( '--account', '-A', default=None) @click.option( - '--permission', '-p', default=None) + '--permission', '-P', default=None) @click.option( '--key', '-k', default=None) @click.option( @@ -266,7 +269,7 @@ def db(): @run.command() def nodeos(): - logging.basicConfig(level=logging.INFO) + logging.basicConfig(filename='skynet-nodeos.log', level=logging.INFO) with open_nodeos(cleanup=False): ... @@ -280,6 +283,8 @@ def nodeos(): '--key', '-k', default=None) @click.option( '--node-url', '-n', default='http://skynet.ancap.tech') +@click.option( + '--ipfs-url', '-n', default='/ip4/169.197.142.4/tcp/4001/p2p/12D3KooWKHKPFuqJPeqYgtUJtfZTHvEArRX2qvThYBrjuTuPg2Nx') @click.option( '--algos', '-A', default=json.dumps(['midj'])) def dgpu( @@ -288,6 +293,7 @@ def dgpu( permission: str, key: str | None, node_url: str, + ipfs_url: str, algos: list[str] ): from .dgpu import open_dgpu_node @@ -312,7 +318,9 @@ def dgpu( partial( open_dgpu_node, account, permission, - cleos, key=key, initial_algos=json.loads(algos) + cleos, + ipfs_url, + key=key, initial_algos=json.loads(algos) )) finally: @@ -323,11 +331,13 @@ def dgpu( @run.command() @click.option('--loglevel', '-l', default='warning', help='logging level') @click.option( - '--account', '-a', default='telegram1') + '--account', '-a', default='telegram') @click.option( '--permission', '-p', default='active') @click.option( '--key', '-k', default=None) +@click.option( + '--hyperion-url', '-n', default='http://test1.us.telos.net:42001') @click.option( '--node-url', '-n', default='http://skynet.ancap.tech') @click.option( @@ -342,6 +352,7 @@ def telegram( permission: str, key: str | None, node_url: str, + hyperion_url: str, db_host: str, db_user: str, db_pass: str @@ -357,6 +368,57 @@ def telegram( account, permission, node_url, + hyperion_url, db_host, db_user, db_pass, key=key )) + + +@run.command() +@click.option('--loglevel', '-l', default='warning', help='logging level') +@click.option( + '--container', '-c', default='ipfs_host') +@click.option( + '--hyperion-url', '-n', default='http://127.0.0.1:42001') +def pinner(loglevel, container): + dclient = docker.from_env() + + container = dclient.containers.get(conatiner) + ipfs_node = IPFSDocker(container) + + last_pinned: dict[str, datetime] = {} + + def cleanup_pinned(now: datetime): + for cid in last_pinned.keys(): + ts = last_pinned[cid] + if now - ts > timedelta(minutes=1): + del last_pinned[cid] + + try: + while True: + # get all submits in the last minute + now = dateimte.now() + half_min_ago = now - timedelta(seconds=30) + submits = hyperion.get_actions( + account='telos.gpu', + filter='telos.gpu:submit', + sort='desc', + after=half_min_ago.isoformat() + ) + + # filter for the ones not already pinned + actions = [ + action + for action in submits['actions'] + if action['act']['data']['ipfs_hash'] + not in last_pinned + ] + + # pin and remember + for action in actions: + cid = action['act']['data']['ipfs_hash'] + last_pinned[cid] = now + + ipfs_node.pin(cid) + + cleanup_pinned(now) diff --git a/skynet/config.py b/skynet/config.py index 95bdddf..ace1dd0 100644 --- a/skynet/config.py +++ b/skynet/config.py @@ -40,7 +40,7 @@ def init_env_from_config( def load_account_info( - key, account, permission + key, account, permission, file_path=DEFAULT_CONFIG_PATH ): _, _, _, config = init_env_from_config() @@ -54,4 +54,4 @@ def load_account_info( if not permission: permission = config['skynet.account']['permission'] - return + return key, account, permission diff --git a/skynet/db/functions.py b/skynet/db/functions.py index 4cea259..da35ae0 100644 --- a/skynet/db/functions.py +++ b/skynet/db/functions.py @@ -177,25 +177,10 @@ async def open_database_connection( yield _db_call -async def get_user(conn, uid: str): - if isinstance(uid, str): - proto, uid = try_decode_uid(uid) - - match proto: - case 'tg': - stmt = await conn.prepare( - 'SELECT * FROM skynet.user WHERE tg_id = $1') - user = await stmt.fetchval(uid) - - case _: - user = None - - return user - - else: # asumme is our uid - stmt = await conn.prepare( - 'SELECT * FROM skynet.user WHERE id = $1') - return await stmt.fetchval(uid) +async def get_user(conn, uid: int): + stmt = await conn.prepare( + 'SELECT * FROM skynet.user WHERE id = $1') + return await stmt.fetchval(uid) async def get_user_config(conn, user: int): @@ -210,44 +195,24 @@ async def get_last_prompt_of(conn, user: int): return await stmt.fetchval(user) -async def new_user(conn, uid: str): +async def new_user(conn, uid: int): if await get_user(conn, uid): raise ValueError('User already present on db') logging.info(f'new user! {uid}') date = datetime.utcnow() - - proto, pid = try_decode_uid(uid) - async with conn.transaction(): - match proto: - case 'tg': - tg_id = pid - stmt = await conn.prepare(''' - INSERT INTO skynet.user( - tg_id, generated, joined, last_prompt, role) + stmt = await conn.prepare(''' + INSERT INTO skynet.user( + id, generated, joined, last_prompt, role) - VALUES($1, $2, $3, $4, $5) - ON CONFLICT DO NOTHING - ''') - await stmt.fetch( - tg_id, 0, date, None, DEFAULT_ROLE - ) - new_uid = await get_user(conn, uid) - - case None: - stmt = await conn.prepare(''' - INSERT INTO skynet.user( - id, generated, joined, last_prompt, role) - - VALUES($1, $2, $3, $4, $5) - ON CONFLICT DO NOTHING - ''') - await stmt.fetch( - pid, 0, date, None, DEFAULT_ROLE - ) - new_uid = pid + VALUES($1, $2, $3, $4, $5) + ON CONFLICT DO NOTHING + ''') + await stmt.fetch( + uid, 0, date, None, DEFAULT_ROLE + ) stmt = await conn.prepare(''' INSERT INTO skynet.user_config( @@ -268,8 +233,6 @@ async def new_user(conn, uid: str): DEFAULT_UPSCALER ) - return new_uid - async def get_or_create_user(conn, uid: str): user = await get_user(conn, uid) diff --git a/skynet/dgpu.py b/skynet/dgpu.py index 37820b4..32c3675 100644 --- a/skynet/dgpu.py +++ b/skynet/dgpu.py @@ -57,8 +57,9 @@ async def open_dgpu_node( account: str, permission: str, cleos: CLEOS, + remote_ipfs_node: str, key: str = None, - initial_algos: Optional[List[str]] = None + initial_algos: Optional[List[str]] = None, ): logging.basicConfig(level=logging.INFO) @@ -293,6 +294,7 @@ async def open_dgpu_node( config = await get_global_config() with open_ipfs_node() as ipfs_node: + ipfs_node.connect(remote_ipfs_node) try: while True: maybe_withdraw_all() diff --git a/skynet/frontend/telegram.py b/skynet/frontend/telegram.py index f3bef2f..0fcf67a 100644 --- a/skynet/frontend/telegram.py +++ b/skynet/frontend/telegram.py @@ -5,6 +5,7 @@ import zlib import logging import asyncio +from hashlib import sha256 from datetime import datetime import docker @@ -18,6 +19,7 @@ from telebot.types import ( InputFile, InputMediaPhoto, InlineKeyboardButton, InlineKeyboardMarkup ) from telebot.async_telebot import AsyncTeleBot +from telebot.formatting import hlink from ..db import open_new_database, open_database_connection from ..constants import * @@ -44,25 +46,143 @@ def prepare_metainfo_caption(tguser, meta: dict) -> str: else: user = f'{tguser.first_name} id: {tguser.id}' - meta_str = f'by {user}\n' - meta_str += f'prompt: \"{prompt}\"\n' - meta_str += f'seed: {meta["seed"]}\n' - meta_str += f'step: {meta["step"]}\n' - meta_str += f'guidance: {meta["guidance"]}\n' + meta_str = f'by {user}\n' + + meta_str += f'prompt: {prompt}\n' + meta_str += f'seed: {meta["seed"]}\n' + meta_str += f'step: {meta["step"]}\n' + meta_str += f'guidance: {meta["guidance"]}\n' if meta['strength']: - meta_str += f'strength: {meta["strength"]}\n' - meta_str += f'algo: \"{meta["algo"]}\"\n' + meta_str += f'strength: {meta["strength"]}\n' + meta_str += f'algo: {meta["algo"]}\n' if meta['upscaler']: - meta_str += f'upscaler: \"{meta["upscaler"]}\"\n' - meta_str += f'skynet v{VERSION}' + meta_str += f'upscaler: {meta["upscaler"]}\n' + + meta_str += f'Made with Skynet {VERSION}\n' + meta_str += f'JOIN THE SWARM: @skynetgpu' return meta_str +def generate_reply_caption( + tguser, # telegram user + params: dict, + ipfs_hash: str, + tx_hash: str +): + ipfs_link = hlink( + 'Get your image on IPFS', + f'http://test1.us.telos.net:8080/ipfs/{ipfs_hash}/image.png' + ) + explorer_link = hlink( + 'SKYNET Transaction Explorer', + f'http://test1.us.telos.net:42001/v2/explore/transaction/{tx_hash}' + ) + + meta_info = prepare_metainfo_caption(tguser, params) + + final_msg = '\n'.join([ + 'Worker finished your task!', + ipfs_link, + explorer_link, + f'PARAMETER INFO:\n{meta_info}' + ]) + + final_msg = '\n'.join([ + f'{ipfs_link}', + f'{explorer_link}', + f'{meta_info}' + ]) + + logging.info(final_msg) + + return final_msg + + +async def get_global_config(cleos): + return (await cleos.aget_table( + 'telos.gpu', 'telos.gpu', 'config'))[0] + +async def get_user_nonce(cleos, user: str): + return (await cleos.aget_table( + 'telos.gpu', 'telos.gpu', 'users', + index_position=1, + key_type='name', + lower_bound=user, + upper_bound=user + ))[0]['nonce'] + +async def work_request( + bot, cleos, hyperion, + message, + account: str, + permission: str, + params: dict +): + body = json.dumps({ + 'method': 'diffuse', + 'params': params + }) + user = message.from_user + chat = message.chat + request_time = datetime.now().isoformat() + ec, out = cleos.push_action( + 'telos.gpu', 'enqueue', [account, body, '', '20.0000 GPU'], f'{account}@{permission}' + ) + out = collect_stdout(out) + if ec != 0: + await bot.reply_to(message, out) + return + + nonce = await get_user_nonce(cleos, account) + request_hash = sha256( + (str(nonce) + body).encode('utf-8')).hexdigest().upper() + + request_id = int(out) + logging.info(f'{request_id} enqueued.') + + config = await get_global_config(cleos) + + tx_hash = None + ipfs_hash = None + for i in range(60): + submits = await hyperion.aget_actions( + account=account, + filter='telos.gpu:submit', + sort='desc', + after=request_time + ) + actions = [ + action + for action in submits['actions'] + if action[ + 'act']['data']['request_hash'] == request_hash + ] + if len(actions) > 0: + tx_hash = actions[0]['trx_id'] + ipfs_hash = actions[0]['act']['data']['ipfs_hash'] + break + + await asyncio.sleep(1) + + if not ipfs_hash: + await bot.reply_to(message, 'timeout processing request') + return + + await bot.reply_to( + message, + generate_reply_caption( + user, params, ipfs_hash, tx_hash), + reply_markup=build_redo_menu(), + parse_mode='HTML' + ) + + async def run_skynet_telegram( tg_token: str, account: str, permission: str, node_url: str, + hyperion_url: str, db_host: str, db_user: str, db_pass: str, @@ -78,7 +198,7 @@ async def run_skynet_telegram( remove=True) cleos = CLEOS(dclient, vtestnet, url=node_url, remote=node_url) - hyperion = HyperionAPI(node_url) + hyperion = HyperionAPI(hyperion_url) logging.basicConfig(level=logging.INFO) @@ -88,10 +208,6 @@ async def run_skynet_telegram( bot = AsyncTeleBot(tg_token) logging.info(f'tg_token: {tg_token}') - async def get_global_config(): - return (await cleos.aget_table( - 'telos.gpu', 'telos.gpu', 'config'))[0] - async with open_database_connection( db_user, db_pass, db_host ) as db_call: @@ -117,13 +233,12 @@ async def run_skynet_telegram( @bot.message_handler(commands=['txt2img']) async def send_txt2img(message): + user = message.from_user.id chat = message.chat reply_id = None if chat.type == 'group' and chat.id == GROUP_ID: reply_id = message.message_id - user_id = f'tg+{message.from_user.id}' - prompt = ' '.join(message.text.split(' ')[1:]) if len(prompt) == 0: @@ -131,63 +246,25 @@ async def run_skynet_telegram( return logging.info(f'mid: {message.id}') - user = await db_call('get_or_create_user', user_id) + + await db_call('get_or_create_user', user) user_config = {**(await db_call('get_user_config', user))} del user_config['id'] - req = json.dumps({ - 'method': 'diffuse', - 'params': { - 'prompt': prompt, - **user_config - } - }) + params = { + 'prompt': prompt, + **user_config + } - request_time = datetime.datetime.now().isoformat() - ec, out = cleos.push_action( - 'telos.gpu', 'enqueue', [account, req, ''], f'{account}@{permission}' - ) - out = collect_stdout(out) - if ec != 0: - await bot.reply_to(message, out) - return + await db_call('update_user_stats', user, last_prompt=prompt) - request_id = int(out) - logging.info(f'{request_id} enqueued.') - - config = await get_global_config() - - ipfs_hash = None - sha_hash = None - for i in range(60): - submits = await hyperion.aget_actions( - account=account, - filter='telos.gpu:submit', - sort='desc', - after=request_Time - ) - actions = submits['actions'] - if len(actions) > 0: - ipfs_hash = results[0]['ipfs_hash'] - sha_hash = results[0]['result_hash'] - break - else: - await asyncio.sleep(1) - - if not ipfs_hash: - await bot.reply_to(message, 'timeout processing request') - return - - ipfs_link = f'https://ipfs.io/ipfs/{ipfs_hash}/image.png' - - await bot.reply_to( - message, - ipfs_link, - reply_markup=build_redo_menu() - ) + await work_request( + bot, cleos, hyperion, + message, account, permission, params) @bot.message_handler(func=lambda message: True, content_types=['photo']) async def send_img2img(message): + user = message.from_user.id chat = message.chat reply_id = None if chat.type == 'group' and chat.id == GROUP_ID: @@ -261,7 +338,7 @@ async def run_skynet_telegram( await bot.reply_to( message, ipfs_link + '\n' + - prepare_metainfo_caption(message.from_user, result['meta']['meta']), + prepare_metainfo_caption(user, result['meta']['meta']), reply_to_message_id=reply_id, reply_markup=build_redo_menu() ) @@ -277,6 +354,7 @@ async def run_skynet_telegram( @bot.message_handler(commands=['redo']) async def redo(message): + user = message.from_user.id chat = message.chat reply_id = None if chat.type == 'group' and chat.id == GROUP_ID: @@ -286,65 +364,31 @@ async def run_skynet_telegram( del user_config['id'] prompt = await db_call('get_last_prompt_of', user) - req = json.dumps({ - 'method': 'diffuse', - 'params': { - 'prompt': prompt, - **user_config - } - }) - - ec, out = cleos.push_action( - 'telos.gpu', 'enqueue', [account, req, ''], f'{account}@{permission}' - ) - if ec != 0: - await bot.reply_to(message, out) + if not prompt: + await bot.reply_to( + message, + 'no last prompt found, do a txt2img cmd first!' + ) return - request_id = int(out) - logging.info(f'{request_id} enqueued.') + params = { + 'prompt': prompt, + **user_config + } - ipfs_hash = None - sha_hash = None - for i in range(60): - result = cleos.get_table( - 'telos.gpu', 'telos.gpu', 'results', - index_position=2, - key_type='i64', - lower_bound=request_id, - upper_bound=request_id - ) - if len(results) > 0: - ipfs_hash = result[0]['ipfs_hash'] - sha_hash = result[0]['result_hash'] - break - else: - await asyncio.sleep(1) - - if not ipfs_hash: - await bot.reply_to(message, 'timeout processing request') - - ipfs_link = f'https://ipfs.io/ipfs/{ipfs_hash}/image.png' - - await bot.reply_to( - message, - ipfs_link + '\n' + - prepare_metainfo_caption(message.from_user, result['meta']['meta']), - reply_to_message_id=reply_id, - reply_markup=build_redo_menu() - ) - return + await work_request( + bot, cleos, hyperion, + message, account, permission, params) @bot.message_handler(commands=['config']) async def set_config(message): - rpc_params = {} + user = message.from_user.id try: attr, val, reply_txt = validate_user_config_request( message.text) logging.info(f'user config update: {attr} to {val}') - await db_call('update_user_config', - user, req.params['attr'], req.params['val']) + await db_call('update_user_config', user, attr, val) logging.info('done') except BaseException as e: diff --git a/skynet/ipfs.py b/skynet/ipfs.py index acd63c9..53a2c28 100644 --- a/skynet/ipfs.py +++ b/skynet/ipfs.py @@ -27,6 +27,15 @@ class IPFSDocker: ['ipfs', 'pin', 'add', ipfs_hash]) assert ec == 0 + def connect(self, remote_node: str): + ec, out = self._container.exec_run( + ['ipfs', 'swarm', 'connect', remote_node]) + if ec != 0: + logging.error(out) + + assert ec == 0 + + @cm def open_ipfs_node(): dclient = docker.from_env()