Add test for new ipfs async apis, fix cli entrypoints endpoint loading to new format

pull/23/head
Guillermo Rodriguez 2023-09-24 15:23:25 -03:00
parent 58f208afa2
commit 1b13cf25cc
No known key found for this signature in database
GPG Key ID: EC3AB66D5D83B392
9 changed files with 91 additions and 57 deletions

View File

@ -1,3 +1,4 @@
[pytest]
log_cli = True
log_level = info
trio_mode = True

View File

@ -20,7 +20,7 @@ key = 5Xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
node_url = https://testnet.skygpu.net
hyperion_url = https://testnet.skygpu.net
ipfs_url = /ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv
ipfs_gateway_url = /ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv
token = XXXXXXXXXX:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
@ -31,6 +31,6 @@ key = 5Xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
node_url = https://testnet.skygpu.net
hyperion_url = https://testnet.skygpu.net
ipfs_url = /ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv
ipfs_gateway_url = /ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv
token = XXXXXXXXXX:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

View File

@ -125,8 +125,7 @@ def enqueue(
key, account, permission = load_account_info(
'user', key, account, permission)
node_url, _, _ = load_endpoint_info(
'user', node_url, None, None)
node_url, _, _, _ = load_endpoint_info('user', node_url=node_url)
with open_cleos(node_url, key=key) as cleos:
async def enqueue_n_jobs():
@ -176,8 +175,7 @@ def clean(
key, account, permission = load_account_info(
'user', key, account, permission)
node_url, _, _ = load_endpoint_info(
'user', node_url, None, None)
node_url, _, _, _ = load_endpoint_info('user', node_url=node_url)
logging.basicConfig(level=loglevel)
cleos = CLEOS(None, None, url=node_url, remote=node_url)
@ -195,8 +193,7 @@ def clean(
@click.option(
'--node-url', '-n', default='https://skynet.ancap.tech')
def queue(node_url: str):
node_url, _, _ = load_endpoint_info(
'user', node_url, None, None)
node_url, _, _, _ = load_endpoint_info('user', node_url=node_url)
resp = requests.post(
f'{node_url}/v1/chain/get_table_rows',
json={
@ -213,8 +210,7 @@ def queue(node_url: str):
'--node-url', '-n', default='https://skynet.ancap.tech')
@click.argument('request-id')
def status(node_url: str, request_id: int):
node_url, _, _ = load_endpoint_info(
'user', node_url, None, None)
node_url, _, _, _ = load_endpoint_info('user', node_url=node_url)
resp = requests.post(
f'{node_url}/v1/chain/get_table_rows',
json={
@ -246,20 +242,22 @@ def dequeue(
key, account, permission = load_account_info(
'user', key, account, permission)
node_url, _, _ = load_endpoint_info(
'user', node_url, None, None)
node_url, _, _, _ = load_endpoint_info('user', node_url=node_url)
with open_cleos(node_url, key=key) as cleos:
res = trio.run(cleos.a_push_action,
cleos = CLEOS(None, None, url=node_url, remote=node_url)
res = trio.run(
partial(
cleos.a_push_action,
'telos.gpu',
'dequeue',
{
'user': Name(account),
'request_id': int(request_id),
},
account, key, permission,
account, key, permission=permission
)
print(res)
)
print(res)
@skynet.command()
@ -285,20 +283,21 @@ def config(
):
key, account, permission = load_account_info(
'user', key, account, permission)
node_url, _, _ = load_endpoint_info(
'user', node_url, None, None)
with open_cleos(node_url, key=key) as cleos:
res = trio.run(cleos.a_push_action,
node_url, _, _, _ = load_endpoint_info('user', node_url=node_url)
cleos = CLEOS(None, None, url=node_url, remote=node_url)
res = trio.run(
partial(
cleos.a_push_action,
'telos.gpu',
'config',
{
'token_contract': token_contract,
'token_symbol': token_symbol,
},
account, key, permission,
account, key, permission=permission
)
print(res)
)
print(res)
@skynet.command()
@ -321,12 +320,12 @@ def deposit(
key, account, permission = load_account_info(
'user', key, account, permission)
node_url, _, _ = load_endpoint_info(
'user', node_url, None, None)
node_url, _, _, _ = load_endpoint_info('user', node_url=node_url)
with open_cleos(node_url, key=key) as cleos:
res = trio.run(cleos.a_push_action,
'eosio.token',
res = trio.run(
partial(
cleos.a_push_action,
'telos.gpu',
'transfer',
{
'sender': Name(account),
@ -334,9 +333,10 @@ def deposit(
'amount': asset_from_str(quantity),
'memo': f'{account} transferred {quantity} to telos.gpu'
},
account, key, permission,
account, key, permission=permission
)
print(res)
)
print(res)
@skynet.group()
@ -388,7 +388,7 @@ def dgpu(
@click.option(
'--node-url', '-n', default=f'https://testnet.{DEFAULT_DOMAIN}')
@click.option(
'--ipfs-url', '-i', default=DEFAULT_IPFS_REMOTE)
'--ipfs-gateway-url', '-i', default=DEFAULT_IPFS_REMOTE)
@click.option(
'--db-host', '-h', default='localhost:5432')
@click.option(
@ -401,7 +401,7 @@ def telegram(
permission: str,
key: str | None,
hyperion_url: str,
ipfs_url: str,
ipfs_gateway_url: str,
node_url: str,
db_host: str,
db_user: str,
@ -414,8 +414,8 @@ def telegram(
key, account, permission = load_account_info(
'telegram', key, account, permission)
node_url, _, ipfs_url = load_endpoint_info(
'telegram', node_url, None, None)
node_url, _, ipfs_gateway_url, _ = load_endpoint_info(
'telegram', node_url=node_url, ipfs_gateway_url=ipfs_gateway_url)
async def _async_main():
frontend = SkynetTelegramFrontend(
@ -425,7 +425,7 @@ def telegram(
node_url,
hyperion_url,
db_host, db_user, db_pass,
remote_ipfs_node=ipfs_url,
remote_ipfs_node=ipfs_gateway_url,
key=key
)
@ -449,7 +449,7 @@ def telegram(
@click.option(
'--node-url', '-n', default=f'https://testnet.{DEFAULT_DOMAIN}')
@click.option(
'--ipfs-url', '-i', default=DEFAULT_IPFS_REMOTE)
'--ipfs-gateway-url', '-i', default=DEFAULT_IPFS_REMOTE)
@click.option(
'--db-host', '-h', default='localhost:5432')
@click.option(
@ -462,7 +462,7 @@ def discord(
permission: str,
key: str | None,
hyperion_url: str,
ipfs_url: str,
ipfs_gateway_url: str,
node_url: str,
db_host: str,
db_user: str,
@ -475,8 +475,8 @@ def discord(
key, account, permission = load_account_info(
'discord', key, account, permission)
node_url, _, ipfs_url = load_endpoint_info(
'discord', node_url, None, None)
node_url, _, ipfs_gateway_url, _ = load_endpoint_info(
'telegram', node_url=node_url, ipfs_gateway_url=ipfs_gateway_url)
async def _async_main():
frontend = SkynetDiscordFrontend(
@ -486,7 +486,7 @@ def discord(
node_url,
hyperion_url,
db_host, db_user, db_pass,
remote_ipfs_node=ipfs_url,
remote_ipfs_node=ipfs_gateway_url,
key=key
)

View File

@ -93,6 +93,7 @@ def load_endpoint_info(
node_url: str | None = None,
hyperion_url: str | None = None,
ipfs_url: str | None = None,
ipfs_gateway_url: str | None = None,
file_path=DEFAULT_CONFIG_PATH
):
config = load_skynet_ini(file_path=file_path)
@ -110,4 +111,7 @@ def load_endpoint_info(
if not ipfs_url and 'ipfs_url' in sub_config:
ipfs_url = sub_config['ipfs_url']
return node_url, hyperion_url, ipfs_url
if not ipfs_gateway_url and 'ipfs_gateway_url' in sub_config:
ipfs_gateway_url = sub_config['ipfs_gateway_url']
return node_url, hyperion_url, ipfs_gateway_url, ipfs_url

View File

@ -200,9 +200,9 @@ class SkynetGPUConnector:
img.save('ipfs-docker-staging/image.png')
# check peer connections, reconnect to skynet gateway if not
gateway_id = Path(self.ipfs_gateway_url).name
peers = await self.ipfs_client.peers()
peer_addresses = [peer['Addr'] for peer in peers]
if self.ipfs_gateway_url not in peer_addresses:
if gateway_id not in [p['Peer'] for p in peers]:
await self.ipfs_client.connect(self.ipfs_gateway_url)
file_info = await self.ipfs_client.add(Path('ipfs-docker-staging/image.png'))

View File

@ -28,23 +28,19 @@ class AsyncIPFSHTTP:
async def add(self, file_path: Path, **kwargs):
files = {
'file': (str(file_path), open(file_path, 'rb'))
}
headers = {
'Content-Type': 'multipart/form-data'
'file': file_path
}
return await self._post(
'/api/v0/add',
files=files,
headers=headers,
params=kwargs
)
async def pin(self, cid: str):
return await self._post(
return (await self._post(
'/api/v0/pin/add',
params={'arg': cid}
)
))['Pins']
async def connect(self, multi_addr: str):
return await self._post(
@ -53,10 +49,10 @@ class AsyncIPFSHTTP:
)
async def peers(self, **kwargs):
return await self._post(
return (await self._post(
'/api/v0/swarm/peers',
params=kwargs
)
))['Peers']
async def get_ipfs_file(ipfs_link: str, timeout: int = 60):

View File

@ -51,9 +51,10 @@ class IPFSDocker:
@cm
def open_ipfs_node(name='skynet-ipfs'):
def open_ipfs_node(name='skynet-ipfs', teardown=False):
dclient = docker.from_env()
container = None
try:
container = dclient.containers.get(name)
@ -100,3 +101,6 @@ def open_ipfs_node(name='skynet-ipfs'):
break
yield IPFSDocker(container)
if teardown and container:
container.stop()

View File

@ -1,15 +1,18 @@
#!/usr/bin/python
import logging
from pathlib import Path
import pytest
from skynet.db import open_new_database
from skynet.ipfs import AsyncIPFSHTTP
from skynet.ipfs.docker import open_ipfs_node
from skynet.nodeos import open_nodeos
@pytest.fixture(scope='session')
def ipfs_client():
with open_ipfs_node(teardown=True):
yield AsyncIPFSHTTP('http://127.0.0.1:5001')
@pytest.fixture(scope='session')
def postgres_db():
with open_new_database() as db_params:

View File

@ -0,0 +1,26 @@
#!/usr/bin/python
from pathlib import Path
async def test_connection(ipfs_client):
await ipfs_client.connect(
'/ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv')
peers = await ipfs_client.peers()
assert '12D3KooWKWogLFNEcNNMKnzU7Snrnuj84RZdMBg3sLiQSQc51oEv' in [p['Peer'] for p in peers]
async def test_add_and_pin_file(ipfs_client):
test_file = Path('hello_world.txt')
with open(test_file, 'w+') as file:
file.write('Hello Skynet!')
file_info = await ipfs_client.add(test_file)
file_cid = file_info['Hash']
pin_resp = await ipfs_client.pin(file_cid)
assert file_cid in pin_resp
test_file.unlink()