diff --git a/requirements.txt b/requirements.txt
index b6f2e30..7a8ec39 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -10,4 +10,4 @@ aiohttp
psycopg2-binary
pyTelegramBotAPI
-py-leap@git+https://github.com/guilledk/py-leap.git@v0.1a11
+py-leap@git+https://github.com/guilledk/py-leap.git@v0.1a13
diff --git a/skynet/cli.py b/skynet/cli.py
index 4e956f2..9c30c15 100644
--- a/skynet/cli.py
+++ b/skynet/cli.py
@@ -6,6 +6,7 @@ import logging
import random
from typing import Optional
+from datetime import datetime, timedelta
from functools import partial
import trio
@@ -16,8 +17,10 @@ import requests
from leap.cleos import CLEOS, default_nodeos_image
from leap.sugar import get_container, collect_stdout
+from leap.hyperion import HyperionAPI
from .db import open_new_database
+from .ipfs import IPFSDocker
from .config import *
from .nodeos import open_cleos, open_nodeos
from .constants import ALGOS
@@ -91,7 +94,7 @@ def download():
@click.option(
'--account', '-A', default=None)
@click.option(
- '--permission', '-p', default=None)
+ '--permission', '-P', default=None)
@click.option(
'--key', '-k', default=None)
@click.option(
@@ -266,7 +269,7 @@ def db():
@run.command()
def nodeos():
- logging.basicConfig(level=logging.INFO)
+ logging.basicConfig(filename='skynet-nodeos.log', level=logging.INFO)
with open_nodeos(cleanup=False):
...
@@ -280,6 +283,8 @@ def nodeos():
'--key', '-k', default=None)
@click.option(
'--node-url', '-n', default='http://skynet.ancap.tech')
+@click.option(
+ '--ipfs-url', '-n', default='/ip4/169.197.142.4/tcp/4001/p2p/12D3KooWKHKPFuqJPeqYgtUJtfZTHvEArRX2qvThYBrjuTuPg2Nx')
@click.option(
'--algos', '-A', default=json.dumps(['midj']))
def dgpu(
@@ -288,6 +293,7 @@ def dgpu(
permission: str,
key: str | None,
node_url: str,
+ ipfs_url: str,
algos: list[str]
):
from .dgpu import open_dgpu_node
@@ -312,7 +318,9 @@ def dgpu(
partial(
open_dgpu_node,
account, permission,
- cleos, key=key, initial_algos=json.loads(algos)
+ cleos,
+ ipfs_url,
+ key=key, initial_algos=json.loads(algos)
))
finally:
@@ -323,11 +331,13 @@ def dgpu(
@run.command()
@click.option('--loglevel', '-l', default='warning', help='logging level')
@click.option(
- '--account', '-a', default='telegram1')
+ '--account', '-a', default='telegram')
@click.option(
'--permission', '-p', default='active')
@click.option(
'--key', '-k', default=None)
+@click.option(
+ '--hyperion-url', '-n', default='http://test1.us.telos.net:42001')
@click.option(
'--node-url', '-n', default='http://skynet.ancap.tech')
@click.option(
@@ -342,6 +352,7 @@ def telegram(
permission: str,
key: str | None,
node_url: str,
+ hyperion_url: str,
db_host: str,
db_user: str,
db_pass: str
@@ -357,6 +368,57 @@ def telegram(
account,
permission,
node_url,
+ hyperion_url,
db_host, db_user, db_pass,
key=key
))
+
+
+@run.command()
+@click.option('--loglevel', '-l', default='warning', help='logging level')
+@click.option(
+ '--container', '-c', default='ipfs_host')
+@click.option(
+ '--hyperion-url', '-n', default='http://127.0.0.1:42001')
+def pinner(loglevel, container):
+ dclient = docker.from_env()
+
+ container = dclient.containers.get(conatiner)
+ ipfs_node = IPFSDocker(container)
+
+ last_pinned: dict[str, datetime] = {}
+
+ def cleanup_pinned(now: datetime):
+ for cid in last_pinned.keys():
+ ts = last_pinned[cid]
+ if now - ts > timedelta(minutes=1):
+ del last_pinned[cid]
+
+ try:
+ while True:
+ # get all submits in the last minute
+ now = dateimte.now()
+ half_min_ago = now - timedelta(seconds=30)
+ submits = hyperion.get_actions(
+ account='telos.gpu',
+ filter='telos.gpu:submit',
+ sort='desc',
+ after=half_min_ago.isoformat()
+ )
+
+ # filter for the ones not already pinned
+ actions = [
+ action
+ for action in submits['actions']
+ if action['act']['data']['ipfs_hash']
+ not in last_pinned
+ ]
+
+ # pin and remember
+ for action in actions:
+ cid = action['act']['data']['ipfs_hash']
+ last_pinned[cid] = now
+
+ ipfs_node.pin(cid)
+
+ cleanup_pinned(now)
diff --git a/skynet/config.py b/skynet/config.py
index 95bdddf..ace1dd0 100644
--- a/skynet/config.py
+++ b/skynet/config.py
@@ -40,7 +40,7 @@ def init_env_from_config(
def load_account_info(
- key, account, permission
+ key, account, permission,
file_path=DEFAULT_CONFIG_PATH
):
_, _, _, config = init_env_from_config()
@@ -54,4 +54,4 @@ def load_account_info(
if not permission:
permission = config['skynet.account']['permission']
- return
+ return key, account, permission
diff --git a/skynet/db/functions.py b/skynet/db/functions.py
index 4cea259..da35ae0 100644
--- a/skynet/db/functions.py
+++ b/skynet/db/functions.py
@@ -177,25 +177,10 @@ async def open_database_connection(
yield _db_call
-async def get_user(conn, uid: str):
- if isinstance(uid, str):
- proto, uid = try_decode_uid(uid)
-
- match proto:
- case 'tg':
- stmt = await conn.prepare(
- 'SELECT * FROM skynet.user WHERE tg_id = $1')
- user = await stmt.fetchval(uid)
-
- case _:
- user = None
-
- return user
-
- else: # asumme is our uid
- stmt = await conn.prepare(
- 'SELECT * FROM skynet.user WHERE id = $1')
- return await stmt.fetchval(uid)
+async def get_user(conn, uid: int):
+ stmt = await conn.prepare(
+ 'SELECT * FROM skynet.user WHERE id = $1')
+ return await stmt.fetchval(uid)
async def get_user_config(conn, user: int):
@@ -210,44 +195,24 @@ async def get_last_prompt_of(conn, user: int):
return await stmt.fetchval(user)
-async def new_user(conn, uid: str):
+async def new_user(conn, uid: int):
if await get_user(conn, uid):
raise ValueError('User already present on db')
logging.info(f'new user! {uid}')
date = datetime.utcnow()
-
- proto, pid = try_decode_uid(uid)
-
async with conn.transaction():
- match proto:
- case 'tg':
- tg_id = pid
- stmt = await conn.prepare('''
- INSERT INTO skynet.user(
- tg_id, generated, joined, last_prompt, role)
+ stmt = await conn.prepare('''
+ INSERT INTO skynet.user(
+ id, generated, joined, last_prompt, role)
- VALUES($1, $2, $3, $4, $5)
- ON CONFLICT DO NOTHING
- ''')
- await stmt.fetch(
- tg_id, 0, date, None, DEFAULT_ROLE
- )
- new_uid = await get_user(conn, uid)
-
- case None:
- stmt = await conn.prepare('''
- INSERT INTO skynet.user(
- id, generated, joined, last_prompt, role)
-
- VALUES($1, $2, $3, $4, $5)
- ON CONFLICT DO NOTHING
- ''')
- await stmt.fetch(
- pid, 0, date, None, DEFAULT_ROLE
- )
- new_uid = pid
+ VALUES($1, $2, $3, $4, $5)
+ ON CONFLICT DO NOTHING
+ ''')
+ await stmt.fetch(
+ uid, 0, date, None, DEFAULT_ROLE
+ )
stmt = await conn.prepare('''
INSERT INTO skynet.user_config(
@@ -268,8 +233,6 @@ async def new_user(conn, uid: str):
DEFAULT_UPSCALER
)
- return new_uid
-
async def get_or_create_user(conn, uid: str):
user = await get_user(conn, uid)
diff --git a/skynet/dgpu.py b/skynet/dgpu.py
index 37820b4..32c3675 100644
--- a/skynet/dgpu.py
+++ b/skynet/dgpu.py
@@ -57,8 +57,9 @@ async def open_dgpu_node(
account: str,
permission: str,
cleos: CLEOS,
+ remote_ipfs_node: str,
key: str = None,
- initial_algos: Optional[List[str]] = None
+ initial_algos: Optional[List[str]] = None,
):
logging.basicConfig(level=logging.INFO)
@@ -293,6 +294,7 @@ async def open_dgpu_node(
config = await get_global_config()
with open_ipfs_node() as ipfs_node:
+ ipfs_node.connect(remote_ipfs_node)
try:
while True:
maybe_withdraw_all()
diff --git a/skynet/frontend/telegram.py b/skynet/frontend/telegram.py
index f3bef2f..0fcf67a 100644
--- a/skynet/frontend/telegram.py
+++ b/skynet/frontend/telegram.py
@@ -5,6 +5,7 @@ import zlib
import logging
import asyncio
+from hashlib import sha256
from datetime import datetime
import docker
@@ -18,6 +19,7 @@ from telebot.types import (
InputFile, InputMediaPhoto, InlineKeyboardButton, InlineKeyboardMarkup
)
from telebot.async_telebot import AsyncTeleBot
+from telebot.formatting import hlink
from ..db import open_new_database, open_database_connection
from ..constants import *
@@ -44,25 +46,143 @@ def prepare_metainfo_caption(tguser, meta: dict) -> str:
else:
user = f'{tguser.first_name} id: {tguser.id}'
- meta_str = f'by {user}\n'
- meta_str += f'prompt: \"{prompt}\"\n'
- meta_str += f'seed: {meta["seed"]}\n'
- meta_str += f'step: {meta["step"]}\n'
- meta_str += f'guidance: {meta["guidance"]}\n'
+ meta_str = f'by {user}\n'
+
+ meta_str += f'prompt:
{prompt}\n'
+ meta_str += f'seed: {meta["seed"]}
\n'
+ meta_str += f'step: {meta["step"]}
\n'
+ meta_str += f'guidance: {meta["guidance"]}
\n'
if meta['strength']:
- meta_str += f'strength: {meta["strength"]}\n'
- meta_str += f'algo: \"{meta["algo"]}\"\n'
+ meta_str += f'strength: {meta["strength"]}
\n'
+ meta_str += f'algo: {meta["algo"]}
\n'
if meta['upscaler']:
- meta_str += f'upscaler: \"{meta["upscaler"]}\"\n'
- meta_str += f'skynet v{VERSION}'
+ meta_str += f'upscaler: {meta["upscaler"]}
\n'
+
+ meta_str += f'Made with Skynet {VERSION}\n'
+ meta_str += f'JOIN THE SWARM: @skynetgpu'
return meta_str
+def generate_reply_caption(
+ tguser, # telegram user
+ params: dict,
+ ipfs_hash: str,
+ tx_hash: str
+):
+ ipfs_link = hlink(
+ 'Get your image on IPFS',
+ f'http://test1.us.telos.net:8080/ipfs/{ipfs_hash}/image.png'
+ )
+ explorer_link = hlink(
+ 'SKYNET Transaction Explorer',
+ f'http://test1.us.telos.net:42001/v2/explore/transaction/{tx_hash}'
+ )
+
+ meta_info = prepare_metainfo_caption(tguser, params)
+
+ final_msg = '\n'.join([
+ 'Worker finished your task!',
+ ipfs_link,
+ explorer_link,
+ f'PARAMETER INFO:\n{meta_info}'
+ ])
+
+ final_msg = '\n'.join([
+ f'{ipfs_link}',
+ f'{explorer_link}',
+ f'{meta_info}'
+ ])
+
+ logging.info(final_msg)
+
+ return final_msg
+
+
+async def get_global_config(cleos):
+ return (await cleos.aget_table(
+ 'telos.gpu', 'telos.gpu', 'config'))[0]
+
+async def get_user_nonce(cleos, user: str):
+ return (await cleos.aget_table(
+ 'telos.gpu', 'telos.gpu', 'users',
+ index_position=1,
+ key_type='name',
+ lower_bound=user,
+ upper_bound=user
+ ))[0]['nonce']
+
+async def work_request(
+ bot, cleos, hyperion,
+ message,
+ account: str,
+ permission: str,
+ params: dict
+):
+ body = json.dumps({
+ 'method': 'diffuse',
+ 'params': params
+ })
+ user = message.from_user
+ chat = message.chat
+ request_time = datetime.now().isoformat()
+ ec, out = cleos.push_action(
+ 'telos.gpu', 'enqueue', [account, body, '', '20.0000 GPU'], f'{account}@{permission}'
+ )
+ out = collect_stdout(out)
+ if ec != 0:
+ await bot.reply_to(message, out)
+ return
+
+ nonce = await get_user_nonce(cleos, account)
+ request_hash = sha256(
+ (str(nonce) + body).encode('utf-8')).hexdigest().upper()
+
+ request_id = int(out)
+ logging.info(f'{request_id} enqueued.')
+
+ config = await get_global_config(cleos)
+
+ tx_hash = None
+ ipfs_hash = None
+ for i in range(60):
+ submits = await hyperion.aget_actions(
+ account=account,
+ filter='telos.gpu:submit',
+ sort='desc',
+ after=request_time
+ )
+ actions = [
+ action
+ for action in submits['actions']
+ if action[
+ 'act']['data']['request_hash'] == request_hash
+ ]
+ if len(actions) > 0:
+ tx_hash = actions[0]['trx_id']
+ ipfs_hash = actions[0]['act']['data']['ipfs_hash']
+ break
+
+ await asyncio.sleep(1)
+
+ if not ipfs_hash:
+ await bot.reply_to(message, 'timeout processing request')
+ return
+
+ await bot.reply_to(
+ message,
+ generate_reply_caption(
+ user, params, ipfs_hash, tx_hash),
+ reply_markup=build_redo_menu(),
+ parse_mode='HTML'
+ )
+
+
async def run_skynet_telegram(
tg_token: str,
account: str,
permission: str,
node_url: str,
+ hyperion_url: str,
db_host: str,
db_user: str,
db_pass: str,
@@ -78,7 +198,7 @@ async def run_skynet_telegram(
remove=True)
cleos = CLEOS(dclient, vtestnet, url=node_url, remote=node_url)
- hyperion = HyperionAPI(node_url)
+ hyperion = HyperionAPI(hyperion_url)
logging.basicConfig(level=logging.INFO)
@@ -88,10 +208,6 @@ async def run_skynet_telegram(
bot = AsyncTeleBot(tg_token)
logging.info(f'tg_token: {tg_token}')
- async def get_global_config():
- return (await cleos.aget_table(
- 'telos.gpu', 'telos.gpu', 'config'))[0]
-
async with open_database_connection(
db_user, db_pass, db_host
) as db_call:
@@ -117,13 +233,12 @@ async def run_skynet_telegram(
@bot.message_handler(commands=['txt2img'])
async def send_txt2img(message):
+ user = message.from_user.id
chat = message.chat
reply_id = None
if chat.type == 'group' and chat.id == GROUP_ID:
reply_id = message.message_id
- user_id = f'tg+{message.from_user.id}'
-
prompt = ' '.join(message.text.split(' ')[1:])
if len(prompt) == 0:
@@ -131,63 +246,25 @@ async def run_skynet_telegram(
return
logging.info(f'mid: {message.id}')
- user = await db_call('get_or_create_user', user_id)
+
+ await db_call('get_or_create_user', user)
user_config = {**(await db_call('get_user_config', user))}
del user_config['id']
- req = json.dumps({
- 'method': 'diffuse',
- 'params': {
- 'prompt': prompt,
- **user_config
- }
- })
+ params = {
+ 'prompt': prompt,
+ **user_config
+ }
- request_time = datetime.datetime.now().isoformat()
- ec, out = cleos.push_action(
- 'telos.gpu', 'enqueue', [account, req, ''], f'{account}@{permission}'
- )
- out = collect_stdout(out)
- if ec != 0:
- await bot.reply_to(message, out)
- return
+ await db_call('update_user_stats', user, last_prompt=prompt)
- request_id = int(out)
- logging.info(f'{request_id} enqueued.')
-
- config = await get_global_config()
-
- ipfs_hash = None
- sha_hash = None
- for i in range(60):
- submits = await hyperion.aget_actions(
- account=account,
- filter='telos.gpu:submit',
- sort='desc',
- after=request_Time
- )
- actions = submits['actions']
- if len(actions) > 0:
- ipfs_hash = results[0]['ipfs_hash']
- sha_hash = results[0]['result_hash']
- break
- else:
- await asyncio.sleep(1)
-
- if not ipfs_hash:
- await bot.reply_to(message, 'timeout processing request')
- return
-
- ipfs_link = f'https://ipfs.io/ipfs/{ipfs_hash}/image.png'
-
- await bot.reply_to(
- message,
- ipfs_link,
- reply_markup=build_redo_menu()
- )
+ await work_request(
+ bot, cleos, hyperion,
+ message, account, permission, params)
@bot.message_handler(func=lambda message: True, content_types=['photo'])
async def send_img2img(message):
+ user = message.from_user.id
chat = message.chat
reply_id = None
if chat.type == 'group' and chat.id == GROUP_ID:
@@ -261,7 +338,7 @@ async def run_skynet_telegram(
await bot.reply_to(
message,
ipfs_link + '\n' +
- prepare_metainfo_caption(message.from_user, result['meta']['meta']),
+ prepare_metainfo_caption(user, result['meta']['meta']),
reply_to_message_id=reply_id,
reply_markup=build_redo_menu()
)
@@ -277,6 +354,7 @@ async def run_skynet_telegram(
@bot.message_handler(commands=['redo'])
async def redo(message):
+ user = message.from_user.id
chat = message.chat
reply_id = None
if chat.type == 'group' and chat.id == GROUP_ID:
@@ -286,65 +364,31 @@ async def run_skynet_telegram(
del user_config['id']
prompt = await db_call('get_last_prompt_of', user)
- req = json.dumps({
- 'method': 'diffuse',
- 'params': {
- 'prompt': prompt,
- **user_config
- }
- })
-
- ec, out = cleos.push_action(
- 'telos.gpu', 'enqueue', [account, req, ''], f'{account}@{permission}'
- )
- if ec != 0:
- await bot.reply_to(message, out)
+ if not prompt:
+ await bot.reply_to(
+ message,
+ 'no last prompt found, do a txt2img cmd first!'
+ )
return
- request_id = int(out)
- logging.info(f'{request_id} enqueued.')
+ params = {
+ 'prompt': prompt,
+ **user_config
+ }
- ipfs_hash = None
- sha_hash = None
- for i in range(60):
- result = cleos.get_table(
- 'telos.gpu', 'telos.gpu', 'results',
- index_position=2,
- key_type='i64',
- lower_bound=request_id,
- upper_bound=request_id
- )
- if len(results) > 0:
- ipfs_hash = result[0]['ipfs_hash']
- sha_hash = result[0]['result_hash']
- break
- else:
- await asyncio.sleep(1)
-
- if not ipfs_hash:
- await bot.reply_to(message, 'timeout processing request')
-
- ipfs_link = f'https://ipfs.io/ipfs/{ipfs_hash}/image.png'
-
- await bot.reply_to(
- message,
- ipfs_link + '\n' +
- prepare_metainfo_caption(message.from_user, result['meta']['meta']),
- reply_to_message_id=reply_id,
- reply_markup=build_redo_menu()
- )
- return
+ await work_request(
+ bot, cleos, hyperion,
+ message, account, permission, params)
@bot.message_handler(commands=['config'])
async def set_config(message):
- rpc_params = {}
+ user = message.from_user.id
try:
attr, val, reply_txt = validate_user_config_request(
message.text)
logging.info(f'user config update: {attr} to {val}')
- await db_call('update_user_config',
- user, req.params['attr'], req.params['val'])
+ await db_call('update_user_config', user, attr, val)
logging.info('done')
except BaseException as e:
diff --git a/skynet/ipfs.py b/skynet/ipfs.py
index acd63c9..53a2c28 100644
--- a/skynet/ipfs.py
+++ b/skynet/ipfs.py
@@ -27,6 +27,15 @@ class IPFSDocker:
['ipfs', 'pin', 'add', ipfs_hash])
assert ec == 0
+ def connect(self, remote_node: str):
+ ec, out = self._container.exec_run(
+ ['ipfs', 'swarm', 'connect', remote_node])
+ if ec != 0:
+ logging.error(out)
+
+ assert ec == 0
+
+
@cm
def open_ipfs_node():
dclient = docker.from_env()