skynet/skynet/dgpu/network.py

229 lines
6.3 KiB
Python
Raw Normal View History

2023-06-04 20:51:43 +00:00
#!/usr/bin/python
from functools import partial
2023-06-04 20:51:43 +00:00
import io
import json
import time
import logging
import asks
from PIL import Image
from contextlib import ExitStack
from contextlib import asynccontextmanager as acm
from leap.cleos import CLEOS
from leap.sugar import Checksum256, Name, asset_from_str
from skynet.constants import DEFAULT_DOMAIN
2023-06-04 20:51:43 +00:00
from skynet.dgpu.errors import DGPUComputeError
from skynet.ipfs import get_ipfs_file
from skynet.ipfs.docker import open_ipfs_node
async def failable(fn: partial, ret_fail=None):
try:
return await fn()
except (
asks.errors.RequestTimeout,
json.JSONDecodeError
):
return ret_fail
2023-06-04 20:51:43 +00:00
class SkynetGPUConnector:
def __init__(self, config: dict):
self.account = Name(config['account'])
self.permission = config['permission']
self.key = config['key']
self.node_url = config['node_url']
self.hyperion_url = config['hyperion_url']
self.ipfs_url = config['ipfs_url']
self.cleos = CLEOS(
None, None, self.node_url, remote=self.node_url)
self._exit_stack = ExitStack()
def connect(self):
self.ipfs_node = self._exit_stack.enter_context(
open_ipfs_node())
def disconnect(self):
self._exit_stack.close()
@acm
async def open(self):
self.connect()
yield self
self.disconnect()
2023-06-04 20:51:43 +00:00
# blockchain helpers
async def get_work_requests_last_hour(self):
logging.info('get_work_requests_last_hour')
return await failable(
partial(
self.cleos.aget_table,
2023-06-04 20:51:43 +00:00
'telos.gpu', 'telos.gpu', 'queue',
index_position=2,
key_type='i64',
lower_bound=int(time.time()) - 3600
), ret_fail=[])
2023-06-04 20:51:43 +00:00
async def get_status_by_request_id(self, request_id: int):
logging.info('get_status_by_request_id')
return await failable(
partial(
self.cleos.aget_table,
'telos.gpu', request_id, 'status'), ret_fail=[])
2023-06-04 20:51:43 +00:00
async def get_global_config(self):
logging.info('get_global_config')
rows = await failable(
partial(
self.cleos.aget_table,
'telos.gpu', 'telos.gpu', 'config'))
if rows:
return rows[0]
else:
return None
2023-06-04 20:51:43 +00:00
async def get_worker_balance(self):
logging.info('get_worker_balance')
rows = await failable(
partial(
self.cleos.aget_table,
'telos.gpu', 'telos.gpu', 'users',
index_position=1,
key_type='name',
lower_bound=self.account,
upper_bound=self.account
))
if rows:
2023-06-04 20:51:43 +00:00
return rows[0]['balance']
else:
return None
async def begin_work(self, request_id: int):
logging.info('begin_work')
return await failable(
partial(
self.cleos.a_push_action,
'telos.gpu',
'workbegin',
{
'worker': self.account,
'request_id': request_id,
'max_workers': 2
},
self.account, self.key,
permission=self.permission
)
2023-06-04 20:51:43 +00:00
)
async def cancel_work(self, request_id: int, reason: str):
logging.info('cancel_work')
return await failable(
partial(
self.cleos.a_push_action,
'telos.gpu',
'workcancel',
{
'worker': self.account,
'request_id': request_id,
'reason': reason
},
self.account, self.key,
permission=self.permission
)
2023-06-04 20:51:43 +00:00
)
async def maybe_withdraw_all(self):
logging.info('maybe_withdraw_all')
balance = await self.get_worker_balance()
if not balance:
return
balance_amount = float(balance.split(' ')[0])
if balance_amount > 0:
await failable(
partial(
self.cleos.a_push_action,
'telos.gpu',
'withdraw',
{
'user': self.account,
'quantity': asset_from_str(balance)
},
self.account, self.key,
permission=self.permission
)
2023-06-04 20:51:43 +00:00
)
async def find_my_results(self):
logging.info('find_my_results')
return await failable(
partial(
self.cleos.aget_table,
'telos.gpu', 'telos.gpu', 'results',
index_position=4,
key_type='name',
lower_bound=self.account,
upper_bound=self.account
)
2023-06-04 20:51:43 +00:00
)
async def submit_work(
self,
request_id: int,
request_hash: str,
result_hash: str,
ipfs_hash: str
):
logging.info('submit_work')
return await failable(
partial(
self.cleos.a_push_action,
'telos.gpu',
'submit',
{
'worker': self.account,
'request_id': request_id,
'request_hash': Checksum256(request_hash),
'result_hash': Checksum256(result_hash),
'ipfs_hash': ipfs_hash
},
self.account, self.key,
permission=self.permission
)
2023-06-04 20:51:43 +00:00
)
# IPFS helpers
def publish_on_ipfs(self, raw_img: bytes):
logging.info('publish_on_ipfs')
img = Image.open(io.BytesIO(raw_img))
img.save(f'ipfs-docker-staging/image.png')
ipfs_hash = self.ipfs_node.add('image.png')
self.ipfs_node.pin(ipfs_hash)
return ipfs_hash
async def get_input_data(self, ipfs_hash: str) -> bytes:
if ipfs_hash == '':
return b''
resp = await get_ipfs_file(f'https://ipfs.{DEFAULT_DOMAIN}/ipfs/{ipfs_hash}/image.png')
if not resp:
2023-06-04 20:51:43 +00:00
raise DGPUComputeError('Couldn\'t gather input data from ipfs')
return resp.raw