Telegram frontend fixes and create pinner

add-txt2txt-models
Guillermo Rodriguez 2023-05-28 18:23:51 -03:00
parent 1d7d11a9c1
commit 5e017ffac0
No known key found for this signature in database
GPG Key ID: EC3AB66D5D83B392
7 changed files with 253 additions and 173 deletions

View File

@ -10,4 +10,4 @@ aiohttp
psycopg2-binary
pyTelegramBotAPI
py-leap@git+https://github.com/guilledk/py-leap.git@v0.1a11
py-leap@git+https://github.com/guilledk/py-leap.git@v0.1a13

View File

@ -6,6 +6,7 @@ import logging
import random
from typing import Optional
from datetime import datetime, timedelta
from functools import partial
import trio
@ -16,8 +17,10 @@ import requests
from leap.cleos import CLEOS, default_nodeos_image
from leap.sugar import get_container, collect_stdout
from leap.hyperion import HyperionAPI
from .db import open_new_database
from .ipfs import IPFSDocker
from .config import *
from .nodeos import open_cleos, open_nodeos
from .constants import ALGOS
@ -91,7 +94,7 @@ def download():
@click.option(
'--account', '-A', default=None)
@click.option(
'--permission', '-p', default=None)
'--permission', '-P', default=None)
@click.option(
'--key', '-k', default=None)
@click.option(
@ -266,7 +269,7 @@ def db():
@run.command()
def nodeos():
logging.basicConfig(level=logging.INFO)
logging.basicConfig(filename='skynet-nodeos.log', level=logging.INFO)
with open_nodeos(cleanup=False):
...
@ -280,6 +283,8 @@ def nodeos():
'--key', '-k', default=None)
@click.option(
'--node-url', '-n', default='http://skynet.ancap.tech')
@click.option(
'--ipfs-url', '-n', default='/ip4/169.197.142.4/tcp/4001/p2p/12D3KooWKHKPFuqJPeqYgtUJtfZTHvEArRX2qvThYBrjuTuPg2Nx')
@click.option(
'--algos', '-A', default=json.dumps(['midj']))
def dgpu(
@ -288,6 +293,7 @@ def dgpu(
permission: str,
key: str | None,
node_url: str,
ipfs_url: str,
algos: list[str]
):
from .dgpu import open_dgpu_node
@ -312,7 +318,9 @@ def dgpu(
partial(
open_dgpu_node,
account, permission,
cleos, key=key, initial_algos=json.loads(algos)
cleos,
ipfs_url,
key=key, initial_algos=json.loads(algos)
))
finally:
@ -323,11 +331,13 @@ def dgpu(
@run.command()
@click.option('--loglevel', '-l', default='warning', help='logging level')
@click.option(
'--account', '-a', default='telegram1')
'--account', '-a', default='telegram')
@click.option(
'--permission', '-p', default='active')
@click.option(
'--key', '-k', default=None)
@click.option(
'--hyperion-url', '-n', default='http://test1.us.telos.net:42001')
@click.option(
'--node-url', '-n', default='http://skynet.ancap.tech')
@click.option(
@ -342,6 +352,7 @@ def telegram(
permission: str,
key: str | None,
node_url: str,
hyperion_url: str,
db_host: str,
db_user: str,
db_pass: str
@ -357,6 +368,57 @@ def telegram(
account,
permission,
node_url,
hyperion_url,
db_host, db_user, db_pass,
key=key
))
@run.command()
@click.option('--loglevel', '-l', default='warning', help='logging level')
@click.option(
'--container', '-c', default='ipfs_host')
@click.option(
'--hyperion-url', '-n', default='http://127.0.0.1:42001')
def pinner(loglevel, container):
dclient = docker.from_env()
container = dclient.containers.get(conatiner)
ipfs_node = IPFSDocker(container)
last_pinned: dict[str, datetime] = {}
def cleanup_pinned(now: datetime):
for cid in last_pinned.keys():
ts = last_pinned[cid]
if now - ts > timedelta(minutes=1):
del last_pinned[cid]
try:
while True:
# get all submits in the last minute
now = dateimte.now()
half_min_ago = now - timedelta(seconds=30)
submits = hyperion.get_actions(
account='telos.gpu',
filter='telos.gpu:submit',
sort='desc',
after=half_min_ago.isoformat()
)
# filter for the ones not already pinned
actions = [
action
for action in submits['actions']
if action['act']['data']['ipfs_hash']
not in last_pinned
]
# pin and remember
for action in actions:
cid = action['act']['data']['ipfs_hash']
last_pinned[cid] = now
ipfs_node.pin(cid)
cleanup_pinned(now)

View File

@ -40,7 +40,7 @@ def init_env_from_config(
def load_account_info(
key, account, permission
key, account, permission,
file_path=DEFAULT_CONFIG_PATH
):
_, _, _, config = init_env_from_config()
@ -54,4 +54,4 @@ def load_account_info(
if not permission:
permission = config['skynet.account']['permission']
return
return key, account, permission

View File

@ -177,22 +177,7 @@ async def open_database_connection(
yield _db_call
async def get_user(conn, uid: str):
if isinstance(uid, str):
proto, uid = try_decode_uid(uid)
match proto:
case 'tg':
stmt = await conn.prepare(
'SELECT * FROM skynet.user WHERE tg_id = $1')
user = await stmt.fetchval(uid)
case _:
user = None
return user
else: # asumme is our uid
async def get_user(conn, uid: int):
stmt = await conn.prepare(
'SELECT * FROM skynet.user WHERE id = $1')
return await stmt.fetchval(uid)
@ -210,33 +195,14 @@ async def get_last_prompt_of(conn, user: int):
return await stmt.fetchval(user)
async def new_user(conn, uid: str):
async def new_user(conn, uid: int):
if await get_user(conn, uid):
raise ValueError('User already present on db')
logging.info(f'new user! {uid}')
date = datetime.utcnow()
proto, pid = try_decode_uid(uid)
async with conn.transaction():
match proto:
case 'tg':
tg_id = pid
stmt = await conn.prepare('''
INSERT INTO skynet.user(
tg_id, generated, joined, last_prompt, role)
VALUES($1, $2, $3, $4, $5)
ON CONFLICT DO NOTHING
''')
await stmt.fetch(
tg_id, 0, date, None, DEFAULT_ROLE
)
new_uid = await get_user(conn, uid)
case None:
stmt = await conn.prepare('''
INSERT INTO skynet.user(
id, generated, joined, last_prompt, role)
@ -245,9 +211,8 @@ async def new_user(conn, uid: str):
ON CONFLICT DO NOTHING
''')
await stmt.fetch(
pid, 0, date, None, DEFAULT_ROLE
uid, 0, date, None, DEFAULT_ROLE
)
new_uid = pid
stmt = await conn.prepare('''
INSERT INTO skynet.user_config(
@ -268,8 +233,6 @@ async def new_user(conn, uid: str):
DEFAULT_UPSCALER
)
return new_uid
async def get_or_create_user(conn, uid: str):
user = await get_user(conn, uid)

View File

@ -57,8 +57,9 @@ async def open_dgpu_node(
account: str,
permission: str,
cleos: CLEOS,
remote_ipfs_node: str,
key: str = None,
initial_algos: Optional[List[str]] = None
initial_algos: Optional[List[str]] = None,
):
logging.basicConfig(level=logging.INFO)
@ -293,6 +294,7 @@ async def open_dgpu_node(
config = await get_global_config()
with open_ipfs_node() as ipfs_node:
ipfs_node.connect(remote_ipfs_node)
try:
while True:
maybe_withdraw_all()

View File

@ -5,6 +5,7 @@ import zlib
import logging
import asyncio
from hashlib import sha256
from datetime import datetime
import docker
@ -18,6 +19,7 @@ from telebot.types import (
InputFile, InputMediaPhoto, InlineKeyboardButton, InlineKeyboardMarkup
)
from telebot.async_telebot import AsyncTeleBot
from telebot.formatting import hlink
from ..db import open_new_database, open_database_connection
from ..constants import *
@ -44,25 +46,143 @@ def prepare_metainfo_caption(tguser, meta: dict) -> str:
else:
user = f'{tguser.first_name} id: {tguser.id}'
meta_str = f'by {user}\n'
meta_str += f'prompt: \"{prompt}\"\n'
meta_str += f'seed: {meta["seed"]}\n'
meta_str += f'step: {meta["step"]}\n'
meta_str += f'guidance: {meta["guidance"]}\n'
meta_str = f'<u>by {user}</u>\n'
meta_str += f'<code>prompt:</code> {prompt}\n'
meta_str += f'<code>seed: {meta["seed"]}</code>\n'
meta_str += f'<code>step: {meta["step"]}</code>\n'
meta_str += f'<code>guidance: {meta["guidance"]}</code>\n'
if meta['strength']:
meta_str += f'strength: {meta["strength"]}\n'
meta_str += f'algo: \"{meta["algo"]}\"\n'
meta_str += f'<code>strength: {meta["strength"]}</code>\n'
meta_str += f'<code>algo: {meta["algo"]}</code>\n'
if meta['upscaler']:
meta_str += f'upscaler: \"{meta["upscaler"]}\"\n'
meta_str += f'skynet v{VERSION}'
meta_str += f'<code>upscaler: {meta["upscaler"]}</code>\n'
meta_str += f'<b><u>Made with Skynet {VERSION}</u></b>\n'
meta_str += f'<b>JOIN THE SWARM: @skynetgpu</b>'
return meta_str
def generate_reply_caption(
tguser, # telegram user
params: dict,
ipfs_hash: str,
tx_hash: str
):
ipfs_link = hlink(
'Get your image on IPFS',
f'http://test1.us.telos.net:8080/ipfs/{ipfs_hash}/image.png'
)
explorer_link = hlink(
'SKYNET Transaction Explorer',
f'http://test1.us.telos.net:42001/v2/explore/transaction/{tx_hash}'
)
meta_info = prepare_metainfo_caption(tguser, params)
final_msg = '\n'.join([
'Worker finished your task!',
ipfs_link,
explorer_link,
f'PARAMETER INFO:\n{meta_info}'
])
final_msg = '\n'.join([
f'<b>{ipfs_link}</b>',
f'<i>{explorer_link}</i>',
f'{meta_info}'
])
logging.info(final_msg)
return final_msg
async def get_global_config(cleos):
return (await cleos.aget_table(
'telos.gpu', 'telos.gpu', 'config'))[0]
async def get_user_nonce(cleos, user: str):
return (await cleos.aget_table(
'telos.gpu', 'telos.gpu', 'users',
index_position=1,
key_type='name',
lower_bound=user,
upper_bound=user
))[0]['nonce']
async def work_request(
bot, cleos, hyperion,
message,
account: str,
permission: str,
params: dict
):
body = json.dumps({
'method': 'diffuse',
'params': params
})
user = message.from_user
chat = message.chat
request_time = datetime.now().isoformat()
ec, out = cleos.push_action(
'telos.gpu', 'enqueue', [account, body, '', '20.0000 GPU'], f'{account}@{permission}'
)
out = collect_stdout(out)
if ec != 0:
await bot.reply_to(message, out)
return
nonce = await get_user_nonce(cleos, account)
request_hash = sha256(
(str(nonce) + body).encode('utf-8')).hexdigest().upper()
request_id = int(out)
logging.info(f'{request_id} enqueued.')
config = await get_global_config(cleos)
tx_hash = None
ipfs_hash = None
for i in range(60):
submits = await hyperion.aget_actions(
account=account,
filter='telos.gpu:submit',
sort='desc',
after=request_time
)
actions = [
action
for action in submits['actions']
if action[
'act']['data']['request_hash'] == request_hash
]
if len(actions) > 0:
tx_hash = actions[0]['trx_id']
ipfs_hash = actions[0]['act']['data']['ipfs_hash']
break
await asyncio.sleep(1)
if not ipfs_hash:
await bot.reply_to(message, 'timeout processing request')
return
await bot.reply_to(
message,
generate_reply_caption(
user, params, ipfs_hash, tx_hash),
reply_markup=build_redo_menu(),
parse_mode='HTML'
)
async def run_skynet_telegram(
tg_token: str,
account: str,
permission: str,
node_url: str,
hyperion_url: str,
db_host: str,
db_user: str,
db_pass: str,
@ -78,7 +198,7 @@ async def run_skynet_telegram(
remove=True)
cleos = CLEOS(dclient, vtestnet, url=node_url, remote=node_url)
hyperion = HyperionAPI(node_url)
hyperion = HyperionAPI(hyperion_url)
logging.basicConfig(level=logging.INFO)
@ -88,10 +208,6 @@ async def run_skynet_telegram(
bot = AsyncTeleBot(tg_token)
logging.info(f'tg_token: {tg_token}')
async def get_global_config():
return (await cleos.aget_table(
'telos.gpu', 'telos.gpu', 'config'))[0]
async with open_database_connection(
db_user, db_pass, db_host
) as db_call:
@ -117,13 +233,12 @@ async def run_skynet_telegram(
@bot.message_handler(commands=['txt2img'])
async def send_txt2img(message):
user = message.from_user.id
chat = message.chat
reply_id = None
if chat.type == 'group' and chat.id == GROUP_ID:
reply_id = message.message_id
user_id = f'tg+{message.from_user.id}'
prompt = ' '.join(message.text.split(' ')[1:])
if len(prompt) == 0:
@ -131,63 +246,25 @@ async def run_skynet_telegram(
return
logging.info(f'mid: {message.id}')
user = await db_call('get_or_create_user', user_id)
await db_call('get_or_create_user', user)
user_config = {**(await db_call('get_user_config', user))}
del user_config['id']
req = json.dumps({
'method': 'diffuse',
'params': {
params = {
'prompt': prompt,
**user_config
}
})
request_time = datetime.datetime.now().isoformat()
ec, out = cleos.push_action(
'telos.gpu', 'enqueue', [account, req, ''], f'{account}@{permission}'
)
out = collect_stdout(out)
if ec != 0:
await bot.reply_to(message, out)
return
await db_call('update_user_stats', user, last_prompt=prompt)
request_id = int(out)
logging.info(f'{request_id} enqueued.')
config = await get_global_config()
ipfs_hash = None
sha_hash = None
for i in range(60):
submits = await hyperion.aget_actions(
account=account,
filter='telos.gpu:submit',
sort='desc',
after=request_Time
)
actions = submits['actions']
if len(actions) > 0:
ipfs_hash = results[0]['ipfs_hash']
sha_hash = results[0]['result_hash']
break
else:
await asyncio.sleep(1)
if not ipfs_hash:
await bot.reply_to(message, 'timeout processing request')
return
ipfs_link = f'https://ipfs.io/ipfs/{ipfs_hash}/image.png'
await bot.reply_to(
message,
ipfs_link,
reply_markup=build_redo_menu()
)
await work_request(
bot, cleos, hyperion,
message, account, permission, params)
@bot.message_handler(func=lambda message: True, content_types=['photo'])
async def send_img2img(message):
user = message.from_user.id
chat = message.chat
reply_id = None
if chat.type == 'group' and chat.id == GROUP_ID:
@ -261,7 +338,7 @@ async def run_skynet_telegram(
await bot.reply_to(
message,
ipfs_link + '\n' +
prepare_metainfo_caption(message.from_user, result['meta']['meta']),
prepare_metainfo_caption(user, result['meta']['meta']),
reply_to_message_id=reply_id,
reply_markup=build_redo_menu()
)
@ -277,6 +354,7 @@ async def run_skynet_telegram(
@bot.message_handler(commands=['redo'])
async def redo(message):
user = message.from_user.id
chat = message.chat
reply_id = None
if chat.type == 'group' and chat.id == GROUP_ID:
@ -286,65 +364,31 @@ async def run_skynet_telegram(
del user_config['id']
prompt = await db_call('get_last_prompt_of', user)
req = json.dumps({
'method': 'diffuse',
'params': {
if not prompt:
await bot.reply_to(
message,
'no last prompt found, do a txt2img cmd first!'
)
return
params = {
'prompt': prompt,
**user_config
}
})
ec, out = cleos.push_action(
'telos.gpu', 'enqueue', [account, req, ''], f'{account}@{permission}'
)
if ec != 0:
await bot.reply_to(message, out)
return
request_id = int(out)
logging.info(f'{request_id} enqueued.')
ipfs_hash = None
sha_hash = None
for i in range(60):
result = cleos.get_table(
'telos.gpu', 'telos.gpu', 'results',
index_position=2,
key_type='i64',
lower_bound=request_id,
upper_bound=request_id
)
if len(results) > 0:
ipfs_hash = result[0]['ipfs_hash']
sha_hash = result[0]['result_hash']
break
else:
await asyncio.sleep(1)
if not ipfs_hash:
await bot.reply_to(message, 'timeout processing request')
ipfs_link = f'https://ipfs.io/ipfs/{ipfs_hash}/image.png'
await bot.reply_to(
message,
ipfs_link + '\n' +
prepare_metainfo_caption(message.from_user, result['meta']['meta']),
reply_to_message_id=reply_id,
reply_markup=build_redo_menu()
)
return
await work_request(
bot, cleos, hyperion,
message, account, permission, params)
@bot.message_handler(commands=['config'])
async def set_config(message):
rpc_params = {}
user = message.from_user.id
try:
attr, val, reply_txt = validate_user_config_request(
message.text)
logging.info(f'user config update: {attr} to {val}')
await db_call('update_user_config',
user, req.params['attr'], req.params['val'])
await db_call('update_user_config', user, attr, val)
logging.info('done')
except BaseException as e:

View File

@ -27,6 +27,15 @@ class IPFSDocker:
['ipfs', 'pin', 'add', ipfs_hash])
assert ec == 0
def connect(self, remote_node: str):
ec, out = self._container.exec_run(
['ipfs', 'swarm', 'connect', remote_node])
if ec != 0:
logging.error(out)
assert ec == 0
@cm
def open_ipfs_node():
dclient = docker.from_env()