diff --git a/skynet/cli.py b/skynet/cli.py index 63b3d92..767b2da 100644 --- a/skynet/cli.py +++ b/skynet/cli.py @@ -1,19 +1,14 @@ #!/usr/bin/python -import os -import time import json import logging import random -from typing import Optional -from datetime import datetime, timedelta from functools import partial import trio import asks import click -import docker import asyncio import requests @@ -21,8 +16,10 @@ from leap.cleos import CLEOS from leap.sugar import collect_stdout from leap.hyperion import HyperionAPI +from skynet.ipfs import IPFSHTTP + + from .db import open_new_database -from .ipfs import open_ipfs_node from .config import * from .nodeos import open_cleos, open_nodeos from .constants import * @@ -352,9 +349,9 @@ def dgpu( @click.option( '--key', '-k', default=None) @click.option( - '--hyperion-url', '-y', default='https://skynet.ancap.tech') + '--hyperion-url', '-y', default=f'https://{DEFAULT_DOMAIN}') @click.option( - '--node-url', '-n', default='https://skynet.ancap.tech') + '--node-url', '-n', default=f'https://{DEFAULT_DOMAIN}') @click.option( '--ipfs-url', '-i', default=DEFAULT_IPFS_REMOTE) @click.option( @@ -404,28 +401,12 @@ def telegram( asyncio.run(_async_main()) -class IPFSHTTP: - - def __init__(self, endpoint: str): - self.endpoint = endpoint - - def pin(self, cid: str): - return requests.post( - f'{self.endpoint}/api/v0/pin/add', - params={'arg': cid} - ) - - async def a_pin(self, cid: str): - return await asks.post( - f'{self.endpoint}/api/v0/pin/add', - params={'arg': cid} - ) - - @run.command() @click.option('--loglevel', '-l', default='INFO', help='logging level') @click.option('--name', '-n', default='skynet-ipfs', help='container name') def ipfs(loglevel, name): + from skynet.ipfs.docker import open_ipfs_node + logging.basicConfig(level=loglevel) with open_ipfs_node(name=name): ... @@ -437,90 +418,12 @@ def ipfs(loglevel, name): @click.option( '--hyperion-url', '-y', default='http://127.0.0.1:42001') def pinner(loglevel, ipfs_rpc, hyperion_url): + from .ipfs.pinner import SkynetPinner + logging.basicConfig(level=loglevel) ipfs_node = IPFSHTTP(ipfs_rpc) hyperion = HyperionAPI(hyperion_url) - pinned = set() - async def _async_main(): + pinner = SkynetPinner(hyperion, ipfs_node) - async def capture_enqueues(after: datetime): - enqueues = await hyperion.aget_actions( - account='telos.gpu', - filter='telos.gpu:enqueue', - sort='desc', - after=after.isoformat(), - limit=1000 - ) - - logging.info(f'got {len(enqueues["actions"])} enqueue actions.') - - cids = [] - for action in enqueues['actions']: - cid = action['act']['data']['binary_data'] - if cid and cid not in pinned: - pinned.add(cid) - cids.append(cid) - - return cids - - async def capture_submits(after: datetime): - submits = await hyperion.aget_actions( - account='telos.gpu', - filter='telos.gpu:submit', - sort='desc', - after=after.isoformat(), - limit=1000 - ) - - logging.info(f'got {len(submits["actions"])} submits actions.') - - cids = [] - for action in submits['actions']: - cid = action['act']['data']['ipfs_hash'] - if cid and cid not in pinned: - pinned.add(cid) - cids.append(cid) - - return cids - - async def task_pin(cid: str): - logging.info(f'pinning {cid}...') - for _ in range(6): - try: - with trio.move_on_after(5): - resp = await ipfs_node.a_pin(cid) - if resp.status_code != 200: - logging.error(f'error pinning {cid}:\n{resp.text}') - - else: - logging.info(f'pinned {cid}') - return - - except trio.TooSlowError: - logging.error(f'timed out pinning {cid}') - - logging.error(f'gave up pinning {cid}') - - try: - async with trio.open_nursery() as n: - while True: - now = datetime.now() - prev_second = now - timedelta(seconds=10) - - # filter for the ones not already pinned - cids = [ - *(await capture_enqueues(prev_second)), - *(await capture_submits(prev_second)) - ] - - # pin and remember (in parallel) - for cid in cids: - n.start_soon(task_pin, cid) - - await trio.sleep(1) - - except KeyboardInterrupt: - ... - - trio.run(_async_main) + trio.run(pinner.pin_forever) diff --git a/skynet/constants.py b/skynet/constants.py index a1e13f4..d3e319d 100644 --- a/skynet/constants.py +++ b/skynet/constants.py @@ -144,4 +144,6 @@ CONFIG_ATTRS = [ 'upscaler' ] +DEFAULT_DOMAIN = 'skygpu.net' + DEFAULT_IPFS_REMOTE = '/ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv' diff --git a/skynet/dgpu/network.py b/skynet/dgpu/network.py index 55b8228..e7e0396 100644 --- a/skynet/dgpu/network.py +++ b/skynet/dgpu/network.py @@ -1,5 +1,6 @@ #!/usr/bin/python +from functools import partial import io import json import time @@ -15,7 +16,19 @@ from leap.cleos import CLEOS from leap.sugar import Checksum256, Name, asset_from_str from skynet.dgpu.errors import DGPUComputeError -from skynet.ipfs import get_ipfs_file, open_ipfs_node +from skynet.ipfs import get_ipfs_file +from skynet.ipfs.docker import open_ipfs_node + + +async def failable(fn: partial, ret_fail=None): + try: + return await fn() + + except ( + asks.errors.RequestTimeout, + json.JSONDecodeError + ): + return ret_fail class SkynetGPUConnector: @@ -46,74 +59,88 @@ class SkynetGPUConnector: yield self self.disconnect() + # blockchain helpers async def get_work_requests_last_hour(self): logging.info('get_work_requests_last_hour') - try: - return await self.cleos.aget_table( + return await failable( + partial( + self.cleos.aget_table, 'telos.gpu', 'telos.gpu', 'queue', index_position=2, key_type='i64', lower_bound=int(time.time()) - 3600 - ) - - except ( - asks.errors.RequestTimeout, - json.JSONDecodeError - ): - return [] + ), ret_fail=[]) async def get_status_by_request_id(self, request_id: int): logging.info('get_status_by_request_id') - return await self.cleos.aget_table( - 'telos.gpu', request_id, 'status') + return await failable( + partial( + self.cleos.aget_table, + 'telos.gpu', request_id, 'status'), ret_fail=[]) async def get_global_config(self): logging.info('get_global_config') - return (await self.cleos.aget_table( - 'telos.gpu', 'telos.gpu', 'config'))[0] + rows = await failable( + partial( + self.cleos.aget_table, + 'telos.gpu', 'telos.gpu', 'config')) + + if rows: + return rows[0] + else: + return None async def get_worker_balance(self): logging.info('get_worker_balance') - rows = await self.cleos.aget_table( - 'telos.gpu', 'telos.gpu', 'users', - index_position=1, - key_type='name', - lower_bound=self.account, - upper_bound=self.account - ) - if len(rows) == 1: + rows = await failable( + partial( + self.cleos.aget_table, + 'telos.gpu', 'telos.gpu', 'users', + index_position=1, + key_type='name', + lower_bound=self.account, + upper_bound=self.account + )) + + if rows: return rows[0]['balance'] else: return None async def begin_work(self, request_id: int): logging.info('begin_work') - return await self.cleos.a_push_action( - 'telos.gpu', - 'workbegin', - { - 'worker': self.account, - 'request_id': request_id, - 'max_workers': 2 - }, - self.account, self.key, - permission=self.permission + return await failable( + partial( + self.cleos.a_push_action, + 'telos.gpu', + 'workbegin', + { + 'worker': self.account, + 'request_id': request_id, + 'max_workers': 2 + }, + self.account, self.key, + permission=self.permission + ) ) async def cancel_work(self, request_id: int, reason: str): logging.info('cancel_work') - return await self.cleos.a_push_action( - 'telos.gpu', - 'workcancel', - { - 'worker': self.account, - 'request_id': request_id, - 'reason': reason - }, - self.account, self.key, - permission=self.permission + return await failable( + partial( + self.cleos.a_push_action, + 'telos.gpu', + 'workcancel', + { + 'worker': self.account, + 'request_id': request_id, + 'reason': reason + }, + self.account, self.key, + permission=self.permission + ) ) async def maybe_withdraw_all(self): @@ -124,25 +151,31 @@ class SkynetGPUConnector: balance_amount = float(balance.split(' ')[0]) if balance_amount > 0: - await self.cleos.a_push_action( - 'telos.gpu', - 'withdraw', - { - 'user': self.account, - 'quantity': asset_from_str(balance) - }, - self.account, self.key, - permission=self.permission + await failable( + partial( + self.cleos.a_push_action, + 'telos.gpu', + 'withdraw', + { + 'user': self.account, + 'quantity': asset_from_str(balance) + }, + self.account, self.key, + permission=self.permission + ) ) async def find_my_results(self): logging.info('find_my_results') - return await self.cleos.aget_table( - 'telos.gpu', 'telos.gpu', 'results', - index_position=4, - key_type='name', - lower_bound=self.account, - upper_bound=self.account + return await failable( + partial( + self.cleos.aget_table, + 'telos.gpu', 'telos.gpu', 'results', + index_position=4, + key_type='name', + lower_bound=self.account, + upper_bound=self.account + ) ) async def submit_work( @@ -153,18 +186,21 @@ class SkynetGPUConnector: ipfs_hash: str ): logging.info('submit_work') - await self.cleos.a_push_action( - 'telos.gpu', - 'submit', - { - 'worker': self.account, - 'request_id': request_id, - 'request_hash': Checksum256(request_hash), - 'result_hash': Checksum256(result_hash), - 'ipfs_hash': ipfs_hash - }, - self.account, self.key, - permission=self.permission + return await failable( + partial( + self.cleos.a_push_action, + 'telos.gpu', + 'submit', + { + 'worker': self.account, + 'request_id': request_id, + 'request_hash': Checksum256(request_hash), + 'result_hash': Checksum256(result_hash), + 'ipfs_hash': ipfs_hash + }, + self.account, self.key, + permission=self.permission + ) ) # IPFS helpers diff --git a/skynet/frontend/telegram/__init__.py b/skynet/frontend/telegram/__init__.py index 07a39c5..3fcbeb9 100644 --- a/skynet/frontend/telegram/__init__.py +++ b/skynet/frontend/telegram/__init__.py @@ -13,14 +13,13 @@ from contextlib import asynccontextmanager as acm from leap.cleos import CLEOS from leap.sugar import Name, asset_from_str, collect_stdout from leap.hyperion import HyperionAPI -from telebot.asyncio_helper import ApiTelegramException from telebot.types import InputMediaPhoto -from telebot.types import CallbackQuery from telebot.async_telebot import AsyncTeleBot from skynet.db import open_new_database, open_database_connection -from skynet.ipfs import open_ipfs_node, get_ipfs_file +from skynet.ipfs import get_ipfs_file +from skynet.ipfs.docker import open_ipfs_node from skynet.constants import * from . import * @@ -164,7 +163,7 @@ class SkynetTelegramFrontend: enqueue_tx_id = res['transaction_id'] enqueue_tx_link = hlink( 'Your request on Skynet Explorer', - f'https://skynet.ancap.tech/v2/explore/transaction/{enqueue_tx_id}' + f'https://explorer.{DEFAULT_DOMAIN}/v2/explore/transaction/{enqueue_tx_id}' ) await self.append_status_message( @@ -221,7 +220,7 @@ class SkynetTelegramFrontend: tx_link = hlink( 'Your result on Skynet Explorer', - f'https://skynet.ancap.tech/v2/explore/transaction/{tx_hash}' + f'https://explorer.{DEFAULT_DOMAIN}/v2/explore/transaction/{tx_hash}' ) await self.append_status_message( @@ -233,11 +232,11 @@ class SkynetTelegramFrontend: ) # attempt to get the image and send it - ipfs_link = f'https://ipfs.ancap.tech/ipfs/{ipfs_hash}/image.png' + ipfs_link = f'https://ipfs.{DEFAULT_DOMAIN}/ipfs/{ipfs_hash}/image.png' resp = await get_ipfs_file(ipfs_link) caption = generate_reply_caption( - user, params, ipfs_hash, tx_hash, worker, reward) + user, params, tx_hash, worker, reward) if not resp or resp.status_code != 200: logging.error(f'couldn\'t get ipfs hosted image at {ipfs_link}!') diff --git a/skynet/frontend/telegram/utils.py b/skynet/frontend/telegram/utils.py index cebb6e5..ad08bba 100644 --- a/skynet/frontend/telegram/utils.py +++ b/skynet/frontend/telegram/utils.py @@ -65,32 +65,25 @@ def prepare_metainfo_caption(tguser, worker: str, reward: str, meta: dict) -> st def generate_reply_caption( tguser, # telegram user params: dict, - ipfs_hash: str, tx_hash: str, worker: str, reward: str ): - ipfs_link = hlink( - 'Get your image on IPFS', - f'https://ipfs.ancap.tech/ipfs/{ipfs_hash}/image.png' - ) explorer_link = hlink( 'SKYNET Transaction Explorer', - f'https://skynet.ancap.tech/v2/explore/transaction/{tx_hash}' + f'https://explorer.{DEFAULT_DOMAIN}/v2/explore/transaction/{tx_hash}' ) meta_info = prepare_metainfo_caption(tguser, worker, reward, params) final_msg = '\n'.join([ 'Worker finished your task!', - ipfs_link, explorer_link, f'PARAMETER INFO:\n{meta_info}' ]) final_msg = '\n'.join([ - f'{ipfs_link}', - f'{explorer_link}', + f'{explorer_link}', f'{meta_info}' ]) diff --git a/skynet/ipfs/__init__.py b/skynet/ipfs/__init__.py new file mode 100644 index 0000000..bb4e0fe --- /dev/null +++ b/skynet/ipfs/__init__.py @@ -0,0 +1,45 @@ +#!/usr/bin/python + +import logging + +import asks +import requests + + +class IPFSHTTP: + + def __init__(self, endpoint: str): + self.endpoint = endpoint + + def pin(self, cid: str): + return requests.post( + f'{self.endpoint}/api/v0/pin/add', + params={'arg': cid} + ) + + async def a_pin(self, cid: str): + return await asks.post( + f'{self.endpoint}/api/v0/pin/add', + params={'arg': cid} + ) + + +async def get_ipfs_file(ipfs_link: str): + logging.info(f'attempting to get image at {ipfs_link}') + resp = None + for i in range(10): + try: + resp = await asks.get(ipfs_link, timeout=3) + + except asks.errors.RequestTimeout: + logging.warning('timeout...') + + except asks.errors.BadHttpResponse as e: + logging.error(f'ifps gateway exception: \n{e}') + + if resp: + logging.info(f'status_code: {resp.status_code}') + else: + logging.error(f'timeout') + + return resp diff --git a/skynet/ipfs.py b/skynet/ipfs/docker.py similarity index 70% rename from skynet/ipfs.py rename to skynet/ipfs/docker.py index 1c2bd87..8158d2e 100644 --- a/skynet/ipfs.py +++ b/skynet/ipfs/docker.py @@ -1,36 +1,18 @@ #!/usr/bin/python import os +import sys import logging from pathlib import Path from contextlib import contextmanager as cm -import asks import docker -from asks.errors import RequestTimeout from docker.types import Mount from docker.models.containers import Container -async def get_ipfs_file(ipfs_link: str): - logging.info(f'attempting to get image at {ipfs_link}') - resp = None - for i in range(10): - try: - resp = await asks.get(ipfs_link, timeout=3) - - except asks.errors.RequestTimeout: - logging.warning('timeout...') - - if resp: - logging.info(f'status_code: {resp.status_code}') - else: - logging.error(f'timeout') - return resp - - class IPFSDocker: def __init__(self, container: Container): @@ -44,7 +26,7 @@ class IPFSDocker: return out.decode().rstrip() def pin(self, ipfs_hash: str): - ec, out = self._container.exec_run( + ec, _ = self._container.exec_run( ['ipfs', 'pin', 'add', ipfs_hash]) assert ec == 0 @@ -90,14 +72,15 @@ def open_ipfs_node(name='skynet-ipfs'): remove=True ) - uid = os.getuid() - gid = os.getgid() - ec, out = container.exec_run(['chown', f'{uid}:{gid}', '-R', export_target]) - logging.info(out) - assert ec == 0 - ec, out = container.exec_run(['chown', f'{uid}:{gid}', '-R', data_target]) - logging.info(out) - assert ec == 0 + if sys.platform != 'win32': + uid = os.getuid() + gid = os.getgid() + ec, out = container.exec_run(['chown', f'{uid}:{gid}', '-R', export_target]) + logging.info(out) + assert ec == 0 + ec, out = container.exec_run(['chown', f'{uid}:{gid}', '-R', data_target]) + logging.info(out) + assert ec == 0 for log in container.logs(stream=True): log = log.decode().rstrip() @@ -106,4 +89,3 @@ def open_ipfs_node(name='skynet-ipfs'): break yield IPFSDocker(container) - diff --git a/skynet/ipfs/pinner.py b/skynet/ipfs/pinner.py new file mode 100644 index 0000000..ab443bf --- /dev/null +++ b/skynet/ipfs/pinner.py @@ -0,0 +1,127 @@ +#!/usr/bin/python + +import logging +import traceback + +from datetime import datetime, timedelta + +import trio + +from leap.hyperion import HyperionAPI + +from . import IPFSHTTP + + +MAX_TIME = timedelta(seconds=20) + + +class SkynetPinner: + + def __init__( + self, + hyperion: HyperionAPI, + ipfs_http: IPFSHTTP + ): + self.hyperion = hyperion + self.ipfs_http = ipfs_http + + self._pinned = {} + self._now = datetime.now() + + def is_pinned(self, cid: str): + pin_time = self._pinned.get(cid) + return pin_time + + def pin_cids(self, cids: list[str]): + for cid in cids: + self._pinned[cid] = self._now + + def cleanup_old_cids(self): + cids = list(self._pinned.keys()) + for cid in cids: + if (self._now - self._pinned[cid]) > MAX_TIME * 2: + del self._pinned[cid] + + async def capture_enqueues(self, after: datetime): + enqueues = await self.hyperion.aget_actions( + account='telos.gpu', + filter='telos.gpu:enqueue', + sort='desc', + after=after.isoformat(), + limit=1000 + ) + + logging.info(f'got {len(enqueues["actions"])} enqueue actions.') + + cids = [] + for action in enqueues['actions']: + cid = action['act']['data']['binary_data'] + if cid and not self.is_pinned(cid): + cids.append(cid) + + return cids + + async def capture_submits(self, after: datetime): + submits = await self.hyperion.aget_actions( + account='telos.gpu', + filter='telos.gpu:submit', + sort='desc', + after=after.isoformat(), + limit=1000 + ) + + logging.info(f'got {len(submits["actions"])} submits actions.') + + cids = [] + for action in submits['actions']: + cid = action['act']['data']['ipfs_hash'] + if cid and not self.is_pinned(cid): + cids.append(cid) + + return cids + + async def task_pin(self, cid: str): + logging.info(f'pinning {cid}...') + for _ in range(6): + try: + with trio.move_on_after(5): + resp = await self.ipfs_http.a_pin(cid) + if resp.status_code != 200: + logging.error(f'error pinning {cid}:\n{resp.text}') + + else: + logging.info(f'pinned {cid}') + return + + except trio.TooSlowError: + logging.error(f'timed out pinning {cid}') + + logging.error(f'gave up pinning {cid}') + + async def pin_forever(self): + async with trio.open_nursery() as n: + while True: + try: + self._now = datetime.now() + self.cleanup_old_cids() + + prev_second = self._now - MAX_TIME + + cids = [ + *(await self.capture_enqueues(prev_second)), + *(await self.capture_submits(prev_second)) + ] + + self.pin_cids(cids) + + for cid in cids: + n.start_soon(self.task_pin, cid) + + except OSError as e: + traceback.print_exc() + + except KeyboardInterrupt: + break + + await trio.sleep(1) +