mirror of https://github.com/skygpu/skynet.git
Drop cleos docker container usage for push action in frontend and dgpu, also make pinner way more agresive
parent
25413a68cc
commit
40ba84c109
|
@ -449,17 +449,14 @@ def pinner(loglevel, ipfs_rpc, hyperion_url):
|
||||||
ipfs_node = IPFSHTTP(ipfs_rpc)
|
ipfs_node = IPFSHTTP(ipfs_rpc)
|
||||||
hyperion = HyperionAPI(hyperion_url)
|
hyperion = HyperionAPI(hyperion_url)
|
||||||
|
|
||||||
already_pinned: set[str] = set()
|
|
||||||
|
|
||||||
async def _async_main():
|
async def _async_main():
|
||||||
|
|
||||||
async def capture_enqueues(last_hour: datetime):
|
async def capture_enqueues(after: datetime):
|
||||||
# get all enqueuesin the last hour
|
|
||||||
enqueues = await hyperion.aget_actions(
|
enqueues = await hyperion.aget_actions(
|
||||||
account='telos.gpu',
|
account='telos.gpu',
|
||||||
filter='telos.gpu:enqueue',
|
filter='telos.gpu:enqueue',
|
||||||
sort='desc',
|
sort='desc',
|
||||||
after=last_hour.isoformat(),
|
after=after.isoformat(),
|
||||||
limit=1000
|
limit=1000
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -468,18 +465,17 @@ def pinner(loglevel, ipfs_rpc, hyperion_url):
|
||||||
cids = []
|
cids = []
|
||||||
for action in enqueues['actions']:
|
for action in enqueues['actions']:
|
||||||
cid = action['act']['data']['binary_data']
|
cid = action['act']['data']['binary_data']
|
||||||
if cid and cid not in already_pinned:
|
if cid:
|
||||||
cids.append(cid)
|
cids.append(cid)
|
||||||
|
|
||||||
return cids
|
return cids
|
||||||
|
|
||||||
async def capture_submits(last_hour: datetime):
|
async def capture_submits(after: datetime):
|
||||||
# get all submits in the last hour
|
|
||||||
submits = await hyperion.aget_actions(
|
submits = await hyperion.aget_actions(
|
||||||
account='telos.gpu',
|
account='telos.gpu',
|
||||||
filter='telos.gpu:submit',
|
filter='telos.gpu:submit',
|
||||||
sort='desc',
|
sort='desc',
|
||||||
after=last_hour.isoformat(),
|
after=after.isoformat(),
|
||||||
limit=1000
|
limit=1000
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -488,14 +484,11 @@ def pinner(loglevel, ipfs_rpc, hyperion_url):
|
||||||
cids = []
|
cids = []
|
||||||
for action in submits['actions']:
|
for action in submits['actions']:
|
||||||
cid = action['act']['data']['ipfs_hash']
|
cid = action['act']['data']['ipfs_hash']
|
||||||
if cid and cid not in already_pinned:
|
|
||||||
cids.append(cid)
|
cids.append(cid)
|
||||||
|
|
||||||
return cids
|
return cids
|
||||||
|
|
||||||
async def task_pin(cid: str):
|
async def task_pin(cid: str):
|
||||||
already_pinned.add(cid)
|
|
||||||
|
|
||||||
resp = await ipfs_node.a_pin(cid)
|
resp = await ipfs_node.a_pin(cid)
|
||||||
if resp.status_code != 200:
|
if resp.status_code != 200:
|
||||||
logging.error(f'error pinning {cid}:\n{resp.text}')
|
logging.error(f'error pinning {cid}:\n{resp.text}')
|
||||||
|
@ -507,12 +500,12 @@ def pinner(loglevel, ipfs_rpc, hyperion_url):
|
||||||
async with trio.open_nursery() as n:
|
async with trio.open_nursery() as n:
|
||||||
while True:
|
while True:
|
||||||
now = datetime.now()
|
now = datetime.now()
|
||||||
last_hour = now - timedelta(hours=1)
|
prev_second = now - timedelta(seconds=1)
|
||||||
|
|
||||||
# filter for the ones not already pinned
|
# filter for the ones not already pinned
|
||||||
cids = [
|
cids = [
|
||||||
*(await capture_enqueues(last_hour)),
|
*(await capture_enqueues(prev_second)),
|
||||||
*(await capture_submits(last_hour))
|
*(await capture_submits(prev_second))
|
||||||
]
|
]
|
||||||
|
|
||||||
# pin and remember (in parallel)
|
# pin and remember (in parallel)
|
||||||
|
|
|
@ -16,7 +16,7 @@ import asks
|
||||||
import torch
|
import torch
|
||||||
|
|
||||||
from leap.cleos import CLEOS, default_nodeos_image
|
from leap.cleos import CLEOS, default_nodeos_image
|
||||||
from leap.sugar import get_container, collect_stdout
|
from leap.sugar import *
|
||||||
|
|
||||||
from diffusers import (
|
from diffusers import (
|
||||||
StableDiffusionPipeline,
|
StableDiffusionPipeline,
|
||||||
|
@ -67,9 +67,6 @@ async def open_dgpu_node(
|
||||||
logging.info(f'starting dgpu node!')
|
logging.info(f'starting dgpu node!')
|
||||||
logging.info(f'launching toolchain container!')
|
logging.info(f'launching toolchain container!')
|
||||||
|
|
||||||
if key:
|
|
||||||
cleos.setup_wallet(key)
|
|
||||||
|
|
||||||
logging.info(f'loading models...')
|
logging.info(f'loading models...')
|
||||||
|
|
||||||
upscaler = init_upscaler()
|
upscaler = init_upscaler()
|
||||||
|
@ -192,9 +189,9 @@ async def open_dgpu_node(
|
||||||
return (await cleos.aget_table(
|
return (await cleos.aget_table(
|
||||||
'telos.gpu', 'telos.gpu', 'config'))[0]
|
'telos.gpu', 'telos.gpu', 'config'))[0]
|
||||||
|
|
||||||
def get_worker_balance():
|
async def get_worker_balance():
|
||||||
logging.info('get_worker_balance')
|
logging.info('get_worker_balance')
|
||||||
rows = cleos.get_table(
|
rows = await cleos.aget_table(
|
||||||
'telos.gpu', 'telos.gpu', 'users',
|
'telos.gpu', 'telos.gpu', 'users',
|
||||||
index_position=1,
|
index_position=1,
|
||||||
key_type='name',
|
key_type='name',
|
||||||
|
@ -216,27 +213,34 @@ async def open_dgpu_node(
|
||||||
upper_bound=user
|
upper_bound=user
|
||||||
))[0]['nonce']
|
))[0]['nonce']
|
||||||
|
|
||||||
def begin_work(request_id: int):
|
async def begin_work(request_id: int):
|
||||||
logging.info('begin_work')
|
logging.info('begin_work')
|
||||||
return cleos.push_action(
|
return await cleos.a_push_action(
|
||||||
'telos.gpu',
|
'telos.gpu',
|
||||||
'workbegin',
|
'workbegin',
|
||||||
[account, request_id],
|
{
|
||||||
f'{account}@{permission}',
|
'worker': Name(account),
|
||||||
retry=0
|
'request_id': request_id
|
||||||
|
},
|
||||||
|
account, key,
|
||||||
|
permission=permission
|
||||||
)
|
)
|
||||||
|
|
||||||
def cancel_work(request_id: int, reason: str):
|
async def cancel_work(request_id: int, reason: str):
|
||||||
logging.info('cancel_work')
|
logging.info('cancel_work')
|
||||||
ec, out = cleos.push_action(
|
return await cleos.a_push_action(
|
||||||
'telos.gpu',
|
'telos.gpu',
|
||||||
'workcancel',
|
'workcancel',
|
||||||
[account, request_id, reason],
|
{
|
||||||
f'{account}@{permission}',
|
'worker': Name(account),
|
||||||
retry=2
|
'request_id': request_id,
|
||||||
|
'reason': reason
|
||||||
|
},
|
||||||
|
account, key,
|
||||||
|
permission=permission
|
||||||
)
|
)
|
||||||
|
|
||||||
def maybe_withdraw_all():
|
async def maybe_withdraw_all():
|
||||||
logging.info('maybe_withdraw_all')
|
logging.info('maybe_withdraw_all')
|
||||||
balance = get_worker_balance()
|
balance = get_worker_balance()
|
||||||
if not balance:
|
if not balance:
|
||||||
|
@ -244,13 +248,16 @@ async def open_dgpu_node(
|
||||||
|
|
||||||
balance_amount = float(balance.split(' ')[0])
|
balance_amount = float(balance.split(' ')[0])
|
||||||
if balance_amount > 0:
|
if balance_amount > 0:
|
||||||
ec, out = cleos.push_action(
|
await cleos.a_push_action(
|
||||||
'telos.gpu',
|
'telos.gpu',
|
||||||
'withdraw',
|
'withdraw',
|
||||||
[account, balance],
|
{
|
||||||
f'{account}@{permission}'
|
'user': Name(account),
|
||||||
|
'quantity': asset_from_str(balance)
|
||||||
|
},
|
||||||
|
account, key,
|
||||||
|
permission=permission
|
||||||
)
|
)
|
||||||
logging.info(collect_stdout(out))
|
|
||||||
|
|
||||||
async def find_my_results():
|
async def find_my_results():
|
||||||
logging.info('find_my_results')
|
logging.info('find_my_results')
|
||||||
|
@ -274,29 +281,32 @@ async def open_dgpu_node(
|
||||||
|
|
||||||
return ipfs_hash
|
return ipfs_hash
|
||||||
|
|
||||||
def submit_work(
|
async def submit_work(
|
||||||
request_id: int,
|
request_id: int,
|
||||||
request_hash: str,
|
request_hash: str,
|
||||||
result_hash: str,
|
result_hash: str,
|
||||||
ipfs_hash: str
|
ipfs_hash: str
|
||||||
):
|
):
|
||||||
logging.info('submit_work')
|
logging.info('submit_work')
|
||||||
ec, out = cleos.push_action(
|
await cleos.a_push_action(
|
||||||
'telos.gpu',
|
'telos.gpu',
|
||||||
'submit',
|
'submit',
|
||||||
[account, request_id, request_hash, result_hash, ipfs_hash],
|
{
|
||||||
f'{account}@{permission}',
|
'worker': Name(account),
|
||||||
retry=0
|
'request_id': request_id,
|
||||||
|
'request_hash': Checksum256(request_hash),
|
||||||
|
'result_hash': Checksum256(result_hash),
|
||||||
|
'ipfs_hash': ipfs_hash
|
||||||
|
},
|
||||||
|
account, key,
|
||||||
|
permission=permission
|
||||||
)
|
)
|
||||||
|
|
||||||
if ec != 0:
|
|
||||||
print(collect_stdout(out))
|
|
||||||
|
|
||||||
async def get_input_data(ipfs_hash: str) -> bytes:
|
async def get_input_data(ipfs_hash: str) -> bytes:
|
||||||
if ipfs_hash == '':
|
if ipfs_hash == '':
|
||||||
return b''
|
return b''
|
||||||
|
|
||||||
resp = await get_ipfs_file(f'http://test1.us.telos.net:8080/ipfs/{ipfs_hash}/image.png')
|
resp = await get_ipfs_file(f'https://ipfs.ancap.tech/ipfs/{ipfs_hash}/image.png')
|
||||||
if resp.status_code != 200:
|
if resp.status_code != 200:
|
||||||
raise DGPUComputeError('Couldn\'t gather input data from ipfs')
|
raise DGPUComputeError('Couldn\'t gather input data from ipfs')
|
||||||
|
|
||||||
|
@ -342,8 +352,8 @@ async def open_dgpu_node(
|
||||||
# perform work
|
# perform work
|
||||||
logging.info(f'working on {body}')
|
logging.info(f'working on {body}')
|
||||||
|
|
||||||
ec, _ = begin_work(rid)
|
resp = await begin_work(rid)
|
||||||
if ec != 0:
|
if 'code' in resp:
|
||||||
logging.info(f'probably beign worked on already... skip.')
|
logging.info(f'probably beign worked on already... skip.')
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
@ -353,11 +363,11 @@ async def open_dgpu_node(
|
||||||
|
|
||||||
ipfs_hash = publish_on_ipfs(img_sha, raw_img)
|
ipfs_hash = publish_on_ipfs(img_sha, raw_img)
|
||||||
|
|
||||||
submit_work(rid, request_hash, img_sha, ipfs_hash)
|
await submit_work(rid, request_hash, img_sha, ipfs_hash)
|
||||||
break
|
break
|
||||||
|
|
||||||
except BaseException as e:
|
except BaseException as e:
|
||||||
cancel_work(rid, str(e))
|
await cancel_work(rid, str(e))
|
||||||
break
|
break
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -152,14 +152,24 @@ async def work_request(
|
||||||
request_time = datetime.now().isoformat()
|
request_time = datetime.now().isoformat()
|
||||||
|
|
||||||
reward = '20.0000 GPU'
|
reward = '20.0000 GPU'
|
||||||
ec, out = cleos.push_action(
|
res = await cleos.s_push_action(
|
||||||
'telos.gpu', 'enqueue', [account, body, binary_data, reward], f'{account}@{permission}'
|
'telos.gpu',
|
||||||
|
'enqueue',
|
||||||
|
{
|
||||||
|
'user': Name(account),
|
||||||
|
'request_body': body,
|
||||||
|
'binary_data': binary_data,
|
||||||
|
'reward': asset_from_str(reward)
|
||||||
|
},
|
||||||
|
account, key, permission=permission
|
||||||
)
|
)
|
||||||
out = collect_stdout(out)
|
|
||||||
if ec != 0:
|
if 'code' in res:
|
||||||
await bot.reply_to(message, out)
|
await bot.reply_to(message, json.dumps(res, indent=4))
|
||||||
return
|
return
|
||||||
|
|
||||||
|
out = collect_stdout(res)
|
||||||
|
|
||||||
request_id, nonce = out.split(':')
|
request_id, nonce = out.split(':')
|
||||||
|
|
||||||
request_hash = sha256(
|
request_hash = sha256(
|
||||||
|
|
Loading…
Reference in New Issue