Rework gpu worker logic to work better in parallel with other workers

add-txt2txt-models
Guillermo Rodriguez 2023-05-29 13:43:03 -03:00
parent 2b18fa376b
commit 25c86b5eaf
No known key found for this signature in database
GPG Key ID: EC3AB66D5D83B392
1 changed files with 34 additions and 37 deletions

View File

@ -218,13 +218,12 @@ async def open_dgpu_node(
def begin_work(request_id: int):
logging.info('begin_work')
ec, out = cleos.push_action(
return cleos.push_action(
'telos.gpu',
'workbegin',
[account, request_id],
f'{account}@{permission}'
)
assert ec == 0
def cancel_work(request_id: int, reason: str):
logging.info('cancel_work')
@ -234,7 +233,6 @@ async def open_dgpu_node(
[account, request_id, reason],
f'{account}@{permission}'
)
assert ec == 0
def maybe_withdraw_all():
logging.info('maybe_withdraw_all')
@ -251,7 +249,6 @@ async def open_dgpu_node(
f'{account}@{permission}'
)
logging.info(collect_stdout(out))
assert ec == 0
async def find_my_results():
logging.info('find_my_results')
@ -289,8 +286,8 @@ async def open_dgpu_node(
f'{account}@{permission}'
)
print(collect_stdout(out))
assert ec == 0
if ec != 0:
print(collect_stdout(out))
async def get_input_data(ipfs_hash: str) -> bytes:
if ipfs_hash == '':
@ -317,51 +314,51 @@ async def open_dgpu_node(
rid = req['id']
my_results = [res['id'] for res in (await find_my_results())]
if rid in my_results:
continue
if rid not in my_results:
statuses = await get_status_by_request_id(rid)
statuses = await get_status_by_request_id(rid)
if len(statuses) < config['verification_amount']:
if len(statuses) < config['verification_amount']:
# parse request
body = json.loads(req['body'])
# parse request
body = json.loads(req['body'])
binary = await get_input_data(req['binary_data'])
binary = await get_input_data(req['binary_data'])
hash_str = (
str(await get_user_nonce(req['user']))
+
req['body']
+
req['binary_data']
)
logging.info(f'hashing: {hash_str}')
request_hash = sha256(hash_str.encode('utf-8')).hexdigest()
hash_str = (
str(await get_user_nonce(req['user']))
+
req['body']
+
req['binary_data']
)
logging.info(f'hashing: {hash_str}')
request_hash = sha256(hash_str.encode('utf-8')).hexdigest()
# TODO: validate request
# TODO: validate request
# perform work
logging.info(f'working on {body}')
# perform work
logging.info(f'working on {body}')
ec, _ = begin_work(rid)
if ec != 0:
logging.info(f'probably beign worked on already... skip.')
begin_work(rid)
else:
try:
img_sha, raw_img = gpu_compute_one(
body['method'], body['params'], binext=binary)
try:
img_sha, raw_img = gpu_compute_one(
body['method'], body['params'], binext=binary)
ipfs_hash = publish_on_ipfs(img_sha, raw_img)
ipfs_hash = publish_on_ipfs(img_sha, raw_img)
submit_work(rid, request_hash, img_sha, ipfs_hash)
break
submit_work(rid, request_hash, img_sha, ipfs_hash)
break
except BaseException as e:
cancel_work(rid, str(e))
except BaseException as e:
cancel_work(rid, str(e))
break
else:
logging.info(f'request {rid} already beign worked on, skip...')
continue
await trio.sleep(1)