First version integrated to rust contract

rust_contract
Guillermo Rodriguez 2025-02-13 18:51:42 -03:00
parent eeb27d5bbf
commit 0a5c06e312
No known key found for this signature in database
GPG Key ID: 002CC5F1E6BDA53E
14 changed files with 680 additions and 865 deletions

View File

@ -7,14 +7,57 @@ jobs:
name: Pytest Tests name: Pytest Tests
runs-on: ubuntu-24.04 runs-on: ubuntu-24.04
timeout-minutes: 10 timeout-minutes: 10
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v4
with: with:
submodules: recursive submodules: recursive
- name: Install system dependencies (Rust + Python)
run: |
sudo apt-get update
sudo apt-get install -y \
build-essential \
pkg-config \
libssl-dev \
clang \
lld \
protobuf-compiler \
make \
wget
- name: Install binaryen 120
run: |
wget https://github.com/WebAssembly/binaryen/releases/download/version_120/binaryen-version_120-x86_64-linux.tar.gz
tar xvf binaryen-version_120-x86_64-linux.tar.gz
echo "$(pwd)/binaryen-version_120/bin" >> $GITHUB_PATH
- name: Install the latest version of uv - name: Install the latest version of uv
uses: astral-sh/setup-uv@v5 uses: astral-sh/setup-uv@v5
- name: Install Rust
run: |
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
echo "$HOME/.cargo/bin" >> $GITHUB_PATH
rustup install stable
rustup component add rust-src --toolchain stable
rustup target add wasm32-wasip1
- name: Set up Python environment and dependencies
run: |
uv venv .venv --python=3.12
uv pip install -U rust-contracts-builder
echo "$(pwd)/.venv/bin" >> $GITHUB_PATH
- name: Apply required modifications for rust-contracts-builder
run: |
sed -i "s/wasm32-wasi /wasm32-wasip1 /g" .venv/lib/python3.12/site-packages/rust_contracts_builder/__init__.py
sed -i "s/wasm32-wasi\//wasm32-wasip1\//g" .venv/lib/python3.12/site-packages/rust_contracts_builder/__init__.py
- name: Build Rust project
run: rust-contract build
working-directory: tests/contracts/skygpu-contract
- uses: actions/cache@v3 - uses: actions/cache@v3
name: Cache venv name: Cache venv
with: with:

3
.gitmodules vendored 100644
View File

@ -0,0 +1,3 @@
[submodule "tests/contracts/skygpu-contract"]
path = tests/contracts/skygpu-contract
url = https://github.com/skygpu/skygpu-contract.git

View File

@ -16,13 +16,14 @@ async def open_test_worker(
cleos, ipfs_node, cleos, ipfs_node,
account: str = 'testworker', account: str = 'testworker',
permission: str = 'active', permission: str = 'active',
key: str = '5KRPFxF4RJebqPXqRzwStmCaEWeRfp3pR7XUNoA3zCHt5fnPu3s',
hf_token: str = '', hf_token: str = '',
**kwargs **kwargs
): ):
config = override_dgpu_config( config = override_dgpu_config(
account=account, account=account,
permission=permission, permission=permission,
key=cleos.private_keys[account], key=key,
node_url=cleos.endpoint, node_url=cleos.endpoint,
ipfs_url=ipfs_node[1].endpoint, ipfs_url=ipfs_node[1].endpoint,
hf_token=hf_token, hf_token=hf_token,

View File

@ -271,221 +271,3 @@ TG_MAX_WIDTH = 1280
TG_MAX_HEIGHT = 1280 TG_MAX_HEIGHT = 1280
DEFAULT_SINGLE_CARD_MAP = 'cuda:0' DEFAULT_SINGLE_CARD_MAP = 'cuda:0'
GPU_CONTRACT_ABI = {
"version": "eosio::abi/1.2",
"types": [],
"structs": [
{
"name": "account",
"base": "",
"fields": [
{"name": "user", "type": "name"},
{"name": "balance", "type": "asset"},
{"name": "nonce", "type": "uint64"}
]
},
{
"name": "card",
"base": "",
"fields": [
{"name": "id", "type": "uint64"},
{"name": "owner", "type": "name"},
{"name": "card_name", "type": "string"},
{"name": "version", "type": "string"},
{"name": "total_memory", "type": "uint64"},
{"name": "mp_count", "type": "uint32"},
{"name": "extra", "type": "string"}
]
},
{
"name": "clean",
"base": "",
"fields": []
},
{
"name": "config",
"base": "",
"fields": [
{"name": "token_contract", "type": "name"},
{"name": "token_symbol", "type": "symbol"}
]
},
{
"name": "dequeue",
"base": "",
"fields": [
{"name": "user", "type": "name"},
{"name": "request_id", "type": "uint64"}
]
},
{
"name": "enqueue",
"base": "",
"fields": [
{"name": "user", "type": "name"},
{"name": "request_body", "type": "string"},
{"name": "binary_data", "type": "string"},
{"name": "reward", "type": "asset"},
{"name": "min_verification", "type": "uint32"}
]
},
{
"name": "gcfgstruct",
"base": "",
"fields": [
{"name": "token_contract", "type": "name"},
{"name": "token_symbol", "type": "symbol"}
]
},
{
"name": "submit",
"base": "",
"fields": [
{"name": "worker", "type": "name"},
{"name": "request_id", "type": "uint64"},
{"name": "request_hash", "type": "checksum256"},
{"name": "result_hash", "type": "checksum256"},
{"name": "ipfs_hash", "type": "string"}
]
},
{
"name": "withdraw",
"base": "",
"fields": [
{"name": "user", "type": "name"},
{"name": "quantity", "type": "asset"}
]
},
{
"name": "work_request_struct",
"base": "",
"fields": [
{"name": "id", "type": "uint64"},
{"name": "user", "type": "name"},
{"name": "reward", "type": "asset"},
{"name": "min_verification", "type": "uint32"},
{"name": "nonce", "type": "uint64"},
{"name": "body", "type": "string"},
{"name": "binary_data", "type": "string"},
{"name": "timestamp", "type": "time_point_sec"}
]
},
{
"name": "work_result_struct",
"base": "",
"fields": [
{"name": "id", "type": "uint64"},
{"name": "request_id", "type": "uint64"},
{"name": "user", "type": "name"},
{"name": "worker", "type": "name"},
{"name": "result_hash", "type": "checksum256"},
{"name": "ipfs_hash", "type": "string"},
{"name": "submited", "type": "time_point_sec"}
]
},
{
"name": "workbegin",
"base": "",
"fields": [
{"name": "worker", "type": "name"},
{"name": "request_id", "type": "uint64"},
{"name": "max_workers", "type": "uint32"}
]
},
{
"name": "workcancel",
"base": "",
"fields": [
{"name": "worker", "type": "name"},
{"name": "request_id", "type": "uint64"},
{"name": "reason", "type": "string"}
]
},
{
"name": "worker",
"base": "",
"fields": [
{"name": "account", "type": "name"},
{"name": "joined", "type": "time_point_sec"},
{"name": "left", "type": "time_point_sec"},
{"name": "url", "type": "string"}
]
},
{
"name": "worker_status_struct",
"base": "",
"fields": [
{"name": "worker", "type": "name"},
{"name": "status", "type": "string"},
{"name": "started", "type": "time_point_sec"}
]
}
],
"actions": [
{"name": "clean", "type": "clean", "ricardian_contract": ""},
{"name": "config", "type": "config", "ricardian_contract": ""},
{"name": "dequeue", "type": "dequeue", "ricardian_contract": ""},
{"name": "enqueue", "type": "enqueue", "ricardian_contract": ""},
{"name": "submit", "type": "submit", "ricardian_contract": ""},
{"name": "withdraw", "type": "withdraw", "ricardian_contract": ""},
{"name": "workbegin", "type": "workbegin", "ricardian_contract": ""},
{"name": "workcancel", "type": "workcancel", "ricardian_contract": ""}
],
"tables": [
{
"name": "cards",
"index_type": "i64",
"key_names": [],
"key_types": [],
"type": "card"
},
{
"name": "gcfgstruct",
"index_type": "i64",
"key_names": [],
"key_types": [],
"type": "gcfgstruct"
},
{
"name": "queue",
"index_type": "i64",
"key_names": [],
"key_types": [],
"type": "work_request_struct"
},
{
"name": "results",
"index_type": "i64",
"key_names": [],
"key_types": [],
"type": "work_result_struct"
},
{
"name": "status",
"index_type": "i64",
"key_names": [],
"key_types": [],
"type": "worker_status_struct"
},
{
"name": "users",
"index_type": "i64",
"key_names": [],
"key_types": [],
"type": "account"
},
{
"name": "workers",
"index_type": "i64",
"key_names": [],
"key_types": [],
"type": "worker"
}
],
"ricardian_clauses": [],
"error_messages": [],
"abi_extensions": [],
"variants": [],
"action_results": []
}

304
skynet/contract.py 100644
View File

@ -0,0 +1,304 @@
import time
import msgspec
from leap import CLEOS
from leap.protocol import Name
from skynet.types import (
ConfigV1,
AccountV1,
WorkerV0,
RequestV1,
BodyV0,
WorkerStatusV0,
ResultV0
)
class ConfigNotFound(BaseException):
...
class AccountNotFound(BaseException):
...
class WorkerNotFound(BaseException):
...
class RequestNotFound(BaseException):
...
class WorkerStatusNotFound(BaseException):
...
class GPUContractAPI:
def __init__(self, cleos: CLEOS):
self.receiver = 'gpu.scd'
self._cleos = cleos
# views into data
async def get_config(self) -> ConfigV1:
rows = await self._cleos.aget_table(
self.receiver, self.receiver, 'config',
resp_cls=ConfigV1
)
if len(rows) == 0:
raise ConfigNotFound()
return rows[0]
async def get_user(self, user: str) -> AccountV1:
rows = await self._cleos.aget_table(
self.receiver, self.receiver, 'users',
key_type='name',
lower_bound=user,
upper_bound=user,
resp_cls=AccountV1
)
if len(rows) == 0:
raise AccountNotFound(user)
return rows[0]
async def get_users(self) -> list[AccountV1]:
return await self._cleos.aget_table(self.receiver, self.receiver, 'users', resp_cls=AccountV1)
async def get_worker(self, worker: str) -> WorkerV0:
rows = await self._cleos.aget_table(
self.receiver, self.receiver, 'workers',
key_type='name',
lower_bound=worker,
upper_bound=worker,
resp_cls=WorkerV0
)
if len(rows) == 0:
raise WorkerNotFound(worker)
return rows[0]
async def get_workers(self) -> list[AccountV1]:
return await self._cleos.aget_table(self.receiver, self.receiver, 'workers', resp_cls=WorkerV0)
async def get_queue(self) -> RequestV1:
return await self._cleos.aget_table(self.receiver, self.receiver, 'queue', resp_cls=RequestV1)
async def get_request(self, request_id: int) -> RequestV1:
rows = await self._cleos.aget_table(
self.receiver, self.receiver, 'queue',
lower_bound=request_id,
upper_bound=request_id,
resp_cls=RequestV1
)
if len(rows) == 0:
raise RequestNotFound(request_id)
return rows[0]
async def get_requests_since(self, seconds: int) -> list[RequestV1]:
return await self._cleos.aget_table(
self.receiver, self.receiver, 'queue',
index_position=2,
key_type='i64',
lower_bound=int(time.time()) - seconds,
resp_cls=RequestV1
)
async def get_statuses_for_request(self, request_id: int) -> list[WorkerStatusV0]:
return await self._cleos.aget_table(
self.receiver, str(Name.from_int(request_id)), 'status',
resp_cls=WorkerStatusV0
)
async def get_worker_status_for_request(self, request_id: int, worker: str) -> WorkerStatusV0:
rows = await self._cleos.aget_table(
self.receiver, str(Name.from_int(request_id)), 'status',
key_type='name',
lower_bound=worker,
upper_bound=worker,
resp_cls=WorkerStatusV0
)
if len(rows) == 0:
raise WorkerStatusNotFound(request_id)
return rows[0]
async def get_results(self, request_id: int) -> list[ResultV0]:
return await self._cleos.aget_table(
self.receiver, self.receiver, 'results',
index_position=2,
key_type='i64',
lower_bound=request_id,
upper_bound=request_id,
resp_cls=ResultV0
)
async def get_worker_results(self, worker: str) -> list[ResultV0]:
return await self._cleos.aget_table(
self.receiver, self.receiver, 'results',
index_position=4,
key_type='name',
lower_bound=worker,
upper_bound=worker,
resp_cls=ResultV0
)
# system actions
async def init_config(self, token_account: str, token_symbol: str):
return await self._cleos.a_push_action(
self.receiver,
'config',
[token_account, token_symbol],
self.receiver
)
async def clean_tables(self, nuke: bool = False):
return await self._cleos.a_push_action(
self.receiver,
'clean',
[nuke],
self.receiver
)
# balance actions
async def deposit(self, user: str, quantity: str):
return await self._cleos.a_push_action(
'eosio.token',
'transfer',
[user, self.receiver, quantity, 'testing gpu deposit'],
user,
key=self._cleos.private_keys[user]
)
async def withdraw(self, user: str, quantity: str):
return await self._cleos.a_push_action(
self.receiver,
'withdraw',
[user, quantity],
user,
key=self._cleos.private_keys[user]
)
# worker actions
async def register_worker(
self,
worker: str,
url: str
):
return await self._cleos.a_push_action(
self.receiver,
'regworker',
[worker, url],
worker,
key=self._cleos.private_keys[worker]
)
async def unregister_worker(
self,
worker: str,
reason: str
):
return await self._cleos.a_push_action(
self.receiver,
'unregworker',
[worker, reason],
worker,
key=self._cleos.private_keys[worker]
)
async def accept_work(
self,
worker: str,
request_id: int,
max_workers: int = 10
):
return await self._cleos.a_push_action(
self.receiver,
'workbegin',
[worker, request_id, max_workers],
worker,
key=self._cleos.private_keys[worker]
)
async def cancel_work(
self,
worker: str,
request_id: int,
reason: str
):
return await self._cleos.a_push_action(
self.receiver,
'workcancel',
[worker, request_id, reason],
worker,
key=self._cleos.private_keys[worker]
)
async def submit_work(
self,
worker: str,
request_id: int,
result_hash: str,
ipfs_hash: str
):
return await self._cleos.a_push_action(
self.receiver,
'submit',
[worker, request_id, result_hash, ipfs_hash],
worker,
key=self._cleos.private_keys[worker]
)
# user actions
async def enqueue(
self,
account: str,
body: BodyV0,
binary_data: str = '',
reward: str = '1.0000 TLOS',
min_verification: int = 1
) -> int:
body = msgspec.json.encode(body).decode('utf-8')
result = await self._cleos.a_push_action(
self.receiver,
'enqueue',
[
account,
body,
binary_data,
reward,
min_verification
],
account,
key=self._cleos.private_keys[account]
)
console = result['processed']['action_traces'][0]['console']
nonce_index = -1
timestamp_index = -2
lines = console.rstrip().split('\n')
nonce = int(lines[nonce_index])
timestamp = lines[timestamp_index]
return RequestV1(
id=int(nonce),
user=account,
reward=reward,
min_verification=min_verification,
body=body,
binary_data=binary_data,
timestamp=timestamp
)
async def dequeue(self, user: str, request_id: int):
return await self._cleos.a_push_action(
self.receiver,
'dequeue',
[user, request_id],
user,
key=self._cleos.private_keys[user]
)

View File

@ -1,6 +1,5 @@
import logging import logging
from functools import partial from functools import partial
from hashlib import sha256
import trio import trio
import msgspec import msgspec
@ -22,7 +21,7 @@ from skynet.dgpu.network import (
async def maybe_update_tui_balance(conn: NetConnector): async def maybe_update_tui_balance(conn: NetConnector):
async def _fn(tui): async def _fn(tui):
# update balance # update balance
balance = await conn.get_worker_balance() balance = await conn.contract.get_user(tui.config.account).balance
tui.set_header_text(new_balance=f'balance: {balance}') tui.set_header_text(new_balance=f'balance: {balance}')
await maybe_update_tui_async(_fn) await maybe_update_tui_async(_fn)
@ -101,24 +100,12 @@ async def maybe_serve_one(
f'IPFS fetch input error !?! retries left {retry - r - 1}\n' f'IPFS fetch input error !?! retries left {retry - r - 1}\n'
) )
# compute unique request hash used on submit
hash_str = (
str(req.nonce)
+
req.body
+
req.binary_data
)
logging.debug(f'hashing: {hash_str}')
request_hash = sha256(hash_str.encode('utf-8')).hexdigest()
logging.info(f'calculated request hash: {request_hash}')
total_step = body.params.step total_step = body.params.step
mode = body.method mode = body.method
# TODO: validate request # TODO: validate request
resp = await conn.begin_work(req.id) resp = await conn.contract.accept_work(config.account, req.id)
if not resp or 'code' in resp: if not resp or 'code' in resp:
logging.info('begin_work error, probably being worked on already... skip.') logging.info('begin_work error, probably being worked on already... skip.')
return return
@ -157,7 +144,9 @@ async def maybe_serve_one(
ipfs_hash = await conn.publish_on_ipfs(output, typ=output_type) ipfs_hash = await conn.publish_on_ipfs(output, typ=output_type)
await conn.submit_work(req.id, request_hash, output_hash, ipfs_hash) await conn.contract.submit_work(config.account, req.id, output_hash, ipfs_hash)
await state_mngr.update_state()
await maybe_update_tui_balance(conn) await maybe_update_tui_balance(conn)
@ -168,8 +157,10 @@ async def maybe_serve_one(
if 'network cancel' not in str(err): if 'network cancel' not in str(err):
logging.exception('Failed to serve model request !?\n') logging.exception('Failed to serve model request !?\n')
await state_mngr.update_state()
if state_mngr.is_request_in_progress(req.id): if state_mngr.is_request_in_progress(req.id):
await conn.cancel_work(req.id, 'reason not provided') await conn.contract.cancel_work(config.account, req.id, 'reason not provided')
async def dgpu_serve_forever( async def dgpu_serve_forever(

View File

@ -15,18 +15,15 @@ import outcome
import msgspec import msgspec
from PIL import Image from PIL import Image
from leap.cleos import CLEOS from leap.cleos import CLEOS
from leap.protocol import Asset
from skynet.dgpu.tui import maybe_update_tui from skynet.dgpu.tui import maybe_update_tui
from skynet.config import DgpuConfig as Config, load_skynet_toml from skynet.config import DgpuConfig as Config, load_skynet_toml
from skynet.types import ( from skynet.types import (
ConfigV0,
AccountV0,
BodyV0, BodyV0,
RequestV0, RequestV1,
WorkerStatusV0, WorkerStatusV0,
ResultV0 ResultV0
) )
from skynet.constants import GPU_CONTRACT_ABI from skynet.contract import GPUContractAPI
from skynet.ipfs import ( from skynet.ipfs import (
AsyncIPFSHTTP, AsyncIPFSHTTP,
@ -70,178 +67,16 @@ class NetConnector:
def __init__(self, config: Config): def __init__(self, config: Config):
self.config = config self.config = config
self.cleos = CLEOS(endpoint=config.node_url) self.cleos = CLEOS(endpoint=config.node_url)
self.cleos.load_abi('gpu.scd', GPU_CONTRACT_ABI) self.cleos.import_key(config.account, config.key)
abi = self.cleos.get_abi('gpu.scd')
self.cleos.load_abi('gpu.scd', abi)
self.contract = GPUContractAPI(self.cleos)
self.ipfs_client = AsyncIPFSHTTP(config.ipfs_url) self.ipfs_client = AsyncIPFSHTTP(config.ipfs_url)
maybe_update_tui(lambda tui: tui.set_header_text(new_worker_name=self.config.account)) maybe_update_tui(lambda tui: tui.set_header_text(new_worker_name=self.config.account))
# blockchain helpers
async def get_work_requests_last_hour(self) -> list[RequestV0]:
logging.info('get_work_requests_last_hour')
rows = await failable(
partial(
self.cleos.aget_table,
'gpu.scd', 'gpu.scd', 'queue',
index_position=2,
key_type='i64',
lower_bound=int(time.time()) - 3600,
resp_cls=RequestV0
), ret_fail=[])
logging.info(f'found {len(rows)} requests on queue')
return rows
async def get_status_by_request_id(self, request_id: int) -> list[WorkerStatusV0]:
logging.info('get_status_by_request_id')
rows = await failable(
partial(
self.cleos.aget_table,
'gpu.scd', request_id, 'status', resp_cls=WorkerStatusV0), ret_fail=[])
logging.info(f'found status for workers: {[r.worker for r in rows]}')
return rows
async def get_global_config(self) -> ConfigV0:
logging.info('get_global_config')
rows = await failable(
partial(
self.cleos.aget_table,
'gpu.scd', 'gpu.scd', 'config',
resp_cls=ConfigV0))
if rows:
cfg = rows[0]
logging.info(f'config found: {cfg}')
return cfg
else:
logging.error('global config not found, is the contract initialized?')
return None
async def get_worker_balance(self) -> str:
logging.info('get_worker_balance')
rows = await failable(
partial(
self.cleos.aget_table,
'gpu.scd', 'gpu.scd', 'users',
index_position=1,
key_type='name',
lower_bound=self.config.account,
upper_bound=self.config.account,
resp_cls=AccountV0
))
if rows:
b = rows[0].balance
logging.info(f'balance: {b}')
return b
else:
logging.info('no balance info found')
return None
async def begin_work(self, request_id: int):
'''
Publish to the bc that the worker is beginning a model-computation
step.
'''
logging.info(f'begin_work on #{request_id}')
return await failable(
partial(
self.cleos.a_push_action,
'gpu.scd',
'workbegin',
list({
'worker': self.config.account,
'request_id': request_id,
'max_workers': 2
}.values()),
self.config.account, self.config.key,
permission=self.config.permission
)
)
async def cancel_work(self, request_id: int, reason: str):
logging.info(f'cancel_work on #{request_id}')
return await failable(
partial(
self.cleos.a_push_action,
'gpu.scd',
'workcancel',
list({
'worker': self.config.account,
'request_id': request_id,
'reason': reason
}.values()),
self.config.account, self.config.key,
permission=self.config.permission
)
)
async def maybe_withdraw_all(self):
logging.info('maybe_withdraw_all')
balance = await self.get_worker_balance()
if not balance:
return
balance_amount = float(balance.split(' ')[0])
if balance_amount > 0:
await failable(
partial(
self.cleos.a_push_action,
'gpu.scd',
'withdraw',
list({
'user': self.config.account,
'quantity': Asset.from_str(balance)
}.values()),
self.config.account, self.config.key,
permission=self.config.permission
)
)
async def find_results(self) -> list[ResultV0]:
logging.info('find_results')
rows = await failable(
partial(
self.cleos.aget_table,
'gpu.scd', 'gpu.scd', 'results',
index_position=4,
key_type='name',
lower_bound=self.config.account,
upper_bound=self.config.account,
resp_cls=ResultV0
)
)
return rows
async def submit_work(
self,
request_id: int,
request_hash: str,
result_hash: str,
ipfs_hash: str
):
logging.info(f'submit_work #{request_id}')
return await failable(
partial(
self.cleos.a_push_action,
'gpu.scd',
'submit',
list({
'worker': self.config.account,
'request_id': request_id,
'request_hash': request_hash,
'result_hash': result_hash,
'ipfs_hash': ipfs_hash
}.values()),
self.config.account, self.config.key,
permission=self.config.permission
)
)
# IPFS helpers # IPFS helpers
async def publish_on_ipfs(self, raw, typ: str = 'png'): async def publish_on_ipfs(self, raw, typ: str = 'png'):
Path('ipfs-staging').mkdir(exist_ok=True) Path('ipfs-staging').mkdir(exist_ok=True)
@ -302,9 +137,10 @@ class ContractState:
def __init__(self, conn: NetConnector): def __init__(self, conn: NetConnector):
self._conn = conn self._conn = conn
self._config = load_skynet_toml().dgpu
self._poll_index = 0 self._poll_index = 0
self._queue: list[RequestV0] = [] self._queue: list[RequestV1] = []
self._status_by_rid: dict[int, list[WorkerStatusV0]] = {} self._status_by_rid: dict[int, list[WorkerStatusV0]] = {}
self._results: list[ResultV0] = [] self._results: list[ResultV0] = []
@ -315,10 +151,10 @@ class ContractState:
return self._poll_index return self._poll_index
async def _fetch_results(self): async def _fetch_results(self):
self._results = await self._conn.find_results() self._results = await self._conn.contract.get_worker_results(self._config.account)
async def _fetch_statuses_for_id(self, rid: int): async def _fetch_statuses_for_id(self, rid: int):
self._status_by_rid[rid] = await self._conn.get_status_by_request_id(rid) self._status_by_rid[rid] = await self._conn.contract.get_statuses_for_request(rid)
async def update_state(self): async def update_state(self):
''' '''
@ -326,7 +162,7 @@ class ContractState:
''' '''
# raw queue from chain # raw queue from chain
_queue = await self._conn.get_work_requests_last_hour() _queue = await self._conn.contract.get_requests_since(3600)
# filter out invalids # filter out invalids
self._queue = [] self._queue = []
@ -380,7 +216,7 @@ class ContractState:
return len(self._queue) return len(self._queue)
@property @property
def first(self) -> RequestV0 | None: def first(self) -> RequestV1 | None:
if len(self._queue) > 0: if len(self._queue) > 0:
return self._queue[0] return self._queue[0]
@ -391,7 +227,7 @@ class ContractState:
return set(( return set((
status.worker status.worker
for status in self._status_by_rid[request_id] for status in self._status_by_rid[request_id]
if status.worker != self._conn.config.account if status.worker != self._config.account
)) ))
# predicates # predicates

View File

@ -39,6 +39,25 @@ class ConfigV0:
token_contract: str token_contract: str
token_symbol: str token_symbol: str
'''
ConfigV1
singleton containing global info about system, definition:
```rust
#[chain(table="config", singleton)]
pub struct Config {
token_account: Name,
token_symbol: Symbol,
global_nonce: u64
}
```
'''
class ConfigV1(Struct):
token_account: str
token_symbol: str
global_nonce: int
''' '''
RequestV0 RequestV0
@ -103,6 +122,38 @@ class RequestV0(Struct):
binary_data: str binary_data: str
timestamp: str timestamp: str
'''
RequestV1
a request placed on the queue, definition:
NEW: nonce field removed
scope: self.receiver
```rust
#[chain(table="queue")]
pub struct Request {
#[chain(primary)]
id: u64,
user: Name,
reward: Asset,
min_verification: u32,
body: String,
binary_data: String,
#[chain(secondary)]
timestamp: TimePointSec
}
```
'''
class RequestV1(Struct):
id: int
user: str
reward: str
min_verification: int
body: str
binary_data: str
timestamp: str
''' '''
AccountV0 AccountV0
@ -128,6 +179,27 @@ class AccountV0(Struct):
balance: str balance: str
nonce: int nonce: int
'''
AccountV1
a user account, users must deposit tokens in order to enqueue requests, definition:
scope: self.receiver
```rust
#[chain(table="users")]
pub struct Account {
#[chain(primary)]
user: Name,
balance: Asset
}
```
'''
class AccountV1(Struct):
user: str
balance: str
''' '''
WorkerV0 WorkerV0

View File

@ -1,6 +1,7 @@
import pytest import pytest
from skynet.ipfs import AsyncIPFSHTTP from skynet.ipfs import AsyncIPFSHTTP
from skynet.contract import GPUContractAPI
from skynet._testing import override_dgpu_config from skynet._testing import override_dgpu_config
@ -24,9 +25,11 @@ def skynet_cleos(cleos_bs):
# cleos.import_key('gpu.scd', priv) # cleos.import_key('gpu.scd', priv)
cleos.new_account('gpu.scd', ram=4200000) cleos.new_account('gpu.scd', ram=4200000)
contract_path = 'tests/contracts/skygpu-contract/target'
cleos.deploy_contract_from_path( cleos.deploy_contract_from_path(
'gpu.scd', 'gpu.scd',
'tests/contracts/gpu.scd', contract_path,
contract_name='skygpu',
create_account=False create_account=False
) )
@ -37,9 +40,13 @@ def skynet_cleos(cleos_bs):
'gpu.scd' 'gpu.scd'
) )
cleos.new_account('testworker') testworker_key = '5KRPFxF4RJebqPXqRzwStmCaEWeRfp3pR7XUNoA3zCHt5fnPu3s'
pub_key = cleos.import_key('testworker', testworker_key)
cleos.new_account('testworker', key=pub_key)
yield cleos cleos.wait_blocks(1)
yield GPUContractAPI(cleos), cleos
@pytest.fixture @pytest.fixture

View File

@ -1,416 +0,0 @@
{
"____comment": "This file was generated with eosio-abigen. DO NOT EDIT ",
"version": "eosio::abi/1.2",
"types": [],
"structs": [
{
"name": "account",
"base": "",
"fields": [
{
"name": "user",
"type": "name"
},
{
"name": "balance",
"type": "asset"
},
{
"name": "nonce",
"type": "uint64"
}
]
},
{
"name": "card",
"base": "",
"fields": [
{
"name": "id",
"type": "uint64"
},
{
"name": "owner",
"type": "name"
},
{
"name": "card_name",
"type": "string"
},
{
"name": "version",
"type": "string"
},
{
"name": "total_memory",
"type": "uint64"
},
{
"name": "mp_count",
"type": "uint32"
},
{
"name": "extra",
"type": "string"
}
]
},
{
"name": "clean",
"base": "",
"fields": []
},
{
"name": "config",
"base": "",
"fields": [
{
"name": "token_contract",
"type": "name"
},
{
"name": "token_symbol",
"type": "symbol"
}
]
},
{
"name": "dequeue",
"base": "",
"fields": [
{
"name": "user",
"type": "name"
},
{
"name": "request_id",
"type": "uint64"
}
]
},
{
"name": "enqueue",
"base": "",
"fields": [
{
"name": "user",
"type": "name"
},
{
"name": "request_body",
"type": "string"
},
{
"name": "binary_data",
"type": "string"
},
{
"name": "reward",
"type": "asset"
},
{
"name": "min_verification",
"type": "uint32"
}
]
},
{
"name": "global_configuration_struct",
"base": "",
"fields": [
{
"name": "token_contract",
"type": "name"
},
{
"name": "token_symbol",
"type": "symbol"
}
]
},
{
"name": "submit",
"base": "",
"fields": [
{
"name": "worker",
"type": "name"
},
{
"name": "request_id",
"type": "uint64"
},
{
"name": "request_hash",
"type": "checksum256"
},
{
"name": "result_hash",
"type": "checksum256"
},
{
"name": "ipfs_hash",
"type": "string"
}
]
},
{
"name": "withdraw",
"base": "",
"fields": [
{
"name": "user",
"type": "name"
},
{
"name": "quantity",
"type": "asset"
}
]
},
{
"name": "work_request_struct",
"base": "",
"fields": [
{
"name": "id",
"type": "uint64"
},
{
"name": "user",
"type": "name"
},
{
"name": "reward",
"type": "asset"
},
{
"name": "min_verification",
"type": "uint32"
},
{
"name": "nonce",
"type": "uint64"
},
{
"name": "body",
"type": "string"
},
{
"name": "binary_data",
"type": "string"
},
{
"name": "timestamp",
"type": "time_point_sec"
}
]
},
{
"name": "work_result_struct",
"base": "",
"fields": [
{
"name": "id",
"type": "uint64"
},
{
"name": "request_id",
"type": "uint64"
},
{
"name": "user",
"type": "name"
},
{
"name": "worker",
"type": "name"
},
{
"name": "result_hash",
"type": "checksum256"
},
{
"name": "ipfs_hash",
"type": "string"
},
{
"name": "submited",
"type": "time_point_sec"
}
]
},
{
"name": "workbegin",
"base": "",
"fields": [
{
"name": "worker",
"type": "name"
},
{
"name": "request_id",
"type": "uint64"
},
{
"name": "max_workers",
"type": "uint32"
}
]
},
{
"name": "workcancel",
"base": "",
"fields": [
{
"name": "worker",
"type": "name"
},
{
"name": "request_id",
"type": "uint64"
},
{
"name": "reason",
"type": "string"
}
]
},
{
"name": "worker",
"base": "",
"fields": [
{
"name": "account",
"type": "name"
},
{
"name": "joined",
"type": "time_point_sec"
},
{
"name": "left",
"type": "time_point_sec"
},
{
"name": "url",
"type": "string"
}
]
},
{
"name": "worker_status_struct",
"base": "",
"fields": [
{
"name": "worker",
"type": "name"
},
{
"name": "status",
"type": "string"
},
{
"name": "started",
"type": "time_point_sec"
}
]
}
],
"actions": [
{
"name": "clean",
"type": "clean",
"ricardian_contract": ""
},
{
"name": "config",
"type": "config",
"ricardian_contract": ""
},
{
"name": "dequeue",
"type": "dequeue",
"ricardian_contract": ""
},
{
"name": "enqueue",
"type": "enqueue",
"ricardian_contract": ""
},
{
"name": "submit",
"type": "submit",
"ricardian_contract": ""
},
{
"name": "withdraw",
"type": "withdraw",
"ricardian_contract": ""
},
{
"name": "workbegin",
"type": "workbegin",
"ricardian_contract": ""
},
{
"name": "workcancel",
"type": "workcancel",
"ricardian_contract": ""
}
],
"tables": [
{
"name": "cards",
"type": "card",
"index_type": "i64",
"key_names": [],
"key_types": []
},
{
"name": "config",
"type": "global_configuration_struct",
"index_type": "i64",
"key_names": [],
"key_types": []
},
{
"name": "queue",
"type": "work_request_struct",
"index_type": "i64",
"key_names": [],
"key_types": []
},
{
"name": "results",
"type": "work_result_struct",
"index_type": "i64",
"key_names": [],
"key_types": []
},
{
"name": "status",
"type": "worker_status_struct",
"index_type": "i64",
"key_names": [],
"key_types": []
},
{
"name": "users",
"type": "account",
"index_type": "i64",
"key_names": [],
"key_types": []
},
{
"name": "workers",
"type": "worker",
"index_type": "i64",
"key_names": [],
"key_types": []
}
],
"ricardian_clauses": [],
"variants": [],
"action_results": []
}

Binary file not shown.

@ -0,0 +1 @@
Subproject commit 87794ffb45340103159450694c56f241615431b2

View File

@ -1,13 +1,231 @@
import trio import trio
import pytest
from leap.errors import TransactionPushError
from msgspec import json from msgspec import json
from skynet.contract import RequestNotFound, ConfigNotFound
from skynet.types import BodyV0, BodyV0Params from skynet.types import BodyV0, BodyV0Params
from skynet._testing import open_test_worker from skynet._testing import open_test_worker
async def test_system(skynet_cleos):
gpu, cleos = skynet_cleos
# assert config can only be init once (done by fixture)
with pytest.raises(TransactionPushError):
await gpu.init_config('eosio.token', '4,TLOS')
# test clean function
# fill tables with data
# accounts
users = []
quantity = '1.0000 TLOS'
usr_num = 3
for _ in range(usr_num):
test_user = cleos.new_account()
cleos.transfer_token('eosio', test_user, quantity, 'clean testing')
await gpu.deposit(test_user, quantity)
# will throw if user not found
await gpu.get_user(test_user)
users.append(test_user)
# queue
for user in users:
await gpu.enqueue(
user,
BodyV0(
method='txt2img',
params=BodyV0Params(
prompt='ayy lmao',
model='skygpu/mocker',
step=1,
seed=0,
guidance=10.0
)
),
min_verification=2 # make reqs stay after 1 result
)
# check requests are in queue
queue = await gpu.get_queue()
assert len(queue) == usr_num
# workers
workers = []
quantity = '1.0000 TLOS'
wrk_num = 3
for _ in range(wrk_num):
worker = cleos.new_account()
await gpu.register_worker(worker, 'http://localhost')
# will throw if worker not found
await gpu.get_worker(worker)
workers.append(worker)
# status
for i in range(wrk_num):
req = queue[i]
worker = workers[i]
await gpu.accept_work(worker, req.id)
# will throw is status not found
await gpu.get_worker_status_for_request(req.id, worker)
# results
# make one of the workers finish to populate result
await gpu.submit_work(
workers[0],
queue[0].id,
'ff' * 32,
'null hash'
)
results = await gpu.get_results(queue[0].id)
assert len(results) == 1
# all tables populated
# run clean nuke == false
await gpu.clean_tables()
# assert tables empty
assert len(await gpu.get_queue()) == 0
for req in queue:
assert len(await gpu.get_statuses_for_request(req.id)) == 0
assert len(await gpu.get_results(req.id)) == 0
# check config, accounts and workers still there
await gpu.get_config() # raises if not found
assert len(await gpu.get_users()) == usr_num
assert len(await gpu.get_workers()) == wrk_num
# test nuke
await gpu.clean_tables(nuke=True)
with pytest.raises(ConfigNotFound):
await gpu.get_config()
assert len(await gpu.get_users()) == 0
assert len(await gpu.get_workers()) == 0
# re init config in case other tests run
await gpu.init_config('eosio.token', '4,TLOS')
async def test_balance(skynet_cleos):
gpu, cleos = skynet_cleos
# create fresh account
account = cleos.new_account()
# try call withdraw with no user account reg'd
with pytest.raises(TransactionPushError):
await gpu.withdraw(account, '1.0000 TLOS')
# give tokens and deposit to gpu
quantity = '1000.0000 TLOS'
cleos.transfer_token('eosio', account, quantity)
await gpu.deposit(account, quantity)
# check if balance increased
account_row = await gpu.get_user(account)
assert account_row.balance == quantity
# try call withdraw with more than deposited
with pytest.raises(TransactionPushError):
await gpu.withdraw(account, '1000.0001 TLOS')
# withdraw full correct amount
await gpu.withdraw(account, quantity)
# check if balance decreased
account_row = await gpu.get_user(account)
assert account_row.balance == '0.0000 TLOS'
async def test_worker_reg(skynet_cleos):
gpu, cleos = skynet_cleos
# create fresh account
worker = cleos.new_account()
url = 'https://nvidia.com'
await gpu.register_worker(worker, url)
# find and check vals
worker_row = await gpu.get_worker(worker)
assert worker_row.account == worker
assert worker_row.url == url
assert worker_row.joined != '1970-01-01T00:00:00'
assert worker_row.left == '1970-01-01T00:00:00'
# attempt to register twice
with pytest.raises(TransactionPushError):
await gpu.register_worker(worker, url)
# unregister
reason = 'testing'
await gpu.unregister_worker(worker, reason)
worker_row = await gpu.get_worker(worker)
assert worker_row.account == worker
assert worker_row.url == url
assert worker_row.left != '1970-01-01T00:00:00'
# attempt to unreg twice
with pytest.raises(TransactionPushError):
await gpu.unregister_worker(worker, reason)
async def test_queue(skynet_cleos):
gpu, cleos = skynet_cleos
body = BodyV0(
method='txt2img',
params=BodyV0Params(
prompt='cyberpunk hacker travis bickle dystopic alley graffiti',
model='skygpu/mocker',
step=4,
seed=0,
guidance=10.0
)
)
# create account
account = cleos.new_account()
quantity = '1000.0000 TLOS'
cleos.transfer_token('eosio', account, quantity)
# attempt to create request without prev deposit
with pytest.raises(TransactionPushError):
await gpu.enqueue(account, body)
# deposit tokens into gpu
await gpu.deposit(account, quantity)
# finally enqueue
req = await gpu.enqueue(account, body)
# search by id
req_found = await gpu.get_request(req.id)
assert req == req_found
# search by timestamp
reqs = await gpu.get_requests_since(60 * 60)
assert len(reqs) == 1
assert reqs[0] == req
# attempt to dequeue wrong req
with pytest.raises(TransactionPushError):
await gpu.dequeue(account, 999999)
# dequeue correctly
await gpu.dequeue(account, req.id)
# check deletion
with pytest.raises(RequestNotFound):
await gpu.get_request(req.id)
async def test_full_flow(inject_mockers, skynet_cleos, ipfs_node): async def test_full_flow(inject_mockers, skynet_cleos, ipfs_node):
cleos = skynet_cleos gpu, cleos = skynet_cleos
# create account and deposit tokens into gpu # create account and deposit tokens into gpu
account = cleos.new_account() account = cleos.new_account()

31
uv.lock
View File

@ -1599,7 +1599,7 @@ wheels = [
[[package]] [[package]]
name = "py-leap" name = "py-leap"
version = "0.1a35" version = "0.1a35"
source = { editable = "../py-leap" } source = { git = "https://github.com/guilledk/py-leap.git?branch=struct_unwrap#20f2e1f74e98e3d75984e8e1eee13c3100c17652" }
dependencies = [ dependencies = [
{ name = "base58" }, { name = "base58" },
{ name = "cryptos" }, { name = "cryptos" },
@ -1609,33 +1609,6 @@ dependencies = [
{ name = "ripemd-hash" }, { name = "ripemd-hash" },
] ]
[package.metadata]
requires-dist = [
{ name = "base58", specifier = ">=2.1.1,<3" },
{ name = "cryptos", specifier = ">=2.0.9,<3" },
{ name = "httpx", specifier = ">=0.28.1,<0.29" },
{ name = "msgspec", specifier = ">=0.19.0" },
{ name = "requests", specifier = "<2.32.0" },
{ name = "ripemd-hash", specifier = ">=1.0.1,<2" },
]
[package.metadata.requires-dev]
dev = [
{ name = "docker", specifier = ">=6.1.3,<7" },
{ name = "pdbpp", specifier = ">=0.10.3,<0.11" },
{ name = "pytest", specifier = ">=8.3.4,<9" },
{ name = "pytest-trio", specifier = ">=0.8.0,<0.9" },
]
docs = [
{ name = "sphinx", specifier = "==7.1.2" },
{ name = "sphinx-rtd-theme", specifier = "==1.3.0" },
]
snaps = [
{ name = "bs4", specifier = ">=0.0.2,<0.0.3" },
{ name = "tdqm", specifier = ">=0.0.1,<0.0.2" },
{ name = "zstandard", specifier = ">=0.21.0,<0.22" },
]
[[package]] [[package]]
name = "pycparser" name = "pycparser"
version = "2.22" version = "2.22"
@ -2152,7 +2125,7 @@ requires-dist = [
{ name = "outcome", specifier = ">=1.3.0.post0" }, { name = "outcome", specifier = ">=1.3.0.post0" },
{ name = "pillow", specifier = ">=10.0.1,<11" }, { name = "pillow", specifier = ">=10.0.1,<11" },
{ name = "protobuf", specifier = ">=5.29.3,<6" }, { name = "protobuf", specifier = ">=5.29.3,<6" },
{ name = "py-leap", editable = "../py-leap" }, { name = "py-leap", git = "https://github.com/guilledk/py-leap.git?branch=struct_unwrap" },
{ name = "pytz", specifier = "~=2023.3.post1" }, { name = "pytz", specifier = "~=2023.3.post1" },
{ name = "toml", specifier = ">=0.10.2,<0.11" }, { name = "toml", specifier = ">=0.10.2,<0.11" },
{ name = "trio", specifier = ">=0.22.2,<0.23" }, { name = "trio", specifier = ">=0.22.2,<0.23" },