diff --git a/skynet/cli.py b/skynet/cli.py index b8cfab8..06bc1bd 100644 --- a/skynet/cli.py +++ b/skynet/cli.py @@ -449,17 +449,14 @@ def pinner(loglevel, ipfs_rpc, hyperion_url): ipfs_node = IPFSHTTP(ipfs_rpc) hyperion = HyperionAPI(hyperion_url) - already_pinned: set[str] = set() - async def _async_main(): - async def capture_enqueues(last_hour: datetime): - # get all enqueuesin the last hour + async def capture_enqueues(after: datetime): enqueues = await hyperion.aget_actions( account='telos.gpu', filter='telos.gpu:enqueue', sort='desc', - after=last_hour.isoformat(), + after=after.isoformat(), limit=1000 ) @@ -468,18 +465,17 @@ def pinner(loglevel, ipfs_rpc, hyperion_url): cids = [] for action in enqueues['actions']: cid = action['act']['data']['binary_data'] - if cid and cid not in already_pinned: + if cid: cids.append(cid) return cids - async def capture_submits(last_hour: datetime): - # get all submits in the last hour + async def capture_submits(after: datetime): submits = await hyperion.aget_actions( account='telos.gpu', filter='telos.gpu:submit', sort='desc', - after=last_hour.isoformat(), + after=after.isoformat(), limit=1000 ) @@ -488,14 +484,11 @@ def pinner(loglevel, ipfs_rpc, hyperion_url): cids = [] for action in submits['actions']: cid = action['act']['data']['ipfs_hash'] - if cid and cid not in already_pinned: - cids.append(cid) + cids.append(cid) return cids async def task_pin(cid: str): - already_pinned.add(cid) - resp = await ipfs_node.a_pin(cid) if resp.status_code != 200: logging.error(f'error pinning {cid}:\n{resp.text}') @@ -507,12 +500,12 @@ def pinner(loglevel, ipfs_rpc, hyperion_url): async with trio.open_nursery() as n: while True: now = datetime.now() - last_hour = now - timedelta(hours=1) + prev_second = now - timedelta(seconds=1) # filter for the ones not already pinned cids = [ - *(await capture_enqueues(last_hour)), - *(await capture_submits(last_hour)) + *(await capture_enqueues(prev_second)), + *(await capture_submits(prev_second)) ] # pin and remember (in parallel) diff --git a/skynet/dgpu.py b/skynet/dgpu.py index 8f5bc12..67ef251 100644 --- a/skynet/dgpu.py +++ b/skynet/dgpu.py @@ -16,7 +16,7 @@ import asks import torch from leap.cleos import CLEOS, default_nodeos_image -from leap.sugar import get_container, collect_stdout +from leap.sugar import * from diffusers import ( StableDiffusionPipeline, @@ -67,9 +67,6 @@ async def open_dgpu_node( logging.info(f'starting dgpu node!') logging.info(f'launching toolchain container!') - if key: - cleos.setup_wallet(key) - logging.info(f'loading models...') upscaler = init_upscaler() @@ -192,9 +189,9 @@ async def open_dgpu_node( return (await cleos.aget_table( 'telos.gpu', 'telos.gpu', 'config'))[0] - def get_worker_balance(): + async def get_worker_balance(): logging.info('get_worker_balance') - rows = cleos.get_table( + rows = await cleos.aget_table( 'telos.gpu', 'telos.gpu', 'users', index_position=1, key_type='name', @@ -216,27 +213,34 @@ async def open_dgpu_node( upper_bound=user ))[0]['nonce'] - def begin_work(request_id: int): + async def begin_work(request_id: int): logging.info('begin_work') - return cleos.push_action( + return await cleos.a_push_action( 'telos.gpu', 'workbegin', - [account, request_id], - f'{account}@{permission}', - retry=0 + { + 'worker': Name(account), + 'request_id': request_id + }, + account, key, + permission=permission ) - def cancel_work(request_id: int, reason: str): + async def cancel_work(request_id: int, reason: str): logging.info('cancel_work') - ec, out = cleos.push_action( + return await cleos.a_push_action( 'telos.gpu', 'workcancel', - [account, request_id, reason], - f'{account}@{permission}', - retry=2 + { + 'worker': Name(account), + 'request_id': request_id, + 'reason': reason + }, + account, key, + permission=permission ) - def maybe_withdraw_all(): + async def maybe_withdraw_all(): logging.info('maybe_withdraw_all') balance = get_worker_balance() if not balance: @@ -244,13 +248,16 @@ async def open_dgpu_node( balance_amount = float(balance.split(' ')[0]) if balance_amount > 0: - ec, out = cleos.push_action( + await cleos.a_push_action( 'telos.gpu', 'withdraw', - [account, balance], - f'{account}@{permission}' + { + 'user': Name(account), + 'quantity': asset_from_str(balance) + }, + account, key, + permission=permission ) - logging.info(collect_stdout(out)) async def find_my_results(): logging.info('find_my_results') @@ -274,29 +281,32 @@ async def open_dgpu_node( return ipfs_hash - def submit_work( + async def submit_work( request_id: int, request_hash: str, result_hash: str, ipfs_hash: str ): logging.info('submit_work') - ec, out = cleos.push_action( + await cleos.a_push_action( 'telos.gpu', 'submit', - [account, request_id, request_hash, result_hash, ipfs_hash], - f'{account}@{permission}', - retry=0 + { + 'worker': Name(account), + 'request_id': request_id, + 'request_hash': Checksum256(request_hash), + 'result_hash': Checksum256(result_hash), + 'ipfs_hash': ipfs_hash + }, + account, key, + permission=permission ) - if ec != 0: - print(collect_stdout(out)) - async def get_input_data(ipfs_hash: str) -> bytes: if ipfs_hash == '': return b'' - resp = await get_ipfs_file(f'http://test1.us.telos.net:8080/ipfs/{ipfs_hash}/image.png') + resp = await get_ipfs_file(f'https://ipfs.ancap.tech/ipfs/{ipfs_hash}/image.png') if resp.status_code != 200: raise DGPUComputeError('Couldn\'t gather input data from ipfs') @@ -342,8 +352,8 @@ async def open_dgpu_node( # perform work logging.info(f'working on {body}') - ec, _ = begin_work(rid) - if ec != 0: + resp = await begin_work(rid) + if 'code' in resp: logging.info(f'probably beign worked on already... skip.') else: @@ -353,11 +363,11 @@ async def open_dgpu_node( ipfs_hash = publish_on_ipfs(img_sha, raw_img) - submit_work(rid, request_hash, img_sha, ipfs_hash) + await submit_work(rid, request_hash, img_sha, ipfs_hash) break except BaseException as e: - cancel_work(rid, str(e)) + await cancel_work(rid, str(e)) break else: diff --git a/skynet/frontend/telegram.py b/skynet/frontend/telegram.py index a556124..56d1c92 100644 --- a/skynet/frontend/telegram.py +++ b/skynet/frontend/telegram.py @@ -152,14 +152,24 @@ async def work_request( request_time = datetime.now().isoformat() reward = '20.0000 GPU' - ec, out = cleos.push_action( - 'telos.gpu', 'enqueue', [account, body, binary_data, reward], f'{account}@{permission}' + res = await cleos.s_push_action( + 'telos.gpu', + 'enqueue', + { + 'user': Name(account), + 'request_body': body, + 'binary_data': binary_data, + 'reward': asset_from_str(reward) + }, + account, key, permission=permission ) - out = collect_stdout(out) - if ec != 0: - await bot.reply_to(message, out) + + if 'code' in res: + await bot.reply_to(message, json.dumps(res, indent=4)) return + out = collect_stdout(res) + request_id, nonce = out.split(':') request_hash = sha256(