Add autowithdraw switch, start storing input images on ipfs

add-txt2txt-models
Guillermo Rodriguez 2023-05-29 00:46:47 -03:00
parent 303ed7b24f
commit 22c403d3ae
No known key found for this signature in database
GPG Key ID: EC3AB66D5D83B392
4 changed files with 321 additions and 255 deletions

View File

@ -282,6 +282,8 @@ def nodeos():
'--permission', '-p', default='active')
@click.option(
'--key', '-k', default=None)
@click.option(
'--auto-withdraw', '-w', default=True)
@click.option(
'--node-url', '-n', default='http://skynet.ancap.tech')
@click.option(
@ -293,6 +295,7 @@ def dgpu(
account: str,
permission: str,
key: str | None,
auto_withdraw: bool,
node_url: str,
ipfs_url: str,
algos: list[str]
@ -321,6 +324,7 @@ def dgpu(
account, permission,
cleos,
ipfs_url,
auto_withdraw=auto_withdraw,
key=key, initial_algos=json.loads(algos)
))
@ -341,6 +345,8 @@ def dgpu(
'--hyperion-url', '-n', default='http://test1.us.telos.net:42001')
@click.option(
'--node-url', '-n', default='http://skynet.ancap.tech')
@click.option(
'--ipfs-url', '-n', default='/ip4/169.197.142.4/tcp/4001/p2p/12D3KooWKHKPFuqJPeqYgtUJtfZTHvEArRX2qvThYBrjuTuPg2Nx')
@click.option(
'--db-host', '-h', default='localhost:5432')
@click.option(
@ -352,8 +358,9 @@ def telegram(
account: str,
permission: str,
key: str | None,
node_url: str,
hyperion_url: str,
ipfs_url: str,
node_url: str,
db_host: str,
db_user: str,
db_pass: str
@ -372,6 +379,7 @@ def telegram(
node_url,
hyperion_url,
db_host, db_user, db_pass,
remote_ipfs_node=ipfs_url,
key=key
))
@ -400,9 +408,19 @@ def pinner(loglevel, container, hyperion_url):
try:
while True:
# get all submits in the last minute
now = datetime.now()
half_min_ago = now - timedelta(seconds=30)
# get all enqueues with binary data
# in the last minute
enqueues = hyperion.get_actions(
account='telos.gpu',
filter='telos.gpu:enqueue',
sort='desc',
after=half_min_ago.isoformat()
)
# get all submits in the last minute
submits = hyperion.get_actions(
account='telos.gpu',
filter='telos.gpu:submit',
@ -411,16 +429,23 @@ def pinner(loglevel, container, hyperion_url):
)
# filter for the ones not already pinned
actions = [
action
cids = [
*[
action['act']['data']['binary_data']
for action in enqueues['actions']
if action['act']['data']['binary_data']
not in last_pinned
],
*[
action['act']['data']['ipfs_hash']
for action in submits['actions']
if action['act']['data']['ipfs_hash']
not in last_pinned
]
]
# pin and remember
for action in actions:
cid = action['act']['data']['ipfs_hash']
for cid in cids:
last_pinned[cid] = now
ipfs_node.pin(cid)

View File

@ -27,7 +27,7 @@ from realesrgan import RealESRGANer
from basicsr.archs.rrdbnet_arch import RRDBNet
from diffusers.models import UNet2DConditionModel
from .ipfs import IPFSDocker, open_ipfs_node
from .ipfs import IPFSDocker, open_ipfs_node, get_ipfs_file
from .utils import *
from .constants import *
@ -60,6 +60,7 @@ async def open_dgpu_node(
remote_ipfs_node: str,
key: str = None,
initial_algos: Optional[List[str]] = None,
auto_withdraw: bool = True
):
logging.basicConfig(level=logging.INFO)
@ -103,7 +104,7 @@ async def open_dgpu_node(
logging.info(f'resized it to {image.size}')
if algo not in models:
if algo not in ALGOS:
if params['algo'] not in ALGOS:
raise DGPUComputeError(f'Unknown algo \"{algo}\"')
logging.info(f'{algo} not in loaded models, swapping...')
@ -266,7 +267,7 @@ async def open_dgpu_node(
def publish_on_ipfs(img_sha: str, raw_img: bytes):
logging.info('publish_on_ipfs')
img = Image.open(io.BytesIO(raw_img))
img.save(f'tmp/ipfs-docker-staging/image.png')
img.save(f'ipfs-docker-staging/image.png')
ipfs_hash = ipfs_node.add('image.png')
@ -291,12 +292,23 @@ async def open_dgpu_node(
print(collect_stdout(out))
assert ec == 0
async def get_input_data(ipfs_hash: str) -> bytes:
if ipfs_hash == '':
return b''
resp = await get_ipfs_file(f'http://test1.us.telos.net:8080/ipfs/{ipfs_hash}/image.png')
if resp.status_code != 200:
raise DGPUComputeError('Couldn\'t gather input data from ipfs')
return resp.raw
config = await get_global_config()
with open_ipfs_node() as ipfs_node:
ipfs_node.connect(remote_ipfs_node)
try:
while True:
if auto_withdraw:
maybe_withdraw_all()
queue = await get_work_requests_last_hour()
@ -314,11 +326,15 @@ async def open_dgpu_node(
# parse request
body = json.loads(req['body'])
binary = bytes.fromhex(req['binary_data'])
binary = await get_input_data(req['binary_data'])
hash_str = (
str(await get_user_nonce(req['user']))
+
req['body']
+
req['binary_data']
)
logging.info(f'hashing: {hash_str}')
request_hash = sha256(hash_str.encode('utf-8')).hexdigest()

View File

@ -27,6 +27,7 @@ from telebot.async_telebot import AsyncTeleBot, ExceptionHandler
from telebot.formatting import hlink
from ..db import open_new_database, open_database_connection
from ..ipfs import open_ipfs_node, get_ipfs_file
from ..constants import *
from . import *
@ -45,7 +46,7 @@ def build_redo_menu():
return inline_keyboard
def prepare_metainfo_caption(tguser, meta: dict) -> str:
def prepare_metainfo_caption(tguser, worker: str, meta: dict) -> str:
prompt = meta["prompt"]
if len(prompt) > 256:
prompt = prompt[:256]
@ -55,7 +56,7 @@ def prepare_metainfo_caption(tguser, meta: dict) -> str:
else:
user = f'{tguser.first_name} id: {tguser.id}'
meta_str = f'<u>by {user}</u>\n'
meta_str = f'<u>by {user}</u> <i>performed by {worker}</i>\n'
meta_str += f'<code>prompt:</code> {prompt}\n'
meta_str += f'<code>seed: {meta["seed"]}</code>\n'
@ -76,7 +77,8 @@ def generate_reply_caption(
tguser, # telegram user
params: dict,
ipfs_hash: str,
tx_hash: str
tx_hash: str,
worker: str
):
ipfs_link = hlink(
'Get your image on IPFS',
@ -87,7 +89,7 @@ def generate_reply_caption(
f'http://test1.us.telos.net:42001/v2/explore/transaction/{tx_hash}'
)
meta_info = prepare_metainfo_caption(tguser, params)
meta_info = prepare_metainfo_caption(tguser, worker, params)
final_msg = '\n'.join([
'Worker finished your task!',
@ -126,6 +128,7 @@ async def work_request(
account: str,
permission: str,
params: dict,
ipfs_node,
file_id: str | None = None,
file_path: str | None = None
):
@ -147,11 +150,15 @@ async def work_request(
logging.warning(f'user sent img of size {image.size}')
image.thumbnail((512, 512))
logging.warning(f'resized it to {image.size}')
img_byte_arr = io.BytesIO()
image.save(img_byte_arr, format='PNG')
image_raw = img_byte_arr.getvalue()
binary = image_raw.hex()
image.save(f'ipfs-docker-staging/image.png', format='PNG')
ipfs_hash = ipfs_node.add('image.png')
ipfs_node.pin(ipfs_hash)
logging.info(f'published input image {ipfs_hash} on ipfs')
binary = ipfs_hash
else:
binary = ''
@ -166,7 +173,7 @@ async def work_request(
nonce = await get_user_nonce(cleos, account)
request_hash = sha256(
(str(nonce) + body).encode('utf-8')).hexdigest().upper()
(str(nonce) + body + binary).encode('utf-8')).hexdigest().upper()
request_id = int(out)
logging.info(f'{request_id} enqueued.')
@ -190,7 +197,9 @@ async def work_request(
]
if len(actions) > 0:
tx_hash = actions[0]['trx_id']
ipfs_hash = actions[0]['act']['data']['ipfs_hash']
data = actions[0]['act']['data']
ipfs_hash = data['ipfs_hash']
worker = data['worker']
break
await asyncio.sleep(1)
@ -200,23 +209,14 @@ async def work_request(
return
# attempt to get the image and send it
ipfs_link = f'http://test1.us.telos.net:8080/ipfs/{ipfs_hash}/image.png'
logging.info(f'attempting to get image at {ipfs_link}')
resp = None
for i in range(10):
try:
resp = await asks.get(ipfs_link, timeout=2)
except asks.errors.RequestTimeout:
logging.warning('timeout...')
...
logging.info(f'status_code: {resp.status_code}')
resp = await get_ipfs_file(
f'http://test1.us.telos.net:8080/ipfs/{ipfs_hash}/image.png')
caption = generate_reply_caption(
user, params, ipfs_hash, tx_hash)
user, params, ipfs_hash, tx_hash, worker)
if resp.status_code != 200:
logging.error(f'couldn\'t get ipfs hosted image at {ipfs_link}!')
await bot.reply_to(
message,
caption,
@ -225,6 +225,7 @@ async def work_request(
)
else:
logging.info(f'succes! sending generated image')
if file_id: # img2img
await bot.send_media_group(
chat.id,
@ -258,6 +259,7 @@ async def run_skynet_telegram(
db_host: str,
db_user: str,
db_pass: str,
remote_ipfs_node: str,
key: str = None
):
dclient = docker.from_env()
@ -280,6 +282,8 @@ async def run_skynet_telegram(
bot = AsyncTeleBot(tg_token, exception_handler=SKYExceptionHandler)
logging.info(f'tg_token: {tg_token}')
with open_ipfs_node() as ipfs_node:
ipfs_node.connect(remote_ipfs_node)
async with open_database_connection(
db_user, db_pass, db_host
) as db_call:
@ -333,7 +337,8 @@ async def run_skynet_telegram(
await work_request(
bot, cleos, hyperion,
message, user, chat,
account, permission, params
account, permission, params,
ipfs_node
)
@bot.message_handler(func=lambda message: True, content_types=['photo'])
@ -377,6 +382,7 @@ async def run_skynet_telegram(
bot, cleos, hyperion,
message, user, chat,
account, permission, params,
ipfs_node,
file_id=file_id, file_path=file_path
)
@ -426,7 +432,8 @@ async def run_skynet_telegram(
await work_request(
bot, cleos, hyperion,
message, user, chat,
account, permission, params
account, permission, params,
ipfs_node
)
@bot.message_handler(commands=['redo'])

View File

@ -6,12 +6,31 @@ import logging
from pathlib import Path
from contextlib import contextmanager as cm
import asks
import docker
from asks.errors import RequestTimeout
from docker.types import Mount
from docker.models.containers import Container
async def get_ipfs_file(ipfs_link: str):
    """Fetch a file from an IPFS HTTP gateway, retrying on timeout.

    Makes up to 10 attempts with a 3 second timeout each and returns
    the ``asks`` response object of the first successful request, or
    ``None`` if every attempt timed out.  Callers are expected to
    check ``resp.status_code`` themselves.
    """
    logging.info(f'attempting to get image at {ipfs_link}')
    resp = None
    for i in range(10):
        try:
            resp = await asks.get(ipfs_link, timeout=3)
            # got a response: stop retrying instead of re-fetching
            # the same URL on every remaining iteration
            break
        except asks.errors.RequestTimeout:
            logging.warning('timeout...')

    if resp:
        logging.info(f'status_code: {resp.status_code}')
    else:
        logging.error('timeout')

    return resp
class IPFSDocker:
def __init__(self, container: Container):
@ -39,13 +58,18 @@ class IPFSDocker:
@cm
def open_ipfs_node():
def open_ipfs_node(name='skynet-ipfs'):
dclient = docker.from_env()
staging_dir = (Path().resolve() / 'ipfs-docker-staging').mkdir(
parents=True, exist_ok=True)
data_dir = (Path().resolve() / 'ipfs-docker-data').mkdir(
parents=True, exist_ok=True)
try:
container = dclient.containers.get(name)
except docker.errors.NotFound:
staging_dir = Path().resolve() / 'ipfs-docker-staging'
staging_dir.mkdir(parents=True, exist_ok=True)
data_dir = Path().resolve() / 'ipfs-docker-data'
data_dir.mkdir(parents=True, exist_ok=True)
export_target = '/export'
data_target = '/data/ipfs'
@ -62,8 +86,7 @@ def open_ipfs_node():
Mount(export_target, str(staging_dir), 'bind'),
Mount(data_target, str(data_dir), 'bind')
],
detach=True,
remove=True
detach=True
)
uid = os.getuid()
gid = os.getgid()
@ -71,7 +94,6 @@ def open_ipfs_node():
assert ec == 0
ec, out = container.exec_run(['chown', f'{uid}:{gid}', '-R', data_target])
assert ec == 0
try:
for log in container.logs(stream=True):
log = log.decode().rstrip()
@ -81,7 +103,3 @@ def open_ipfs_node():
yield IPFSDocker(container)
finally:
if container:
container.stop()