diff --git a/skynet/cli.py b/skynet/cli.py index 840b937..54dc484 100755 --- a/skynet/cli.py +++ b/skynet/cli.py @@ -16,7 +16,7 @@ from leap.cleos import CLEOS from leap.sugar import collect_stdout, Name, asset_from_str from leap.hyperion import HyperionAPI -from skynet.ipfs import IPFSHTTP +from skynet.ipfs import AsyncIPFSHTTP from .db import open_new_database @@ -516,7 +516,7 @@ def pinner(loglevel, ipfs_rpc, hyperion_url): from .ipfs.pinner import SkynetPinner logging.basicConfig(level=loglevel) - ipfs_node = IPFSHTTP(ipfs_rpc) + ipfs_node = AsyncIPFSHTTP(ipfs_rpc) hyperion = HyperionAPI(hyperion_url) pinner = SkynetPinner(hyperion, ipfs_node) diff --git a/skynet/dgpu/daemon.py b/skynet/dgpu/daemon.py index bf7c176..bd0f1f9 100644 --- a/skynet/dgpu/daemon.py +++ b/skynet/dgpu/daemon.py @@ -73,7 +73,7 @@ class SkynetDGPUDaemon: img_sha, img_raw = self.mm.compute_one( body['method'], body['params'], binary=binary) - ipfs_hash = self.conn.publish_on_ipfs( img_raw) + ipfs_hash = await self.conn.publish_on_ipfs(img_raw) await self.conn.submit_work(rid, request_hash, img_sha, ipfs_hash) break diff --git a/skynet/dgpu/network.py b/skynet/dgpu/network.py index 5309122..6c4fe4b 100644 --- a/skynet/dgpu/network.py +++ b/skynet/dgpu/network.py @@ -3,13 +3,13 @@ from functools import partial import io import json +from pathlib import Path import time import logging import asks from PIL import Image -from contextlib import ExitStack from contextlib import asynccontextmanager as acm from leap.cleos import CLEOS @@ -17,8 +17,7 @@ from leap.sugar import Checksum256, Name, asset_from_str from skynet.constants import DEFAULT_DOMAIN from skynet.dgpu.errors import DGPUComputeError -from skynet.ipfs import get_ipfs_file -from skynet.ipfs.docker import open_ipfs_node +from skynet.ipfs import AsyncIPFSHTTP, get_ipfs_file async def failable(fn: partial, ret_fail=None): @@ -38,28 +37,17 @@ class SkynetGPUConnector: self.account = Name(config['account']) self.permission = config['permission'] self.key = config['key'] + self.node_url = config['node_url'] self.hyperion_url = config['hyperion_url'] - self.ipfs_url = config['ipfs_url'] self.cleos = CLEOS( None, None, self.node_url, remote=self.node_url) - self._exit_stack = ExitStack() - - def connect(self): - self.ipfs_node = self._exit_stack.enter_context( - open_ipfs_node()) - - def disconnect(self): - self._exit_stack.close() - - @acm - async def open(self): - self.connect() - yield self - self.disconnect() + self.ipfs_gateway_url = config['ipfs_gateway_url'] + self.ipfs_url = config['ipfs_url'] + self.ipfs_client = AsyncIPFSHTTP(self.ipfs_url) # blockchain helpers @@ -206,21 +194,23 @@ class SkynetGPUConnector: # IPFS helpers - def publish_on_ipfs(self, raw_img: bytes): + async def publish_on_ipfs(self, raw_img: bytes): logging.info('publish_on_ipfs') img = Image.open(io.BytesIO(raw_img)) - img.save(f'ipfs-docker-staging/image.png') + img.save('ipfs-docker-staging/image.png') # check peer connections, reconnect to skynet gateway if not - peers = self.ipfs_node.check_connect() - if self.ipfs_url not in peers: - self.ipfs_node.connect(self.ipfs_url) + peers = await self.ipfs_client.peers() + peer_addresses = [peer['Addr'] for peer in peers] + if self.ipfs_gateway_url not in peer_addresses: + await self.ipfs_client.connect(self.ipfs_gateway_url) - ipfs_hash = self.ipfs_node.add('image.png') + file_info = await self.ipfs_client.add(Path('ipfs-docker-staging/image.png')) + file_cid = file_info['Hash'] - self.ipfs_node.pin(ipfs_hash) + await self.ipfs_client.pin(file_cid) - return ipfs_hash + return file_cid async def get_input_data(self, ipfs_hash: str) -> bytes: if ipfs_hash == '': diff --git a/skynet/ipfs/__init__.py b/skynet/ipfs/__init__.py index 9ee7b42..5cb4067 100644 --- a/skynet/ipfs/__init__.py +++ b/skynet/ipfs/__init__.py @@ -1,26 +1,61 @@ #!/usr/bin/python import logging +from pathlib import Path import asks -import requests -class IPFSHTTP: +class IPFSClientException(BaseException): + ... + + +class AsyncIPFSHTTP: def __init__(self, endpoint: str): self.endpoint = endpoint - def pin(self, cid: str): - return requests.post( - f'{self.endpoint}/api/v0/pin/add', + async def _post(self, sub_url: str, *args, **kwargs): + resp = await asks.post( + self.endpoint + sub_url, + *args, **kwargs + ) + + if resp.status_code != 200: + raise IPFSClientException(resp.text) + + return resp.json() + + async def add(self, file_path: Path, **kwargs): + files = { + 'file': (str(file_path), open(file_path, 'rb')) + } + headers = { + 'Content-Type': 'multipart/form-data' + } + return await self._post( + '/api/v0/add', + files=files, + headers=headers, + params=kwargs + ) + + async def pin(self, cid: str): + return await self._post( + '/api/v0/pin/add', params={'arg': cid} ) - async def a_pin(self, cid: str): - return await asks.post( - f'{self.endpoint}/api/v0/pin/add', - params={'arg': cid} + async def connect(self, multi_addr: str): + return await self._post( + '/api/v0/swarm/connect', + params={'arg': multi_addr} + ) + + async def peers(self, **kwargs): + return await self._post( + '/api/v0/swarm/peers', + params=kwargs ) diff --git a/skynet/ipfs/pinner.py b/skynet/ipfs/pinner.py index c4aee10..146517f 100644 --- a/skynet/ipfs/pinner.py +++ b/skynet/ipfs/pinner.py @@ -9,7 +9,7 @@ import trio from leap.hyperion import HyperionAPI -from . import IPFSHTTP +from . import AsyncIPFSHTTP MAX_TIME = timedelta(seconds=20) @@ -20,7 +20,7 @@ class SkynetPinner: def __init__( self, hyperion: HyperionAPI, - ipfs_http: IPFSHTTP + ipfs_http: AsyncIPFSHTTP ): self.hyperion = hyperion self.ipfs_http = ipfs_http @@ -85,7 +85,7 @@ class SkynetPinner: for _ in range(6): try: with trio.move_on_after(5): - resp = await self.ipfs_http.a_pin(cid) + resp = await self.ipfs_http.pin(cid) if resp.status_code != 200: logging.error(f'error pinning {cid}:\n{resp.text}') del self._pinned[cid]