Make worker more resilient by using failable wrapper on network calls, modularize ipfs module and pinner code, drop ipfs links from telegram response and make explorer link easily configurable

add-txt2txt-models
Guillermo Rodriguez 2023-06-10 09:27:04 -03:00
parent c8a0a390a6
commit 44bfc5e9e7
No known key found for this signature in database
GPG Key ID: EC3AB66D5D83B392
8 changed files with 309 additions and 222 deletions

View File

@ -1,19 +1,14 @@
#!/usr/bin/python #!/usr/bin/python
import os
import time
import json import json
import logging import logging
import random import random
from typing import Optional
from datetime import datetime, timedelta
from functools import partial from functools import partial
import trio import trio
import asks import asks
import click import click
import docker
import asyncio import asyncio
import requests import requests
@ -21,8 +16,10 @@ from leap.cleos import CLEOS
from leap.sugar import collect_stdout from leap.sugar import collect_stdout
from leap.hyperion import HyperionAPI from leap.hyperion import HyperionAPI
from skynet.ipfs import IPFSHTTP
from .db import open_new_database from .db import open_new_database
from .ipfs import open_ipfs_node
from .config import * from .config import *
from .nodeos import open_cleos, open_nodeos from .nodeos import open_cleos, open_nodeos
from .constants import * from .constants import *
@ -352,9 +349,9 @@ def dgpu(
@click.option( @click.option(
'--key', '-k', default=None) '--key', '-k', default=None)
@click.option( @click.option(
'--hyperion-url', '-y', default='https://skynet.ancap.tech') '--hyperion-url', '-y', default=f'https://{DEFAULT_DOMAIN}')
@click.option( @click.option(
'--node-url', '-n', default='https://skynet.ancap.tech') '--node-url', '-n', default=f'https://{DEFAULT_DOMAIN}')
@click.option( @click.option(
'--ipfs-url', '-i', default=DEFAULT_IPFS_REMOTE) '--ipfs-url', '-i', default=DEFAULT_IPFS_REMOTE)
@click.option( @click.option(
@ -404,28 +401,12 @@ def telegram(
asyncio.run(_async_main()) asyncio.run(_async_main())
class IPFSHTTP:
def __init__(self, endpoint: str):
self.endpoint = endpoint
def pin(self, cid: str):
return requests.post(
f'{self.endpoint}/api/v0/pin/add',
params={'arg': cid}
)
async def a_pin(self, cid: str):
return await asks.post(
f'{self.endpoint}/api/v0/pin/add',
params={'arg': cid}
)
@run.command() @run.command()
@click.option('--loglevel', '-l', default='INFO', help='logging level') @click.option('--loglevel', '-l', default='INFO', help='logging level')
@click.option('--name', '-n', default='skynet-ipfs', help='container name') @click.option('--name', '-n', default='skynet-ipfs', help='container name')
def ipfs(loglevel, name): def ipfs(loglevel, name):
from skynet.ipfs.docker import open_ipfs_node
logging.basicConfig(level=loglevel) logging.basicConfig(level=loglevel)
with open_ipfs_node(name=name): with open_ipfs_node(name=name):
... ...
@ -437,90 +418,12 @@ def ipfs(loglevel, name):
@click.option( @click.option(
'--hyperion-url', '-y', default='http://127.0.0.1:42001') '--hyperion-url', '-y', default='http://127.0.0.1:42001')
def pinner(loglevel, ipfs_rpc, hyperion_url): def pinner(loglevel, ipfs_rpc, hyperion_url):
from .ipfs.pinner import SkynetPinner
logging.basicConfig(level=loglevel) logging.basicConfig(level=loglevel)
ipfs_node = IPFSHTTP(ipfs_rpc) ipfs_node = IPFSHTTP(ipfs_rpc)
hyperion = HyperionAPI(hyperion_url) hyperion = HyperionAPI(hyperion_url)
pinned = set() pinner = SkynetPinner(hyperion, ipfs_node)
async def _async_main():
async def capture_enqueues(after: datetime): trio.run(pinner.pin_forever)
enqueues = await hyperion.aget_actions(
account='telos.gpu',
filter='telos.gpu:enqueue',
sort='desc',
after=after.isoformat(),
limit=1000
)
logging.info(f'got {len(enqueues["actions"])} enqueue actions.')
cids = []
for action in enqueues['actions']:
cid = action['act']['data']['binary_data']
if cid and cid not in pinned:
pinned.add(cid)
cids.append(cid)
return cids
async def capture_submits(after: datetime):
submits = await hyperion.aget_actions(
account='telos.gpu',
filter='telos.gpu:submit',
sort='desc',
after=after.isoformat(),
limit=1000
)
logging.info(f'got {len(submits["actions"])} submits actions.')
cids = []
for action in submits['actions']:
cid = action['act']['data']['ipfs_hash']
if cid and cid not in pinned:
pinned.add(cid)
cids.append(cid)
return cids
async def task_pin(cid: str):
logging.info(f'pinning {cid}...')
for _ in range(6):
try:
with trio.move_on_after(5):
resp = await ipfs_node.a_pin(cid)
if resp.status_code != 200:
logging.error(f'error pinning {cid}:\n{resp.text}')
else:
logging.info(f'pinned {cid}')
return
except trio.TooSlowError:
logging.error(f'timed out pinning {cid}')
logging.error(f'gave up pinning {cid}')
try:
async with trio.open_nursery() as n:
while True:
now = datetime.now()
prev_second = now - timedelta(seconds=10)
# filter for the ones not already pinned
cids = [
*(await capture_enqueues(prev_second)),
*(await capture_submits(prev_second))
]
# pin and remember (in parallel)
for cid in cids:
n.start_soon(task_pin, cid)
await trio.sleep(1)
except KeyboardInterrupt:
...
trio.run(_async_main)

View File

@ -144,4 +144,6 @@ CONFIG_ATTRS = [
'upscaler' 'upscaler'
] ]
DEFAULT_DOMAIN = 'skygpu.net'
DEFAULT_IPFS_REMOTE = '/ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv' DEFAULT_IPFS_REMOTE = '/ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv'

View File

@ -1,5 +1,6 @@
#!/usr/bin/python #!/usr/bin/python
from functools import partial
import io import io
import json import json
import time import time
@ -15,7 +16,19 @@ from leap.cleos import CLEOS
from leap.sugar import Checksum256, Name, asset_from_str from leap.sugar import Checksum256, Name, asset_from_str
from skynet.dgpu.errors import DGPUComputeError from skynet.dgpu.errors import DGPUComputeError
from skynet.ipfs import get_ipfs_file, open_ipfs_node from skynet.ipfs import get_ipfs_file
from skynet.ipfs.docker import open_ipfs_node
async def failable(fn: partial, ret_fail=None):
try:
return await fn()
except (
asks.errors.RequestTimeout,
json.JSONDecodeError
):
return ret_fail
class SkynetGPUConnector: class SkynetGPUConnector:
@ -46,51 +59,61 @@ class SkynetGPUConnector:
yield self yield self
self.disconnect() self.disconnect()
# blockchain helpers # blockchain helpers
async def get_work_requests_last_hour(self): async def get_work_requests_last_hour(self):
logging.info('get_work_requests_last_hour') logging.info('get_work_requests_last_hour')
try: return await failable(
return await self.cleos.aget_table( partial(
self.cleos.aget_table,
'telos.gpu', 'telos.gpu', 'queue', 'telos.gpu', 'telos.gpu', 'queue',
index_position=2, index_position=2,
key_type='i64', key_type='i64',
lower_bound=int(time.time()) - 3600 lower_bound=int(time.time()) - 3600
) ), ret_fail=[])
except (
asks.errors.RequestTimeout,
json.JSONDecodeError
):
return []
async def get_status_by_request_id(self, request_id: int): async def get_status_by_request_id(self, request_id: int):
logging.info('get_status_by_request_id') logging.info('get_status_by_request_id')
return await self.cleos.aget_table( return await failable(
'telos.gpu', request_id, 'status') partial(
self.cleos.aget_table,
'telos.gpu', request_id, 'status'), ret_fail=[])
async def get_global_config(self): async def get_global_config(self):
logging.info('get_global_config') logging.info('get_global_config')
return (await self.cleos.aget_table( rows = await failable(
'telos.gpu', 'telos.gpu', 'config'))[0] partial(
self.cleos.aget_table,
'telos.gpu', 'telos.gpu', 'config'))
if rows:
return rows[0]
else:
return None
async def get_worker_balance(self): async def get_worker_balance(self):
logging.info('get_worker_balance') logging.info('get_worker_balance')
rows = await self.cleos.aget_table( rows = await failable(
partial(
self.cleos.aget_table,
'telos.gpu', 'telos.gpu', 'users', 'telos.gpu', 'telos.gpu', 'users',
index_position=1, index_position=1,
key_type='name', key_type='name',
lower_bound=self.account, lower_bound=self.account,
upper_bound=self.account upper_bound=self.account
) ))
if len(rows) == 1:
if rows:
return rows[0]['balance'] return rows[0]['balance']
else: else:
return None return None
async def begin_work(self, request_id: int): async def begin_work(self, request_id: int):
logging.info('begin_work') logging.info('begin_work')
return await self.cleos.a_push_action( return await failable(
partial(
self.cleos.a_push_action,
'telos.gpu', 'telos.gpu',
'workbegin', 'workbegin',
{ {
@ -101,10 +124,13 @@ class SkynetGPUConnector:
self.account, self.key, self.account, self.key,
permission=self.permission permission=self.permission
) )
)
async def cancel_work(self, request_id: int, reason: str): async def cancel_work(self, request_id: int, reason: str):
logging.info('cancel_work') logging.info('cancel_work')
return await self.cleos.a_push_action( return await failable(
partial(
self.cleos.a_push_action,
'telos.gpu', 'telos.gpu',
'workcancel', 'workcancel',
{ {
@ -115,6 +141,7 @@ class SkynetGPUConnector:
self.account, self.key, self.account, self.key,
permission=self.permission permission=self.permission
) )
)
async def maybe_withdraw_all(self): async def maybe_withdraw_all(self):
logging.info('maybe_withdraw_all') logging.info('maybe_withdraw_all')
@ -124,7 +151,9 @@ class SkynetGPUConnector:
balance_amount = float(balance.split(' ')[0]) balance_amount = float(balance.split(' ')[0])
if balance_amount > 0: if balance_amount > 0:
await self.cleos.a_push_action( await failable(
partial(
self.cleos.a_push_action,
'telos.gpu', 'telos.gpu',
'withdraw', 'withdraw',
{ {
@ -134,16 +163,20 @@ class SkynetGPUConnector:
self.account, self.key, self.account, self.key,
permission=self.permission permission=self.permission
) )
)
async def find_my_results(self): async def find_my_results(self):
logging.info('find_my_results') logging.info('find_my_results')
return await self.cleos.aget_table( return await failable(
partial(
self.cleos.aget_table,
'telos.gpu', 'telos.gpu', 'results', 'telos.gpu', 'telos.gpu', 'results',
index_position=4, index_position=4,
key_type='name', key_type='name',
lower_bound=self.account, lower_bound=self.account,
upper_bound=self.account upper_bound=self.account
) )
)
async def submit_work( async def submit_work(
self, self,
@ -153,7 +186,9 @@ class SkynetGPUConnector:
ipfs_hash: str ipfs_hash: str
): ):
logging.info('submit_work') logging.info('submit_work')
await self.cleos.a_push_action( return await failable(
partial(
self.cleos.a_push_action,
'telos.gpu', 'telos.gpu',
'submit', 'submit',
{ {
@ -166,6 +201,7 @@ class SkynetGPUConnector:
self.account, self.key, self.account, self.key,
permission=self.permission permission=self.permission
) )
)
# IPFS helpers # IPFS helpers

View File

@ -13,14 +13,13 @@ from contextlib import asynccontextmanager as acm
from leap.cleos import CLEOS from leap.cleos import CLEOS
from leap.sugar import Name, asset_from_str, collect_stdout from leap.sugar import Name, asset_from_str, collect_stdout
from leap.hyperion import HyperionAPI from leap.hyperion import HyperionAPI
from telebot.asyncio_helper import ApiTelegramException
from telebot.types import InputMediaPhoto from telebot.types import InputMediaPhoto
from telebot.types import CallbackQuery
from telebot.async_telebot import AsyncTeleBot from telebot.async_telebot import AsyncTeleBot
from skynet.db import open_new_database, open_database_connection from skynet.db import open_new_database, open_database_connection
from skynet.ipfs import open_ipfs_node, get_ipfs_file from skynet.ipfs import get_ipfs_file
from skynet.ipfs.docker import open_ipfs_node
from skynet.constants import * from skynet.constants import *
from . import * from . import *
@ -164,7 +163,7 @@ class SkynetTelegramFrontend:
enqueue_tx_id = res['transaction_id'] enqueue_tx_id = res['transaction_id']
enqueue_tx_link = hlink( enqueue_tx_link = hlink(
'Your request on Skynet Explorer', 'Your request on Skynet Explorer',
f'https://skynet.ancap.tech/v2/explore/transaction/{enqueue_tx_id}' f'https://explorer.{DEFAULT_DOMAIN}/v2/explore/transaction/{enqueue_tx_id}'
) )
await self.append_status_message( await self.append_status_message(
@ -221,7 +220,7 @@ class SkynetTelegramFrontend:
tx_link = hlink( tx_link = hlink(
'Your result on Skynet Explorer', 'Your result on Skynet Explorer',
f'https://skynet.ancap.tech/v2/explore/transaction/{tx_hash}' f'https://explorer.{DEFAULT_DOMAIN}/v2/explore/transaction/{tx_hash}'
) )
await self.append_status_message( await self.append_status_message(
@ -233,11 +232,11 @@ class SkynetTelegramFrontend:
) )
# attempt to get the image and send it # attempt to get the image and send it
ipfs_link = f'https://ipfs.ancap.tech/ipfs/{ipfs_hash}/image.png' ipfs_link = f'https://ipfs.{DEFAULT_DOMAIN}/ipfs/{ipfs_hash}/image.png'
resp = await get_ipfs_file(ipfs_link) resp = await get_ipfs_file(ipfs_link)
caption = generate_reply_caption( caption = generate_reply_caption(
user, params, ipfs_hash, tx_hash, worker, reward) user, params, tx_hash, worker, reward)
if not resp or resp.status_code != 200: if not resp or resp.status_code != 200:
logging.error(f'couldn\'t get ipfs hosted image at {ipfs_link}!') logging.error(f'couldn\'t get ipfs hosted image at {ipfs_link}!')

View File

@ -65,32 +65,25 @@ def prepare_metainfo_caption(tguser, worker: str, reward: str, meta: dict) -> st
def generate_reply_caption( def generate_reply_caption(
tguser, # telegram user tguser, # telegram user
params: dict, params: dict,
ipfs_hash: str,
tx_hash: str, tx_hash: str,
worker: str, worker: str,
reward: str reward: str
): ):
ipfs_link = hlink(
'Get your image on IPFS',
f'https://ipfs.ancap.tech/ipfs/{ipfs_hash}/image.png'
)
explorer_link = hlink( explorer_link = hlink(
'SKYNET Transaction Explorer', 'SKYNET Transaction Explorer',
f'https://skynet.ancap.tech/v2/explore/transaction/{tx_hash}' f'https://explorer.{DEFAULT_DOMAIN}/v2/explore/transaction/{tx_hash}'
) )
meta_info = prepare_metainfo_caption(tguser, worker, reward, params) meta_info = prepare_metainfo_caption(tguser, worker, reward, params)
final_msg = '\n'.join([ final_msg = '\n'.join([
'Worker finished your task!', 'Worker finished your task!',
ipfs_link,
explorer_link, explorer_link,
f'PARAMETER INFO:\n{meta_info}' f'PARAMETER INFO:\n{meta_info}'
]) ])
final_msg = '\n'.join([ final_msg = '\n'.join([
f'<b>{ipfs_link}</b>', f'<b><i>{explorer_link}</i></b>',
f'<i>{explorer_link}</i>',
f'{meta_info}' f'{meta_info}'
]) ])

View File

@ -0,0 +1,45 @@
#!/usr/bin/python
import logging
import asks
import requests
class IPFSHTTP:
def __init__(self, endpoint: str):
self.endpoint = endpoint
def pin(self, cid: str):
return requests.post(
f'{self.endpoint}/api/v0/pin/add',
params={'arg': cid}
)
async def a_pin(self, cid: str):
return await asks.post(
f'{self.endpoint}/api/v0/pin/add',
params={'arg': cid}
)
async def get_ipfs_file(ipfs_link: str):
logging.info(f'attempting to get image at {ipfs_link}')
resp = None
for i in range(10):
try:
resp = await asks.get(ipfs_link, timeout=3)
except asks.errors.RequestTimeout:
logging.warning('timeout...')
except asks.errors.BadHttpResponse as e:
logging.error(f'ifps gateway exception: \n{e}')
if resp:
logging.info(f'status_code: {resp.status_code}')
else:
logging.error(f'timeout')
return resp

View File

@ -1,36 +1,18 @@
#!/usr/bin/python #!/usr/bin/python
import os import os
import sys
import logging import logging
from pathlib import Path from pathlib import Path
from contextlib import contextmanager as cm from contextlib import contextmanager as cm
import asks
import docker import docker
from asks.errors import RequestTimeout
from docker.types import Mount from docker.types import Mount
from docker.models.containers import Container from docker.models.containers import Container
async def get_ipfs_file(ipfs_link: str):
logging.info(f'attempting to get image at {ipfs_link}')
resp = None
for i in range(10):
try:
resp = await asks.get(ipfs_link, timeout=3)
except asks.errors.RequestTimeout:
logging.warning('timeout...')
if resp:
logging.info(f'status_code: {resp.status_code}')
else:
logging.error(f'timeout')
return resp
class IPFSDocker: class IPFSDocker:
def __init__(self, container: Container): def __init__(self, container: Container):
@ -44,7 +26,7 @@ class IPFSDocker:
return out.decode().rstrip() return out.decode().rstrip()
def pin(self, ipfs_hash: str): def pin(self, ipfs_hash: str):
ec, out = self._container.exec_run( ec, _ = self._container.exec_run(
['ipfs', 'pin', 'add', ipfs_hash]) ['ipfs', 'pin', 'add', ipfs_hash])
assert ec == 0 assert ec == 0
@ -90,6 +72,7 @@ def open_ipfs_node(name='skynet-ipfs'):
remove=True remove=True
) )
if sys.platform != 'win32':
uid = os.getuid() uid = os.getuid()
gid = os.getgid() gid = os.getgid()
ec, out = container.exec_run(['chown', f'{uid}:{gid}', '-R', export_target]) ec, out = container.exec_run(['chown', f'{uid}:{gid}', '-R', export_target])
@ -106,4 +89,3 @@ def open_ipfs_node(name='skynet-ipfs'):
break break
yield IPFSDocker(container) yield IPFSDocker(container)

View File

@ -0,0 +1,127 @@
#!/usr/bin/python
import logging
import traceback
from datetime import datetime, timedelta
import trio
from leap.hyperion import HyperionAPI
from . import IPFSHTTP
MAX_TIME = timedelta(seconds=20)
class SkynetPinner:
def __init__(
self,
hyperion: HyperionAPI,
ipfs_http: IPFSHTTP
):
self.hyperion = hyperion
self.ipfs_http = ipfs_http
self._pinned = {}
self._now = datetime.now()
def is_pinned(self, cid: str):
pin_time = self._pinned.get(cid)
return pin_time
def pin_cids(self, cids: list[str]):
for cid in cids:
self._pinned[cid] = self._now
def cleanup_old_cids(self):
cids = list(self._pinned.keys())
for cid in cids:
if (self._now - self._pinned[cid]) > MAX_TIME * 2:
del self._pinned[cid]
async def capture_enqueues(self, after: datetime):
enqueues = await self.hyperion.aget_actions(
account='telos.gpu',
filter='telos.gpu:enqueue',
sort='desc',
after=after.isoformat(),
limit=1000
)
logging.info(f'got {len(enqueues["actions"])} enqueue actions.')
cids = []
for action in enqueues['actions']:
cid = action['act']['data']['binary_data']
if cid and not self.is_pinned(cid):
cids.append(cid)
return cids
async def capture_submits(self, after: datetime):
submits = await self.hyperion.aget_actions(
account='telos.gpu',
filter='telos.gpu:submit',
sort='desc',
after=after.isoformat(),
limit=1000
)
logging.info(f'got {len(submits["actions"])} submits actions.')
cids = []
for action in submits['actions']:
cid = action['act']['data']['ipfs_hash']
if cid and not self.is_pinned(cid):
cids.append(cid)
return cids
async def task_pin(self, cid: str):
logging.info(f'pinning {cid}...')
for _ in range(6):
try:
with trio.move_on_after(5):
resp = await self.ipfs_http.a_pin(cid)
if resp.status_code != 200:
logging.error(f'error pinning {cid}:\n{resp.text}')
else:
logging.info(f'pinned {cid}')
return
except trio.TooSlowError:
logging.error(f'timed out pinning {cid}')
logging.error(f'gave up pinning {cid}')
async def pin_forever(self):
async with trio.open_nursery() as n:
while True:
try:
self._now = datetime.now()
self.cleanup_old_cids()
prev_second = self._now - MAX_TIME
cids = [
*(await self.capture_enqueues(prev_second)),
*(await self.capture_submits(prev_second))
]
self.pin_cids(cids)
for cid in cids:
n.start_soon(self.task_pin, cid)
except OSError as e:
traceback.print_exc()
except KeyboardInterrupt:
break
await trio.sleep(1)