diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 607a122..d5f29a8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,14 +7,57 @@ jobs: name: Pytest Tests runs-on: ubuntu-24.04 timeout-minutes: 10 + steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 with: submodules: recursive + - name: Install system dependencies (Rust + Python) + run: | + sudo apt-get update + sudo apt-get install -y \ + build-essential \ + pkg-config \ + libssl-dev \ + clang \ + lld \ + protobuf-compiler \ + make \ + wget + + - name: Install binaryen 120 + run: | + wget https://github.com/WebAssembly/binaryen/releases/download/version_120/binaryen-version_120-x86_64-linux.tar.gz + tar xvf binaryen-version_120-x86_64-linux.tar.gz + echo "$(pwd)/binaryen-version_120/bin" >> $GITHUB_PATH + - name: Install the latest version of uv uses: astral-sh/setup-uv@v5 + - name: Install Rust + run: | + curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + echo "$HOME/.cargo/bin" >> $GITHUB_PATH + rustup install stable + rustup component add rust-src --toolchain stable + rustup target add wasm32-wasip1 + + - name: Set up Python environment and dependencies + run: | + uv venv .venv --python=3.12 + uv pip install -U rust-contracts-builder + echo "$(pwd)/.venv/bin" >> $GITHUB_PATH + + - name: Apply required modifications for rust-contracts-builder + run: | + sed -i "s/wasm32-wasi /wasm32-wasip1 /g" .venv/lib/python3.12/site-packages/rust_contracts_builder/__init__.py + sed -i "s/wasm32-wasi\//wasm32-wasip1\//g" .venv/lib/python3.12/site-packages/rust_contracts_builder/__init__.py + + - name: Build Rust project + run: rust-contract build + working-directory: tests/contracts/skygpu-contract + - uses: actions/cache@v3 name: Cache venv with: diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..90b8d3e --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "tests/contracts/skygpu-contract"] + path = tests/contracts/skygpu-contract + url = https://github.com/skygpu/skygpu-contract.git diff --git a/skynet/_testing.py b/skynet/_testing.py index 44eefba..fc599c5 100644 --- a/skynet/_testing.py +++ b/skynet/_testing.py @@ -16,13 +16,14 @@ async def open_test_worker( cleos, ipfs_node, account: str = 'testworker', permission: str = 'active', + key: str = '5KRPFxF4RJebqPXqRzwStmCaEWeRfp3pR7XUNoA3zCHt5fnPu3s', hf_token: str = '', **kwargs ): config = override_dgpu_config( account=account, permission=permission, - key=cleos.private_keys[account], + key=key, node_url=cleos.endpoint, ipfs_url=ipfs_node[1].endpoint, hf_token=hf_token, diff --git a/skynet/constants.py b/skynet/constants.py index 4de9b23..b5d255c 100755 --- a/skynet/constants.py +++ b/skynet/constants.py @@ -271,221 +271,3 @@ TG_MAX_WIDTH = 1280 TG_MAX_HEIGHT = 1280 DEFAULT_SINGLE_CARD_MAP = 'cuda:0' - -GPU_CONTRACT_ABI = { - "version": "eosio::abi/1.2", - "types": [], - "structs": [ - { - "name": "account", - "base": "", - "fields": [ - {"name": "user", "type": "name"}, - {"name": "balance", "type": "asset"}, - {"name": "nonce", "type": "uint64"} - ] - }, - { - "name": "card", - "base": "", - "fields": [ - {"name": "id", "type": "uint64"}, - {"name": "owner", "type": "name"}, - {"name": "card_name", "type": "string"}, - {"name": "version", "type": "string"}, - {"name": "total_memory", "type": "uint64"}, - {"name": "mp_count", "type": "uint32"}, - {"name": "extra", "type": "string"} - ] - }, - { - "name": "clean", - "base": "", - "fields": [] - }, - { - "name": "config", - "base": "", - "fields": [ - {"name": "token_contract", "type": "name"}, - {"name": "token_symbol", "type": "symbol"} - ] - }, - { - "name": "dequeue", - "base": "", - "fields": [ - {"name": "user", "type": "name"}, - {"name": "request_id", "type": "uint64"} - ] - }, - { - "name": "enqueue", - "base": "", - "fields": [ - {"name": "user", "type": "name"}, - {"name": "request_body", "type": "string"}, - {"name": "binary_data", "type": "string"}, - {"name": "reward", "type": "asset"}, - {"name": "min_verification", "type": "uint32"} - ] - }, - { - "name": "gcfgstruct", - "base": "", - "fields": [ - {"name": "token_contract", "type": "name"}, - {"name": "token_symbol", "type": "symbol"} - ] - }, - { - "name": "submit", - "base": "", - "fields": [ - {"name": "worker", "type": "name"}, - {"name": "request_id", "type": "uint64"}, - {"name": "request_hash", "type": "checksum256"}, - {"name": "result_hash", "type": "checksum256"}, - {"name": "ipfs_hash", "type": "string"} - ] - }, - { - "name": "withdraw", - "base": "", - "fields": [ - {"name": "user", "type": "name"}, - {"name": "quantity", "type": "asset"} - ] - }, - { - "name": "work_request_struct", - "base": "", - "fields": [ - {"name": "id", "type": "uint64"}, - {"name": "user", "type": "name"}, - {"name": "reward", "type": "asset"}, - {"name": "min_verification", "type": "uint32"}, - {"name": "nonce", "type": "uint64"}, - {"name": "body", "type": "string"}, - {"name": "binary_data", "type": "string"}, - {"name": "timestamp", "type": "time_point_sec"} - ] - }, - { - "name": "work_result_struct", - "base": "", - "fields": [ - {"name": "id", "type": "uint64"}, - {"name": "request_id", "type": "uint64"}, - {"name": "user", "type": "name"}, - {"name": "worker", "type": "name"}, - {"name": "result_hash", "type": "checksum256"}, - {"name": "ipfs_hash", "type": "string"}, - {"name": "submited", "type": "time_point_sec"} - ] - }, - { - "name": "workbegin", - "base": "", - "fields": [ - {"name": "worker", "type": "name"}, - {"name": "request_id", "type": "uint64"}, - {"name": "max_workers", "type": "uint32"} - ] - }, - { - "name": "workcancel", - "base": "", - "fields": [ - {"name": "worker", "type": "name"}, - {"name": "request_id", "type": "uint64"}, - {"name": "reason", "type": "string"} - ] - }, - { - "name": "worker", - "base": "", - "fields": [ - {"name": "account", "type": "name"}, - {"name": "joined", "type": "time_point_sec"}, - {"name": "left", "type": "time_point_sec"}, - {"name": "url", "type": "string"} - ] - }, - { - "name": "worker_status_struct", - "base": "", - "fields": [ - {"name": "worker", "type": "name"}, - {"name": "status", "type": "string"}, - {"name": "started", "type": "time_point_sec"} - ] - } - ], - "actions": [ - {"name": "clean", "type": "clean", "ricardian_contract": ""}, - {"name": "config", "type": "config", "ricardian_contract": ""}, - {"name": "dequeue", "type": "dequeue", "ricardian_contract": ""}, - {"name": "enqueue", "type": "enqueue", "ricardian_contract": ""}, - {"name": "submit", "type": "submit", "ricardian_contract": ""}, - {"name": "withdraw", "type": "withdraw", "ricardian_contract": ""}, - {"name": "workbegin", "type": "workbegin", "ricardian_contract": ""}, - {"name": "workcancel", "type": "workcancel", "ricardian_contract": ""} - ], - "tables": [ - { - "name": "cards", - "index_type": "i64", - "key_names": [], - "key_types": [], - "type": "card" - }, - { - "name": "gcfgstruct", - "index_type": "i64", - "key_names": [], - "key_types": [], - "type": "gcfgstruct" - }, - { - "name": "queue", - "index_type": "i64", - "key_names": [], - "key_types": [], - "type": "work_request_struct" - }, - { - "name": "results", - "index_type": "i64", - "key_names": [], - "key_types": [], - "type": "work_result_struct" - }, - { - "name": "status", - "index_type": "i64", - "key_names": [], - "key_types": [], - "type": "worker_status_struct" - }, - { - "name": "users", - "index_type": "i64", - "key_names": [], - "key_types": [], - "type": "account" - }, - { - "name": "workers", - "index_type": "i64", - "key_names": [], - "key_types": [], - "type": "worker" - } - ], - "ricardian_clauses": [], - "error_messages": [], - "abi_extensions": [], - "variants": [], - "action_results": [] -} diff --git a/skynet/contract.py b/skynet/contract.py new file mode 100644 index 0000000..11e0412 --- /dev/null +++ b/skynet/contract.py @@ -0,0 +1,304 @@ +import time + +import msgspec +from leap import CLEOS +from leap.protocol import Name + +from skynet.types import ( + ConfigV1, + AccountV1, + WorkerV0, + RequestV1, + BodyV0, + WorkerStatusV0, + ResultV0 +) + + +class ConfigNotFound(BaseException): + ... + +class AccountNotFound(BaseException): + ... + +class WorkerNotFound(BaseException): + ... + +class RequestNotFound(BaseException): + ... + +class WorkerStatusNotFound(BaseException): + ... + + +class GPUContractAPI: + + def __init__(self, cleos: CLEOS): + self.receiver = 'gpu.scd' + self._cleos = cleos + + # views into data + + async def get_config(self) -> ConfigV1: + rows = await self._cleos.aget_table( + self.receiver, self.receiver, 'config', + resp_cls=ConfigV1 + ) + if len(rows) == 0: + raise ConfigNotFound() + + return rows[0] + + async def get_user(self, user: str) -> AccountV1: + rows = await self._cleos.aget_table( + self.receiver, self.receiver, 'users', + key_type='name', + lower_bound=user, + upper_bound=user, + resp_cls=AccountV1 + ) + if len(rows) == 0: + raise AccountNotFound(user) + + return rows[0] + + async def get_users(self) -> list[AccountV1]: + return await self._cleos.aget_table(self.receiver, self.receiver, 'users', resp_cls=AccountV1) + + async def get_worker(self, worker: str) -> WorkerV0: + rows = await self._cleos.aget_table( + self.receiver, self.receiver, 'workers', + key_type='name', + lower_bound=worker, + upper_bound=worker, + resp_cls=WorkerV0 + ) + if len(rows) == 0: + raise WorkerNotFound(worker) + + return rows[0] + + async def get_workers(self) -> list[AccountV1]: + return await self._cleos.aget_table(self.receiver, self.receiver, 'workers', resp_cls=WorkerV0) + + async def get_queue(self) -> RequestV1: + return await self._cleos.aget_table(self.receiver, self.receiver, 'queue', resp_cls=RequestV1) + + async def get_request(self, request_id: int) -> RequestV1: + rows = await self._cleos.aget_table( + self.receiver, self.receiver, 'queue', + lower_bound=request_id, + upper_bound=request_id, + resp_cls=RequestV1 + ) + if len(rows) == 0: + raise RequestNotFound(request_id) + + return rows[0] + + async def get_requests_since(self, seconds: int) -> list[RequestV1]: + return await self._cleos.aget_table( + self.receiver, self.receiver, 'queue', + index_position=2, + key_type='i64', + lower_bound=int(time.time()) - seconds, + resp_cls=RequestV1 + ) + + async def get_statuses_for_request(self, request_id: int) -> list[WorkerStatusV0]: + return await self._cleos.aget_table( + self.receiver, str(Name.from_int(request_id)), 'status', + resp_cls=WorkerStatusV0 + ) + + async def get_worker_status_for_request(self, request_id: int, worker: str) -> WorkerStatusV0: + rows = await self._cleos.aget_table( + self.receiver, str(Name.from_int(request_id)), 'status', + key_type='name', + lower_bound=worker, + upper_bound=worker, + resp_cls=WorkerStatusV0 + ) + if len(rows) == 0: + raise WorkerStatusNotFound(request_id) + + return rows[0] + + async def get_results(self, request_id: int) -> list[ResultV0]: + return await self._cleos.aget_table( + self.receiver, self.receiver, 'results', + index_position=2, + key_type='i64', + lower_bound=request_id, + upper_bound=request_id, + resp_cls=ResultV0 + ) + + async def get_worker_results(self, worker: str) -> list[ResultV0]: + return await self._cleos.aget_table( + self.receiver, self.receiver, 'results', + index_position=4, + key_type='name', + lower_bound=worker, + upper_bound=worker, + resp_cls=ResultV0 + ) + + # system actions + async def init_config(self, token_account: str, token_symbol: str): + return await self._cleos.a_push_action( + self.receiver, + 'config', + [token_account, token_symbol], + self.receiver + ) + + async def clean_tables(self, nuke: bool = False): + return await self._cleos.a_push_action( + self.receiver, + 'clean', + [nuke], + self.receiver + ) + + # balance actions + + async def deposit(self, user: str, quantity: str): + return await self._cleos.a_push_action( + 'eosio.token', + 'transfer', + [user, self.receiver, quantity, 'testing gpu deposit'], + user, + key=self._cleos.private_keys[user] + ) + + async def withdraw(self, user: str, quantity: str): + return await self._cleos.a_push_action( + self.receiver, + 'withdraw', + [user, quantity], + user, + key=self._cleos.private_keys[user] + ) + + # worker actions + + async def register_worker( + self, + worker: str, + url: str + ): + return await self._cleos.a_push_action( + self.receiver, + 'regworker', + [worker, url], + worker, + key=self._cleos.private_keys[worker] + ) + + async def unregister_worker( + self, + worker: str, + reason: str + ): + return await self._cleos.a_push_action( + self.receiver, + 'unregworker', + [worker, reason], + worker, + key=self._cleos.private_keys[worker] + ) + + async def accept_work( + self, + worker: str, + request_id: int, + max_workers: int = 10 + ): + return await self._cleos.a_push_action( + self.receiver, + 'workbegin', + [worker, request_id, max_workers], + worker, + key=self._cleos.private_keys[worker] + ) + + async def cancel_work( + self, + worker: str, + request_id: int, + reason: str + ): + return await self._cleos.a_push_action( + self.receiver, + 'workcancel', + [worker, request_id, reason], + worker, + key=self._cleos.private_keys[worker] + ) + + async def submit_work( + self, + worker: str, + request_id: int, + result_hash: str, + ipfs_hash: str + ): + return await self._cleos.a_push_action( + self.receiver, + 'submit', + [worker, request_id, result_hash, ipfs_hash], + worker, + key=self._cleos.private_keys[worker] + ) + + # user actions + + async def enqueue( + self, + account: str, + body: BodyV0, + binary_data: str = '', + reward: str = '1.0000 TLOS', + min_verification: int = 1 + ) -> int: + + body = msgspec.json.encode(body).decode('utf-8') + result = await self._cleos.a_push_action( + self.receiver, + 'enqueue', + [ + account, + body, + binary_data, + reward, + min_verification + ], + account, + key=self._cleos.private_keys[account] + ) + console = result['processed']['action_traces'][0]['console'] + nonce_index = -1 + timestamp_index = -2 + lines = console.rstrip().split('\n') + nonce = int(lines[nonce_index]) + timestamp = lines[timestamp_index] + + return RequestV1( + id=int(nonce), + user=account, + reward=reward, + min_verification=min_verification, + body=body, + binary_data=binary_data, + timestamp=timestamp + ) + + async def dequeue(self, user: str, request_id: int): + return await self._cleos.a_push_action( + self.receiver, + 'dequeue', + [user, request_id], + user, + key=self._cleos.private_keys[user] + ) diff --git a/skynet/dgpu/daemon.py b/skynet/dgpu/daemon.py index 2b47069..2d09d3e 100755 --- a/skynet/dgpu/daemon.py +++ b/skynet/dgpu/daemon.py @@ -1,6 +1,5 @@ import logging from functools import partial -from hashlib import sha256 import trio import msgspec @@ -22,7 +21,7 @@ from skynet.dgpu.network import ( async def maybe_update_tui_balance(conn: NetConnector): async def _fn(tui): # update balance - balance = await conn.get_worker_balance() + balance = await conn.contract.get_user(tui.config.account).balance tui.set_header_text(new_balance=f'balance: {balance}') await maybe_update_tui_async(_fn) @@ -101,24 +100,12 @@ async def maybe_serve_one( f'IPFS fetch input error !?! retries left {retry - r - 1}\n' ) - # compute unique request hash used on submit - hash_str = ( - str(req.nonce) - + - req.body - + - req.binary_data - ) - logging.debug(f'hashing: {hash_str}') - request_hash = sha256(hash_str.encode('utf-8')).hexdigest() - logging.info(f'calculated request hash: {request_hash}') - total_step = body.params.step mode = body.method # TODO: validate request - resp = await conn.begin_work(req.id) + resp = await conn.contract.accept_work(config.account, req.id) if not resp or 'code' in resp: logging.info('begin_work error, probably being worked on already... skip.') return @@ -157,7 +144,9 @@ async def maybe_serve_one( ipfs_hash = await conn.publish_on_ipfs(output, typ=output_type) - await conn.submit_work(req.id, request_hash, output_hash, ipfs_hash) + await conn.contract.submit_work(config.account, req.id, output_hash, ipfs_hash) + + await state_mngr.update_state() await maybe_update_tui_balance(conn) @@ -168,8 +157,10 @@ async def maybe_serve_one( if 'network cancel' not in str(err): logging.exception('Failed to serve model request !?\n') + await state_mngr.update_state() + if state_mngr.is_request_in_progress(req.id): - await conn.cancel_work(req.id, 'reason not provided') + await conn.contract.cancel_work(config.account, req.id, 'reason not provided') async def dgpu_serve_forever( diff --git a/skynet/dgpu/network.py b/skynet/dgpu/network.py index 6a8350b..cc6add5 100755 --- a/skynet/dgpu/network.py +++ b/skynet/dgpu/network.py @@ -15,18 +15,15 @@ import outcome import msgspec from PIL import Image from leap.cleos import CLEOS -from leap.protocol import Asset from skynet.dgpu.tui import maybe_update_tui from skynet.config import DgpuConfig as Config, load_skynet_toml from skynet.types import ( - ConfigV0, - AccountV0, BodyV0, - RequestV0, + RequestV1, WorkerStatusV0, ResultV0 ) -from skynet.constants import GPU_CONTRACT_ABI +from skynet.contract import GPUContractAPI from skynet.ipfs import ( AsyncIPFSHTTP, @@ -70,178 +67,16 @@ class NetConnector: def __init__(self, config: Config): self.config = config self.cleos = CLEOS(endpoint=config.node_url) - self.cleos.load_abi('gpu.scd', GPU_CONTRACT_ABI) + self.cleos.import_key(config.account, config.key) + abi = self.cleos.get_abi('gpu.scd') + self.cleos.load_abi('gpu.scd', abi) + + self.contract = GPUContractAPI(self.cleos) self.ipfs_client = AsyncIPFSHTTP(config.ipfs_url) maybe_update_tui(lambda tui: tui.set_header_text(new_worker_name=self.config.account)) - - # blockchain helpers - - async def get_work_requests_last_hour(self) -> list[RequestV0]: - logging.info('get_work_requests_last_hour') - rows = await failable( - partial( - self.cleos.aget_table, - 'gpu.scd', 'gpu.scd', 'queue', - index_position=2, - key_type='i64', - lower_bound=int(time.time()) - 3600, - resp_cls=RequestV0 - ), ret_fail=[]) - - logging.info(f'found {len(rows)} requests on queue') - return rows - - async def get_status_by_request_id(self, request_id: int) -> list[WorkerStatusV0]: - logging.info('get_status_by_request_id') - rows = await failable( - partial( - self.cleos.aget_table, - 'gpu.scd', request_id, 'status', resp_cls=WorkerStatusV0), ret_fail=[]) - - logging.info(f'found status for workers: {[r.worker for r in rows]}') - return rows - - async def get_global_config(self) -> ConfigV0: - logging.info('get_global_config') - rows = await failable( - partial( - self.cleos.aget_table, - 'gpu.scd', 'gpu.scd', 'config', - resp_cls=ConfigV0)) - - if rows: - cfg = rows[0] - logging.info(f'config found: {cfg}') - return cfg - else: - logging.error('global config not found, is the contract initialized?') - return None - - async def get_worker_balance(self) -> str: - logging.info('get_worker_balance') - rows = await failable( - partial( - self.cleos.aget_table, - 'gpu.scd', 'gpu.scd', 'users', - index_position=1, - key_type='name', - lower_bound=self.config.account, - upper_bound=self.config.account, - resp_cls=AccountV0 - )) - - if rows: - b = rows[0].balance - logging.info(f'balance: {b}') - return b - else: - logging.info('no balance info found') - return None - - async def begin_work(self, request_id: int): - ''' - Publish to the bc that the worker is beginning a model-computation - step. - - ''' - logging.info(f'begin_work on #{request_id}') - return await failable( - partial( - self.cleos.a_push_action, - 'gpu.scd', - 'workbegin', - list({ - 'worker': self.config.account, - 'request_id': request_id, - 'max_workers': 2 - }.values()), - self.config.account, self.config.key, - permission=self.config.permission - ) - ) - - async def cancel_work(self, request_id: int, reason: str): - logging.info(f'cancel_work on #{request_id}') - return await failable( - partial( - self.cleos.a_push_action, - 'gpu.scd', - 'workcancel', - list({ - 'worker': self.config.account, - 'request_id': request_id, - 'reason': reason - }.values()), - self.config.account, self.config.key, - permission=self.config.permission - ) - ) - - async def maybe_withdraw_all(self): - logging.info('maybe_withdraw_all') - balance = await self.get_worker_balance() - if not balance: - return - - balance_amount = float(balance.split(' ')[0]) - if balance_amount > 0: - await failable( - partial( - self.cleos.a_push_action, - 'gpu.scd', - 'withdraw', - list({ - 'user': self.config.account, - 'quantity': Asset.from_str(balance) - }.values()), - self.config.account, self.config.key, - permission=self.config.permission - ) - ) - - async def find_results(self) -> list[ResultV0]: - logging.info('find_results') - rows = await failable( - partial( - self.cleos.aget_table, - 'gpu.scd', 'gpu.scd', 'results', - index_position=4, - key_type='name', - lower_bound=self.config.account, - upper_bound=self.config.account, - resp_cls=ResultV0 - ) - ) - return rows - - async def submit_work( - self, - request_id: int, - request_hash: str, - result_hash: str, - ipfs_hash: str - ): - logging.info(f'submit_work #{request_id}') - return await failable( - partial( - self.cleos.a_push_action, - 'gpu.scd', - 'submit', - list({ - 'worker': self.config.account, - 'request_id': request_id, - 'request_hash': request_hash, - 'result_hash': result_hash, - 'ipfs_hash': ipfs_hash - }.values()), - self.config.account, self.config.key, - permission=self.config.permission - ) - ) - # IPFS helpers async def publish_on_ipfs(self, raw, typ: str = 'png'): Path('ipfs-staging').mkdir(exist_ok=True) @@ -302,9 +137,10 @@ class ContractState: def __init__(self, conn: NetConnector): self._conn = conn + self._config = load_skynet_toml().dgpu self._poll_index = 0 - self._queue: list[RequestV0] = [] + self._queue: list[RequestV1] = [] self._status_by_rid: dict[int, list[WorkerStatusV0]] = {} self._results: list[ResultV0] = [] @@ -315,10 +151,10 @@ class ContractState: return self._poll_index async def _fetch_results(self): - self._results = await self._conn.find_results() + self._results = await self._conn.contract.get_worker_results(self._config.account) async def _fetch_statuses_for_id(self, rid: int): - self._status_by_rid[rid] = await self._conn.get_status_by_request_id(rid) + self._status_by_rid[rid] = await self._conn.contract.get_statuses_for_request(rid) async def update_state(self): ''' @@ -326,7 +162,7 @@ class ContractState: ''' # raw queue from chain - _queue = await self._conn.get_work_requests_last_hour() + _queue = await self._conn.contract.get_requests_since(3600) # filter out invalids self._queue = [] @@ -380,7 +216,7 @@ class ContractState: return len(self._queue) @property - def first(self) -> RequestV0 | None: + def first(self) -> RequestV1 | None: if len(self._queue) > 0: return self._queue[0] @@ -391,7 +227,7 @@ class ContractState: return set(( status.worker for status in self._status_by_rid[request_id] - if status.worker != self._conn.config.account + if status.worker != self._config.account )) # predicates diff --git a/skynet/types.py b/skynet/types.py index 4e7c3a0..4b44c1d 100644 --- a/skynet/types.py +++ b/skynet/types.py @@ -39,6 +39,25 @@ class ConfigV0: token_contract: str token_symbol: str +''' +ConfigV1 + +singleton containing global info about system, definition: +```rust +#[chain(table="config", singleton)] +pub struct Config { + token_account: Name, + token_symbol: Symbol, + global_nonce: u64 +} +``` +''' + +class ConfigV1(Struct): + token_account: str + token_symbol: str + global_nonce: int + ''' RequestV0 @@ -103,6 +122,38 @@ class RequestV0(Struct): binary_data: str timestamp: str +''' +RequestV1 + +a request placed on the queue, definition: +NEW: nonce field removed + +scope: self.receiver + +```rust +#[chain(table="queue")] +pub struct Request { + #[chain(primary)] + id: u64, + user: Name, + reward: Asset, + min_verification: u32, + body: String, + binary_data: String, + #[chain(secondary)] + timestamp: TimePointSec +} +``` +''' +class RequestV1(Struct): + id: int + user: str + reward: str + min_verification: int + body: str + binary_data: str + timestamp: str + ''' AccountV0 @@ -128,6 +179,27 @@ class AccountV0(Struct): balance: str nonce: int +''' +AccountV1 + +a user account, users must deposit tokens in order to enqueue requests, definition: + +scope: self.receiver + +```rust +#[chain(table="users")] +pub struct Account { + #[chain(primary)] + user: Name, + balance: Asset +} +``` +''' + +class AccountV1(Struct): + user: str + balance: str + ''' WorkerV0 diff --git a/tests/conftest.py b/tests/conftest.py index 7cb2fc0..888a94a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,7 @@ import pytest from skynet.ipfs import AsyncIPFSHTTP +from skynet.contract import GPUContractAPI from skynet._testing import override_dgpu_config @@ -24,9 +25,11 @@ def skynet_cleos(cleos_bs): # cleos.import_key('gpu.scd', priv) cleos.new_account('gpu.scd', ram=4200000) + contract_path = 'tests/contracts/skygpu-contract/target' cleos.deploy_contract_from_path( 'gpu.scd', - 'tests/contracts/gpu.scd', + contract_path, + contract_name='skygpu', create_account=False ) @@ -37,9 +40,13 @@ def skynet_cleos(cleos_bs): 'gpu.scd' ) - cleos.new_account('testworker') + testworker_key = '5KRPFxF4RJebqPXqRzwStmCaEWeRfp3pR7XUNoA3zCHt5fnPu3s' + pub_key = cleos.import_key('testworker', testworker_key) + cleos.new_account('testworker', key=pub_key) - yield cleos + cleos.wait_blocks(1) + + yield GPUContractAPI(cleos), cleos @pytest.fixture diff --git a/tests/contracts/gpu.scd/gpu.scd.abi b/tests/contracts/gpu.scd/gpu.scd.abi deleted file mode 100644 index f3708bf..0000000 --- a/tests/contracts/gpu.scd/gpu.scd.abi +++ /dev/null @@ -1,416 +0,0 @@ -{ - "____comment": "This file was generated with eosio-abigen. DO NOT EDIT ", - "version": "eosio::abi/1.2", - "types": [], - "structs": [ - { - "name": "account", - "base": "", - "fields": [ - { - "name": "user", - "type": "name" - }, - { - "name": "balance", - "type": "asset" - }, - { - "name": "nonce", - "type": "uint64" - } - ] - }, - { - "name": "card", - "base": "", - "fields": [ - { - "name": "id", - "type": "uint64" - }, - { - "name": "owner", - "type": "name" - }, - { - "name": "card_name", - "type": "string" - }, - { - "name": "version", - "type": "string" - }, - { - "name": "total_memory", - "type": "uint64" - }, - { - "name": "mp_count", - "type": "uint32" - }, - { - "name": "extra", - "type": "string" - } - ] - }, - { - "name": "clean", - "base": "", - "fields": [] - }, - { - "name": "config", - "base": "", - "fields": [ - { - "name": "token_contract", - "type": "name" - }, - { - "name": "token_symbol", - "type": "symbol" - } - ] - }, - { - "name": "dequeue", - "base": "", - "fields": [ - { - "name": "user", - "type": "name" - }, - { - "name": "request_id", - "type": "uint64" - } - ] - }, - { - "name": "enqueue", - "base": "", - "fields": [ - { - "name": "user", - "type": "name" - }, - { - "name": "request_body", - "type": "string" - }, - { - "name": "binary_data", - "type": "string" - }, - { - "name": "reward", - "type": "asset" - }, - { - "name": "min_verification", - "type": "uint32" - } - ] - }, - { - "name": "global_configuration_struct", - "base": "", - "fields": [ - { - "name": "token_contract", - "type": "name" - }, - { - "name": "token_symbol", - "type": "symbol" - } - ] - }, - { - "name": "submit", - "base": "", - "fields": [ - { - "name": "worker", - "type": "name" - }, - { - "name": "request_id", - "type": "uint64" - }, - { - "name": "request_hash", - "type": "checksum256" - }, - { - "name": "result_hash", - "type": "checksum256" - }, - { - "name": "ipfs_hash", - "type": "string" - } - ] - }, - { - "name": "withdraw", - "base": "", - "fields": [ - { - "name": "user", - "type": "name" - }, - { - "name": "quantity", - "type": "asset" - } - ] - }, - { - "name": "work_request_struct", - "base": "", - "fields": [ - { - "name": "id", - "type": "uint64" - }, - { - "name": "user", - "type": "name" - }, - { - "name": "reward", - "type": "asset" - }, - { - "name": "min_verification", - "type": "uint32" - }, - { - "name": "nonce", - "type": "uint64" - }, - { - "name": "body", - "type": "string" - }, - { - "name": "binary_data", - "type": "string" - }, - { - "name": "timestamp", - "type": "time_point_sec" - } - ] - }, - { - "name": "work_result_struct", - "base": "", - "fields": [ - { - "name": "id", - "type": "uint64" - }, - { - "name": "request_id", - "type": "uint64" - }, - { - "name": "user", - "type": "name" - }, - { - "name": "worker", - "type": "name" - }, - { - "name": "result_hash", - "type": "checksum256" - }, - { - "name": "ipfs_hash", - "type": "string" - }, - { - "name": "submited", - "type": "time_point_sec" - } - ] - }, - { - "name": "workbegin", - "base": "", - "fields": [ - { - "name": "worker", - "type": "name" - }, - { - "name": "request_id", - "type": "uint64" - }, - { - "name": "max_workers", - "type": "uint32" - } - ] - }, - { - "name": "workcancel", - "base": "", - "fields": [ - { - "name": "worker", - "type": "name" - }, - { - "name": "request_id", - "type": "uint64" - }, - { - "name": "reason", - "type": "string" - } - ] - }, - { - "name": "worker", - "base": "", - "fields": [ - { - "name": "account", - "type": "name" - }, - { - "name": "joined", - "type": "time_point_sec" - }, - { - "name": "left", - "type": "time_point_sec" - }, - { - "name": "url", - "type": "string" - } - ] - }, - { - "name": "worker_status_struct", - "base": "", - "fields": [ - { - "name": "worker", - "type": "name" - }, - { - "name": "status", - "type": "string" - }, - { - "name": "started", - "type": "time_point_sec" - } - ] - } - ], - "actions": [ - { - "name": "clean", - "type": "clean", - "ricardian_contract": "" - }, - { - "name": "config", - "type": "config", - "ricardian_contract": "" - }, - { - "name": "dequeue", - "type": "dequeue", - "ricardian_contract": "" - }, - { - "name": "enqueue", - "type": "enqueue", - "ricardian_contract": "" - }, - { - "name": "submit", - "type": "submit", - "ricardian_contract": "" - }, - { - "name": "withdraw", - "type": "withdraw", - "ricardian_contract": "" - }, - { - "name": "workbegin", - "type": "workbegin", - "ricardian_contract": "" - }, - { - "name": "workcancel", - "type": "workcancel", - "ricardian_contract": "" - } - ], - "tables": [ - { - "name": "cards", - "type": "card", - "index_type": "i64", - "key_names": [], - "key_types": [] - }, - { - "name": "config", - "type": "global_configuration_struct", - "index_type": "i64", - "key_names": [], - "key_types": [] - }, - { - "name": "queue", - "type": "work_request_struct", - "index_type": "i64", - "key_names": [], - "key_types": [] - }, - { - "name": "results", - "type": "work_result_struct", - "index_type": "i64", - "key_names": [], - "key_types": [] - }, - { - "name": "status", - "type": "worker_status_struct", - "index_type": "i64", - "key_names": [], - "key_types": [] - }, - { - "name": "users", - "type": "account", - "index_type": "i64", - "key_names": [], - "key_types": [] - }, - { - "name": "workers", - "type": "worker", - "index_type": "i64", - "key_names": [], - "key_types": [] - } - ], - "ricardian_clauses": [], - "variants": [], - "action_results": [] -} \ No newline at end of file diff --git a/tests/contracts/gpu.scd/gpu.scd.wasm b/tests/contracts/gpu.scd/gpu.scd.wasm deleted file mode 100755 index 6881852..0000000 Binary files a/tests/contracts/gpu.scd/gpu.scd.wasm and /dev/null differ diff --git a/tests/contracts/skygpu-contract b/tests/contracts/skygpu-contract new file mode 160000 index 0000000..87794ff --- /dev/null +++ b/tests/contracts/skygpu-contract @@ -0,0 +1 @@ +Subproject commit 87794ffb45340103159450694c56f241615431b2 diff --git a/tests/test_chain.py b/tests/test_chain.py index 97adfe6..4f23848 100644 --- a/tests/test_chain.py +++ b/tests/test_chain.py @@ -1,13 +1,231 @@ import trio +import pytest +from leap.errors import TransactionPushError from msgspec import json +from skynet.contract import RequestNotFound, ConfigNotFound from skynet.types import BodyV0, BodyV0Params - from skynet._testing import open_test_worker +async def test_system(skynet_cleos): + gpu, cleos = skynet_cleos + + # assert config can only be init once (done by fixture) + with pytest.raises(TransactionPushError): + await gpu.init_config('eosio.token', '4,TLOS') + + # test clean function + + # fill tables with data + + # accounts + users = [] + quantity = '1.0000 TLOS' + usr_num = 3 + for _ in range(usr_num): + test_user = cleos.new_account() + cleos.transfer_token('eosio', test_user, quantity, 'clean testing') + await gpu.deposit(test_user, quantity) + # will throw if user not found + await gpu.get_user(test_user) + users.append(test_user) + + # queue + for user in users: + await gpu.enqueue( + user, + BodyV0( + method='txt2img', + params=BodyV0Params( + prompt='ayy lmao', + model='skygpu/mocker', + step=1, + seed=0, + guidance=10.0 + ) + ), + min_verification=2 # make reqs stay after 1 result + ) + + # check requests are in queue + queue = await gpu.get_queue() + assert len(queue) == usr_num + + # workers + workers = [] + quantity = '1.0000 TLOS' + wrk_num = 3 + for _ in range(wrk_num): + worker = cleos.new_account() + await gpu.register_worker(worker, 'http://localhost') + # will throw if worker not found + await gpu.get_worker(worker) + workers.append(worker) + + # status + for i in range(wrk_num): + req = queue[i] + worker = workers[i] + await gpu.accept_work(worker, req.id) + # will throw is status not found + await gpu.get_worker_status_for_request(req.id, worker) + + # results + # make one of the workers finish to populate result + await gpu.submit_work( + workers[0], + queue[0].id, + 'ff' * 32, + 'null hash' + ) + + results = await gpu.get_results(queue[0].id) + assert len(results) == 1 + + # all tables populated + + # run clean nuke == false + await gpu.clean_tables() + + # assert tables empty + assert len(await gpu.get_queue()) == 0 + for req in queue: + assert len(await gpu.get_statuses_for_request(req.id)) == 0 + assert len(await gpu.get_results(req.id)) == 0 + + # check config, accounts and workers still there + await gpu.get_config() # raises if not found + assert len(await gpu.get_users()) == usr_num + assert len(await gpu.get_workers()) == wrk_num + + # test nuke + await gpu.clean_tables(nuke=True) + with pytest.raises(ConfigNotFound): + await gpu.get_config() + + assert len(await gpu.get_users()) == 0 + assert len(await gpu.get_workers()) == 0 + + # re init config in case other tests run + await gpu.init_config('eosio.token', '4,TLOS') + + +async def test_balance(skynet_cleos): + gpu, cleos = skynet_cleos + + # create fresh account + account = cleos.new_account() + + # try call withdraw with no user account reg'd + with pytest.raises(TransactionPushError): + await gpu.withdraw(account, '1.0000 TLOS') + + # give tokens and deposit to gpu + quantity = '1000.0000 TLOS' + cleos.transfer_token('eosio', account, quantity) + await gpu.deposit(account, quantity) + + # check if balance increased + account_row = await gpu.get_user(account) + assert account_row.balance == quantity + + # try call withdraw with more than deposited + with pytest.raises(TransactionPushError): + await gpu.withdraw(account, '1000.0001 TLOS') + + # withdraw full correct amount + await gpu.withdraw(account, quantity) + + # check if balance decreased + account_row = await gpu.get_user(account) + assert account_row.balance == '0.0000 TLOS' + +async def test_worker_reg(skynet_cleos): + gpu, cleos = skynet_cleos + + # create fresh account + worker = cleos.new_account() + url = 'https://nvidia.com' + + await gpu.register_worker(worker, url) + + # find and check vals + worker_row = await gpu.get_worker(worker) + assert worker_row.account == worker + assert worker_row.url == url + assert worker_row.joined != '1970-01-01T00:00:00' + assert worker_row.left == '1970-01-01T00:00:00' + + # attempt to register twice + with pytest.raises(TransactionPushError): + await gpu.register_worker(worker, url) + + # unregister + reason = 'testing' + await gpu.unregister_worker(worker, reason) + + worker_row = await gpu.get_worker(worker) + assert worker_row.account == worker + assert worker_row.url == url + assert worker_row.left != '1970-01-01T00:00:00' + + # attempt to unreg twice + with pytest.raises(TransactionPushError): + await gpu.unregister_worker(worker, reason) + + +async def test_queue(skynet_cleos): + gpu, cleos = skynet_cleos + + body = BodyV0( + method='txt2img', + params=BodyV0Params( + prompt='cyberpunk hacker travis bickle dystopic alley graffiti', + model='skygpu/mocker', + step=4, + seed=0, + guidance=10.0 + ) + ) + + # create account + account = cleos.new_account() + quantity = '1000.0000 TLOS' + cleos.transfer_token('eosio', account, quantity) + + # attempt to create request without prev deposit + with pytest.raises(TransactionPushError): + await gpu.enqueue(account, body) + + # deposit tokens into gpu + await gpu.deposit(account, quantity) + + # finally enqueue + req = await gpu.enqueue(account, body) + + # search by id + req_found = await gpu.get_request(req.id) + assert req == req_found + + # search by timestamp + reqs = await gpu.get_requests_since(60 * 60) + assert len(reqs) == 1 + assert reqs[0] == req + + # attempt to dequeue wrong req + with pytest.raises(TransactionPushError): + await gpu.dequeue(account, 999999) + + # dequeue correctly + await gpu.dequeue(account, req.id) + + # check deletion + with pytest.raises(RequestNotFound): + await gpu.get_request(req.id) + async def test_full_flow(inject_mockers, skynet_cleos, ipfs_node): - cleos = skynet_cleos + gpu, cleos = skynet_cleos # create account and deposit tokens into gpu account = cleos.new_account() diff --git a/uv.lock b/uv.lock index c806e12..430c272 100644 --- a/uv.lock +++ b/uv.lock @@ -1599,7 +1599,7 @@ wheels = [ [[package]] name = "py-leap" version = "0.1a35" -source = { editable = "../py-leap" } +source = { git = "https://github.com/guilledk/py-leap.git?branch=struct_unwrap#20f2e1f74e98e3d75984e8e1eee13c3100c17652" } dependencies = [ { name = "base58" }, { name = "cryptos" }, @@ -1609,33 +1609,6 @@ dependencies = [ { name = "ripemd-hash" }, ] -[package.metadata] -requires-dist = [ - { name = "base58", specifier = ">=2.1.1,<3" }, - { name = "cryptos", specifier = ">=2.0.9,<3" }, - { name = "httpx", specifier = ">=0.28.1,<0.29" }, - { name = "msgspec", specifier = ">=0.19.0" }, - { name = "requests", specifier = "<2.32.0" }, - { name = "ripemd-hash", specifier = ">=1.0.1,<2" }, -] - -[package.metadata.requires-dev] -dev = [ - { name = "docker", specifier = ">=6.1.3,<7" }, - { name = "pdbpp", specifier = ">=0.10.3,<0.11" }, - { name = "pytest", specifier = ">=8.3.4,<9" }, - { name = "pytest-trio", specifier = ">=0.8.0,<0.9" }, -] -docs = [ - { name = "sphinx", specifier = "==7.1.2" }, - { name = "sphinx-rtd-theme", specifier = "==1.3.0" }, -] -snaps = [ - { name = "bs4", specifier = ">=0.0.2,<0.0.3" }, - { name = "tdqm", specifier = ">=0.0.1,<0.0.2" }, - { name = "zstandard", specifier = ">=0.21.0,<0.22" }, -] - [[package]] name = "pycparser" version = "2.22" @@ -2152,7 +2125,7 @@ requires-dist = [ { name = "outcome", specifier = ">=1.3.0.post0" }, { name = "pillow", specifier = ">=10.0.1,<11" }, { name = "protobuf", specifier = ">=5.29.3,<6" }, - { name = "py-leap", editable = "../py-leap" }, + { name = "py-leap", git = "https://github.com/guilledk/py-leap.git?branch=struct_unwrap" }, { name = "pytz", specifier = "~=2023.3.post1" }, { name = "toml", specifier = ">=0.10.2,<0.11" }, { name = "trio", specifier = ">=0.22.2,<0.23" },