From 25c86b5eafcd79ab58a469f1fc7abc7ac3ce9b16 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Mon, 29 May 2023 13:43:03 -0300 Subject: [PATCH] Rework gpu worker logic to work better in parallel with other workers --- skynet/dgpu.py | 71 ++++++++++++++++++++++++-------------------------- 1 file changed, 34 insertions(+), 37 deletions(-) diff --git a/skynet/dgpu.py b/skynet/dgpu.py index e8c8490..a4303e4 100644 --- a/skynet/dgpu.py +++ b/skynet/dgpu.py @@ -218,13 +218,12 @@ async def open_dgpu_node( def begin_work(request_id: int): logging.info('begin_work') - ec, out = cleos.push_action( + return cleos.push_action( 'telos.gpu', 'workbegin', [account, request_id], f'{account}@{permission}' ) - assert ec == 0 def cancel_work(request_id: int, reason: str): logging.info('cancel_work') @@ -234,7 +233,6 @@ async def open_dgpu_node( [account, request_id, reason], f'{account}@{permission}' ) - assert ec == 0 def maybe_withdraw_all(): logging.info('maybe_withdraw_all') @@ -251,7 +249,6 @@ async def open_dgpu_node( f'{account}@{permission}' ) logging.info(collect_stdout(out)) - assert ec == 0 async def find_my_results(): logging.info('find_my_results') @@ -289,8 +286,8 @@ async def open_dgpu_node( f'{account}@{permission}' ) - print(collect_stdout(out)) - assert ec == 0 + if ec != 0: + print(collect_stdout(out)) async def get_input_data(ipfs_hash: str) -> bytes: if ipfs_hash == '': @@ -317,51 +314,51 @@ async def open_dgpu_node( rid = req['id'] my_results = [res['id'] for res in (await find_my_results())] - if rid in my_results: - continue + if rid not in my_results: + statuses = await get_status_by_request_id(rid) - statuses = await get_status_by_request_id(rid) + if len(statuses) < config['verification_amount']: - if len(statuses) < config['verification_amount']: + # parse request + body = json.loads(req['body']) - # parse request - body = json.loads(req['body']) + binary = await get_input_data(req['binary_data']) - binary = await get_input_data(req['binary_data']) + hash_str = ( + str(await get_user_nonce(req['user'])) + + + req['body'] + + + req['binary_data'] + ) + logging.info(f'hashing: {hash_str}') + request_hash = sha256(hash_str.encode('utf-8')).hexdigest() - hash_str = ( - str(await get_user_nonce(req['user'])) - + - req['body'] - + - req['binary_data'] - ) - logging.info(f'hashing: {hash_str}') - request_hash = sha256(hash_str.encode('utf-8')).hexdigest() + # TODO: validate request - # TODO: validate request + # perform work + logging.info(f'working on {body}') - # perform work - logging.info(f'working on {body}') + ec, _ = begin_work(rid) + if ec != 0: + logging.info(f'probably beign worked on already... skip.') - begin_work(rid) + else: + try: + img_sha, raw_img = gpu_compute_one( + body['method'], body['params'], binext=binary) - try: - img_sha, raw_img = gpu_compute_one( - body['method'], body['params'], binext=binary) + ipfs_hash = publish_on_ipfs(img_sha, raw_img) - ipfs_hash = publish_on_ipfs(img_sha, raw_img) + submit_work(rid, request_hash, img_sha, ipfs_hash) + break - submit_work(rid, request_hash, img_sha, ipfs_hash) - - break - - except BaseException as e: - cancel_work(rid, str(e)) + except BaseException as e: + cancel_work(rid, str(e)) + break else: logging.info(f'request {rid} already beign worked on, skip...') - continue await trio.sleep(1)