Create fully async ipfs client, and stop using docker on worker

pull/23/head
Guillermo Rodriguez 2023-09-24 13:12:49 -03:00
parent 7f50952088
commit 01cbc736a0
No known key found for this signature in database
GPG Key ID: EC3AB66D5D83B392
5 changed files with 66 additions and 41 deletions

View File

@ -16,7 +16,7 @@ from leap.cleos import CLEOS
from leap.sugar import collect_stdout, Name, asset_from_str from leap.sugar import collect_stdout, Name, asset_from_str
from leap.hyperion import HyperionAPI from leap.hyperion import HyperionAPI
from skynet.ipfs import IPFSHTTP from skynet.ipfs import AsyncIPFSHTTP
from .db import open_new_database from .db import open_new_database
@ -516,7 +516,7 @@ def pinner(loglevel, ipfs_rpc, hyperion_url):
from .ipfs.pinner import SkynetPinner from .ipfs.pinner import SkynetPinner
logging.basicConfig(level=loglevel) logging.basicConfig(level=loglevel)
ipfs_node = IPFSHTTP(ipfs_rpc) ipfs_node = AsyncIPFSHTTP(ipfs_rpc)
hyperion = HyperionAPI(hyperion_url) hyperion = HyperionAPI(hyperion_url)
pinner = SkynetPinner(hyperion, ipfs_node) pinner = SkynetPinner(hyperion, ipfs_node)

View File

@ -73,7 +73,7 @@ class SkynetDGPUDaemon:
img_sha, img_raw = self.mm.compute_one( img_sha, img_raw = self.mm.compute_one(
body['method'], body['params'], binary=binary) body['method'], body['params'], binary=binary)
ipfs_hash = self.conn.publish_on_ipfs( img_raw) ipfs_hash = await self.conn.publish_on_ipfs(img_raw)
await self.conn.submit_work(rid, request_hash, img_sha, ipfs_hash) await self.conn.submit_work(rid, request_hash, img_sha, ipfs_hash)
break break

View File

@ -3,13 +3,13 @@
from functools import partial from functools import partial
import io import io
import json import json
from pathlib import Path
import time import time
import logging import logging
import asks import asks
from PIL import Image from PIL import Image
from contextlib import ExitStack
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from leap.cleos import CLEOS from leap.cleos import CLEOS
@ -17,8 +17,7 @@ from leap.sugar import Checksum256, Name, asset_from_str
from skynet.constants import DEFAULT_DOMAIN from skynet.constants import DEFAULT_DOMAIN
from skynet.dgpu.errors import DGPUComputeError from skynet.dgpu.errors import DGPUComputeError
from skynet.ipfs import get_ipfs_file from skynet.ipfs import AsyncIPFSHTTP, get_ipfs_file
from skynet.ipfs.docker import open_ipfs_node
async def failable(fn: partial, ret_fail=None): async def failable(fn: partial, ret_fail=None):
@ -38,28 +37,17 @@ class SkynetGPUConnector:
self.account = Name(config['account']) self.account = Name(config['account'])
self.permission = config['permission'] self.permission = config['permission']
self.key = config['key'] self.key = config['key']
self.node_url = config['node_url'] self.node_url = config['node_url']
self.hyperion_url = config['hyperion_url'] self.hyperion_url = config['hyperion_url']
self.ipfs_url = config['ipfs_url']
self.cleos = CLEOS( self.cleos = CLEOS(
None, None, self.node_url, remote=self.node_url) None, None, self.node_url, remote=self.node_url)
self._exit_stack = ExitStack() self.ipfs_gateway_url = config['ipfs_gateway_url']
self.ipfs_url = config['ipfs_url']
def connect(self):
self.ipfs_node = self._exit_stack.enter_context(
open_ipfs_node())
def disconnect(self):
self._exit_stack.close()
@acm
async def open(self):
self.connect()
yield self
self.disconnect()
self.ipfs_client = AsyncIPFSHTTP(self.ipfs_url)
# blockchain helpers # blockchain helpers
@ -206,21 +194,23 @@ class SkynetGPUConnector:
# IPFS helpers # IPFS helpers
def publish_on_ipfs(self, raw_img: bytes): async def publish_on_ipfs(self, raw_img: bytes):
logging.info('publish_on_ipfs') logging.info('publish_on_ipfs')
img = Image.open(io.BytesIO(raw_img)) img = Image.open(io.BytesIO(raw_img))
img.save(f'ipfs-docker-staging/image.png') img.save('ipfs-docker-staging/image.png')
# check peer connections, reconnect to skynet gateway if not # check peer connections, reconnect to skynet gateway if not
peers = self.ipfs_node.check_connect() peers = await self.ipfs_client.peers()
if self.ipfs_url not in peers: peer_addresses = [peer['Addr'] for peer in peers]
self.ipfs_node.connect(self.ipfs_url) if self.ipfs_gateway_url not in peer_addresses:
await self.ipfs_client.connect(self.ipfs_gateway_url)
ipfs_hash = self.ipfs_node.add('image.png') file_info = await self.ipfs_client.add(Path('ipfs-docker-staging/image.png'))
file_cid = file_info['Hash']
self.ipfs_node.pin(ipfs_hash) await self.ipfs_client.pin(file_cid)
return ipfs_hash return file_cid
async def get_input_data(self, ipfs_hash: str) -> bytes: async def get_input_data(self, ipfs_hash: str) -> bytes:
if ipfs_hash == '': if ipfs_hash == '':

View File

@ -1,26 +1,61 @@
#!/usr/bin/python #!/usr/bin/python
import logging import logging
from pathlib import Path
import asks import asks
import requests
class IPFSHTTP: class IPFSClientException(BaseException):
...
class AsyncIPFSHTTP:
def __init__(self, endpoint: str): def __init__(self, endpoint: str):
self.endpoint = endpoint self.endpoint = endpoint
def pin(self, cid: str): async def _post(self, sub_url: str, *args, **kwargs):
return requests.post( resp = await asks.post(
f'{self.endpoint}/api/v0/pin/add', self.endpoint + sub_url,
*args, **kwargs
)
if resp.status_code != 200:
raise IPFSClientException(resp.text)
return resp.json()
async def add(self, file_path: Path, **kwargs):
files = {
'file': (str(file_path), open(file_path, 'rb'))
}
headers = {
'Content-Type': 'multipart/form-data'
}
return await self._post(
'/api/v0/add',
files=files,
headers=headers,
params=kwargs
)
async def pin(self, cid: str):
return await self._post(
'/api/v0/pin/add',
params={'arg': cid} params={'arg': cid}
) )
async def a_pin(self, cid: str): async def connect(self, multi_addr: str):
return await asks.post( return await self._post(
f'{self.endpoint}/api/v0/pin/add', '/api/v0/swarm/connect',
params={'arg': cid} params={'arg': multi_addr}
)
async def peers(self, **kwargs):
return await self._post(
'/api/v0/swarm/peers',
params=kwargs
) )

View File

@ -9,7 +9,7 @@ import trio
from leap.hyperion import HyperionAPI from leap.hyperion import HyperionAPI
from . import IPFSHTTP from . import AsyncIPFSHTTP
MAX_TIME = timedelta(seconds=20) MAX_TIME = timedelta(seconds=20)
@ -20,7 +20,7 @@ class SkynetPinner:
def __init__( def __init__(
self, self,
hyperion: HyperionAPI, hyperion: HyperionAPI,
ipfs_http: IPFSHTTP ipfs_http: AsyncIPFSHTTP
): ):
self.hyperion = hyperion self.hyperion = hyperion
self.ipfs_http = ipfs_http self.ipfs_http = ipfs_http
@ -85,7 +85,7 @@ class SkynetPinner:
for _ in range(6): for _ in range(6):
try: try:
with trio.move_on_after(5): with trio.move_on_after(5):
resp = await self.ipfs_http.a_pin(cid) resp = await self.ipfs_http.pin(cid)
if resp.status_code != 200: if resp.status_code != 200:
logging.error(f'error pinning {cid}:\n{resp.text}') logging.error(f'error pinning {cid}:\n{resp.text}')
del self._pinned[cid] del self._pinned[cid]