Improve worker ipfs input data parallelizm

pull/26/head
Guillermo Rodriguez 2023-10-08 09:54:31 -03:00
parent aa1d52dba0
commit d749dc4f57
No known key found for this signature in database
GPG Key ID: EC3AB66D5D83B392
1 changed files with 22 additions and 30 deletions

View File

@ -12,7 +12,7 @@ import asks
import trio import trio
import anyio import anyio
from PIL import Image from PIL import Image, UnidentifiedImageError
from leap.cleos import CLEOS from leap.cleos import CLEOS
from leap.sugar import Checksum256, Name, asset_from_str from leap.sugar import Checksum256, Name, asset_from_str
@ -263,43 +263,35 @@ class SkynetGPUConnector:
ipfs_link = f'https://ipfs.{DEFAULT_DOMAIN}/ipfs/{ipfs_hash}' ipfs_link = f'https://ipfs.{DEFAULT_DOMAIN}/ipfs/{ipfs_hash}'
ipfs_link_legacy = ipfs_link + '/image.png' ipfs_link_legacy = ipfs_link + '/image.png'
async def get_and_set_results(link: str):
results[link] = await get_ipfs_file(link)
def get_image_from_resp(resp):
with Image.open(io.BytesIO(resp.raw)):
return resp.raw
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
async def get_and_set_results(link: str):
res = await get_ipfs_file(link, timeout=1)
logging.info(f'got response from {link}')
if not res or res.status_code != 200:
logging.warning(f'couldn\'t get ipfs binary data at {link}!')
else:
try:
with Image.open(io.BytesIO(res.raw)):
results[link] = res.raw
n.cancel_scope.cancel()
except UnidentifiedImageError:
logging.warning(f'couldn\'t get ipfs binary data at {link}!')
n.start_soon( n.start_soon(
get_and_set_results, ipfs_link) get_and_set_results, ipfs_link)
n.start_soon( n.start_soon(
get_and_set_results, ipfs_link_legacy) get_and_set_results, ipfs_link_legacy)
png_img = None png_img = None
resp = results[ipfs_link_legacy] if ipfs_link_legacy in results:
if not resp or resp.status_code != 200: png_img = results[ipfs_link_legacy]
logging.warning(f'couldn\'t get ipfs binary data at {ipfs_link_legacy}!')
else: png_img = None
try: if ipfs_link in results:
png_img = get_image_from_resp(resp) png_img = results[ipfs_link]
except UnidentifiedImageError:
logging.warning(f'couldn\'t get ipfs binary data at {ipfs_link_legacy}!')
if not png_img:
resp = results[ipfs_link]
if not resp or resp.status_code != 200:
logging.warning(f'couldn\'t get ipfs binary data at {ipfs_link}!')
else:
try:
png_img = get_image_from_resp(resp)
except UnidentifiedImageError:
logging.warning(f'couldn\'t get ipfs binary data at {ipfs_link}!')
...
if not png_img: if not png_img:
raise DGPUComputeError('Couldn\'t gather input data from ipfs') raise DGPUComputeError('Couldn\'t gather input data from ipfs')