Update py-leap and use its new APIs in testing, add comments on the fields of the DgpuConfig struct, minor renames and changes to the mechanics of open_worker, make logging configurable

pull/47/head
Guillermo Rodriguez 2025-02-07 20:24:31 -03:00
parent 149d9f9f33
commit f39859943d
No known key found for this signature in database
GPG Key ID: 002CC5F1E6BDA53E
10 changed files with 90 additions and 293 deletions

View File

@ -40,6 +40,7 @@ frontend = [
dev = [
"pdbpp>=0.10.3,<0.11",
"pytest>=7.4.2,<8",
"pytest-dockerctl",
"pytest-trio>=0.8.0,<0.9",
]
cuda = [
@ -80,7 +81,8 @@ explicit = true
torch = { index = "torch" }
triton = { index = "torch" }
torchvision = { index = "torch" }
py-leap = { git = "https://github.com/guilledk/py-leap.git", rev = "v0.1a32" }
py-leap = { git = "https://github.com/guilledk/py-leap.git", rev = "v0.1a34" }
pytest-dockerctl = { git = "https://github.com/pikers/pytest-dockerctl.git", branch = "g_update" }
[build-system]
requires = ["hatchling"]

View File

@ -11,24 +11,25 @@ class ConfigParsingError(BaseException):
# Settings for the dgpu worker; this is the type of the `dgpu` entry in Config.
class DgpuConfig(msgspec.Struct):
account: str
permission: str
key: str
node_url: str
hyperion_url: str
ipfs_url: str
hf_token: str
ipfs_domain: str = DEFAULT_IPFS_DOMAIN
hf_home: str = 'hf_home'
non_compete: set[str] = set()
model_whitelist: set[str] = set()
model_blacklist: set[str] = set()
backend: str = 'sync-on-thread'
api_bind: str = False # NOTE(review): annotated as str but defaults to False
tui: bool = False
poll_time: float = 0.5
# NOTE(review): the names below repeat declarations above; this view appears to
# show the previous and the current field lists together
account: str # worker account name
permission: str # account permission name associated with key
key: str # private key
node_url: str # antelope http api endpoint
ipfs_url: str # IPFS node http rpc endpoint
hf_token: str # hugging face token
ipfs_domain: str = DEFAULT_IPFS_DOMAIN # IPFS Gateway domain
hf_home: str = 'hf_home' # hugging face data cache location
non_compete: set[str] = set() # set of worker names to not compete in requests
model_whitelist: set[str] = set() # only run these models
model_blacklist: set[str] = set() # don't run these models
backend: str = 'sync-on-thread' # select inference backend
tui: bool = False # enable TUI monitor
poll_time: float = 0.5 # wait time for polling updates from contract
log_level: str = 'info' # logging level name; setup_logging_for_tui upper-cases it and looks it up on the logging module, falling back to WARNING
log_file: str = 'dgpu.log' # log file path (only used when tui = true)
class TelegramConfig(msgspec.Struct):
class FrontendConfig(msgspec.Struct):
account: str
permission: str
key: str
@ -37,32 +38,27 @@ class TelegramConfig(msgspec.Struct):
ipfs_url: str
token: str
class DiscordConfig(msgspec.Struct):
    """Schema of the ``discord`` entry of ``Config``.

    Every field is a required string; none of them has a default.
    """

    # account identity and signing material
    account: str
    permission: str
    key: str

    # service endpoints
    node_url: str
    hyperion_url: str
    ipfs_url: str

    # presumably the bot token -- TODO confirm against the frontend code
    token: str
class PinnerConfig(msgspec.Struct):
    """Schema of the ``pinner`` entry of ``Config``: two required URL strings."""

    hyperion_url: str
    ipfs_url: str
class UserConfig(msgspec.Struct):
    """Schema of the ``user`` entry of ``Config``.

    All four fields are required strings without defaults.
    """

    account: str
    permission: str
    key: str
    node_url: str
# Top-level configuration schema (the type load_skynet_toml decodes into).
# Every section is optional and defaults to None.
class Config(msgspec.Struct):
dgpu: DgpuConfig | None = None
telegram: TelegramConfig | None = None
discord: DiscordConfig | None = None
# NOTE(review): telegram/discord are declared a second time here with
# FrontendConfig; this view appears to show both the old and new annotations
telegram: FrontendConfig | None = None
discord: FrontendConfig | None = None
pinner: PinnerConfig | None = None
user: UserConfig | None = None
def load_skynet_toml(file_path=DEFAULT_CONFIG_PATH) -> Config:
    """Read the TOML file at ``file_path`` and decode its text into a ``Config``.

    ``file_path`` defaults to ``DEFAULT_CONFIG_PATH``. Errors from opening the
    file or from decoding are not caught here and propagate to the caller.
    """
    with open(file_path, 'r') as toml_file:
        raw_toml = toml_file.read()

    return msgspec.toml.decode(raw_toml, type=Config)

View File

@ -1,21 +1,23 @@
import logging
from contextlib import asynccontextmanager as acm
import trio
import urwid
from skynet.config import Config
from skynet.dgpu.tui import init_tui
from skynet.dgpu.daemon import serve_forever
from skynet.dgpu.daemon import dgpu_serve_forever
from skynet.dgpu.network import NetConnector
async def _dgpu_main(config: Config) -> None:
@acm
async def open_worker(config: Config):
# suppress logs from httpx (logs url + status after every query)
logging.getLogger("httpx").setLevel(logging.WARNING)
tui = None
if config.tui:
tui = init_tui()
tui = init_tui(config)
conn = NetConnector(config)
@ -25,7 +27,12 @@ async def _dgpu_main(config: Config) -> None:
if tui:
n.start_soon(tui.run)
await serve_forever(config, conn)
yield conn
except *urwid.ExitMainLoop:
...
async def _dgpu_main(config: Config):
    """Open a worker for ``config`` and run the dgpu serve loop on its connection."""
    async with open_worker(config) as worker_conn:
        await dgpu_serve_forever(config, worker_conn)

View File

@ -183,7 +183,7 @@ async def maybe_serve_one(
await conn.cancel_work(rid, 'reason not provided')
async def serve_forever(config: Config, conn: NetConnector):
async def dgpu_serve_forever(config: Config, conn: NetConnector):
await maybe_update_tui_balance(conn)
try:
async for tables in conn.iter_poll_update(config.poll_time):

View File

@ -5,6 +5,8 @@ import warnings
import trio
import urwid
from skynet.config import DgpuConfig as Config
class WorkerMonitor:
def __init__(self):
@ -166,13 +168,15 @@ class WorkerMonitor:
self.update_requests(queue)
def setup_logging_for_tui(level):
def setup_logging_for_tui(config: Config):
warnings.filterwarnings("ignore")
level = getattr(logging, config.log_level.upper(), logging.WARNING)
logger = logging.getLogger()
logger.setLevel(level)
fh = logging.FileHandler('dgpu.log')
fh = logging.FileHandler(config.log_file)
fh.setLevel(level)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
@ -185,11 +189,11 @@ def setup_logging_for_tui(level):
logger.removeHandler(handler)
# Module-level slot for the single WorkerMonitor instance; None until init_tui runs.
_tui = None
def init_tui():
_tui: WorkerMonitor | None = None
# Create the WorkerMonitor, store it in the module-level _tui and return it.
# Asserts that no monitor has been created yet, and sets up logging before
# the monitor is constructed.
def init_tui(config: Config):
global _tui
assert not _tui
setup_logging_for_tui(logging.INFO)
setup_logging_for_tui(config)
_tui = WorkerMonitor()
return _tui

View File

@ -1,143 +0,0 @@
import json
import time
import logging
from contextlib import contextmanager as cm
import docker
from leap.cleos import CLEOS
from leap.sugar import get_container, Symbol
# Context manager: obtains a detached 'skynet-nodeos' docker container, boots a
# chain inside it, creates the test accounts, deploys and configures the
# telos.gpu contract, then yields the CLEOS client. On exit the container is
# stopped and removed unless `cleanup` is false.
@cm
def open_nodeos(cleanup: bool = True):
dclient = docker.from_env()
vtestnet = get_container(
dclient,
'guilledk/skynet:leap-4.0.1',
name='skynet-nodeos',
force_unique=True,
detach=True,
network='host')
try:
# client bound to the container, pointed at the node on 127.0.0.1:42000
cleos = CLEOS(
dclient, vtestnet,
url='http://127.0.0.1:42000',
remote='http://127.0.0.1:42000'
)
cleos.start_keosd()
# this key pair becomes the genesis initial_key and the signature provider
priv, pub = cleos.create_key_pair()
logging.info(f'SUDO KEYS: {(priv, pub)}')
cleos.setup_wallet(priv)
# genesis document, written into the container as /root/skynet.json below
genesis = json.dumps({
"initial_timestamp": '2017-08-29T02:14:00.000',
"initial_key": pub,
"initial_configuration": {
"max_block_net_usage": 1048576,
"target_block_net_usage_pct": 1000,
"max_transaction_net_usage": 1048575,
"base_per_transaction_net_usage": 12,
"net_usage_leeway": 500,
"context_free_discount_net_usage_num": 20,
"context_free_discount_net_usage_den": 100,
"max_block_cpu_usage": 200000,
"target_block_cpu_usage_pct": 1000,
"max_transaction_cpu_usage": 150000,
"min_transaction_cpu_usage": 100,
"max_transaction_lifetime": 3600,
"deferred_trx_expiration_window": 600,
"max_transaction_delay": 3888000,
"max_inline_action_size": 4096,
"max_inline_action_depth": 4,
"max_authority_depth": 6
}
}, indent=4)
ec, out = cleos.run(
['bash', '-c', f'echo \'{genesis}\' > /root/skynet.json'])
assert ec == 0
# replace the placeholder signature-provider entry in config.ini with the
# key pair generated above
place_holder = 'EOS5fLreY5Zq5owBhmNJTgQaLqQ4ufzXSTpStQakEyfxNFuUEgNs1=KEY:5JnvSc6pewpHHuUHwvbJopsew6AKwiGnexwDRc2Pj2tbdw6iML9'
sig_provider = f'{pub}=KEY:{priv}'
nodeos_config_ini = '/root/nodeos/config.ini'
ec, out = cleos.run(
['bash', '-c', f'sed -i -e \'s/{place_holder}/{sig_provider}/g\' {nodeos_config_ini}'])
assert ec == 0
cleos.start_nodeos_from_config(
nodeos_config_ini,
data_dir='/root/nodeos/data',
genesis='/root/skynet.json',
state_plugin=True)
# short pause, then wait for one block before running the boot sequence
time.sleep(0.5)
cleos.wait_blocks(1)
cleos.boot_sequence(token_sym=Symbol('GPU', 4))
# account that will hold the telos.gpu contract
priv, pub = cleos.create_key_pair()
cleos.import_key(priv)
cleos.private_keys['telos.gpu'] = priv
logging.info(f'GPU KEYS: {(priv, pub)}')
cleos.new_account('telos.gpu', ram=4200000, key=pub)
# three staked worker accounts: testworker1 .. testworker3
for i in range(1, 4):
priv, pub = cleos.create_key_pair()
cleos.import_key(priv)
cleos.private_keys[f'testworker{i}'] = priv
logging.info(f'testworker{i} KEYS: {(priv, pub)}')
cleos.create_account_staked(
'eosio', f'testworker{i}', key=pub)
# 'telegram' user account, funded from eosio with 1000000.0000 GPU
priv, pub = cleos.create_key_pair()
cleos.import_key(priv)
logging.info(f'TELEGRAM KEYS: {(priv, pub)}')
cleos.create_account_staked(
'eosio', 'telegram', ram=500000, key=pub)
cleos.transfer_token(
'eosio', 'telegram', '1000000.0000 GPU', 'Initial testing funds')
# deploy the contract onto the already-created account, then configure it
cleos.deploy_contract_from_host(
'telos.gpu',
'tests/contracts/telos.gpu',
verify_hash=False,
create_account=False
)
ec, out = cleos.push_action(
'telos.gpu',
'config',
['eosio.token', '4,GPU'],
'telos.gpu@active'
)
assert ec == 0
# move the telegram funds into the contract and check that exactly one
# 'telegram' row is returned from the users table
ec, out = cleos.transfer_token(
'telegram', 'telos.gpu', '1000000.0000 GPU', 'Initial testing funds')
assert ec == 0
user_row = cleos.get_table(
'telos.gpu',
'telos.gpu',
'users',
index_position=1,
key_type='name',
lower_bound='telegram',
upper_bound='telegram'
)
assert len(user_row) == 1
yield cleos
finally:
# ec, out = cleos.list_all_keys()
# logging.info(out)
# tear the container down unless the caller asked to keep it
if cleanup:
vtestnet.stop()
vtestnet.remove()

View File

@ -2,23 +2,43 @@ import pytest
from skynet.config import *
from skynet.ipfs import AsyncIPFSHTTP
from skynet.nodeos import open_nodeos
@pytest.fixture(scope='session')
def ipfs_client():
    """Session-scoped fixture yielding an ``AsyncIPFSHTTP`` for http://127.0.0.1:5001."""
    client = AsyncIPFSHTTP('http://127.0.0.1:5001')
    yield client
@pytest.fixture(scope='session')
def postgres_db():
    """Session-scoped fixture yielding whatever ``open_new_database()`` provides.

    ``skynet.db`` is imported inside the fixture, so it is only loaded when a
    test actually requests this fixture.
    """
    from skynet.db import open_new_database

    with open_new_database() as connection_params:
        yield connection_params
@pytest.fixture(scope='session')
def cleos():
    """Session-scoped fixture yielding the client produced by ``open_nodeos()``."""
    with open_nodeos() as node_client:
        yield node_client
@pytest.fixture(scope='module')
def skynet_cleos(cleos_bs):
    """Module-scoped fixture: set up the ``telos.gpu`` contract on ``cleos_bs``.

    Creates a key pair, imports it for ``telos.gpu``, creates that account,
    deploys the contract from ``tests/contracts/telos.gpu`` and pushes its
    ``config`` action, then yields the same client object it was given.
    """
    cleos = cleos_bs

    private_key, public_key = cleos.create_key_pair()
    cleos.import_key('telos.gpu', private_key)
    cleos.new_account('telos.gpu', ram=4200000, key=public_key)

    # the account was created just above, so deployment must not create it again
    cleos.deploy_contract_from_path(
        'telos.gpu', 'tests/contracts/telos.gpu', create_account=False)

    config_args = ['eosio.token', '4,GPU']
    cleos.push_action('telos.gpu', 'config', config_args, 'telos.gpu')

    yield cleos
@pytest.fixture(scope='session')
def dgpu():

View File

@ -0,0 +1,3 @@
def test_dev(skynet_cleos):
    """Placeholder test: requests the ``skynet_cleos`` fixture and asserts nothing."""
    cleos = skynet_cleos
    ...

View File

@ -1,104 +0,0 @@
import time
import json
from hashlib import sha256
from functools import partial
import trio
import requests
from skynet.constants import DEFAULT_IPFS_REMOTE
from skynet.dgpu import open_dgpu_node
from leap.sugar import collect_stdout
def test_enqueue_work(cleos):
    """Enqueue one diffuse request as ``telegram`` and run ``testworker1`` on it.

    Checks that the request appears in the contract's ``queue`` table exactly
    as submitted, runs ``open_dgpu_node`` under trio, and then expects the
    queue to be empty.
    """
    user = 'telegram'
    worker = 'testworker1'

    params = {
        'algo': 'midj',
        'prompt': 'skynet terminator dystopic',
        'width': 512,
        'height': 512,
        'guidance': 10,
        'step': 28,
        'seed': 420,
        'upscaler': 'x4'
    }
    req = json.dumps({'method': 'diffuse', 'params': params})
    binary = ''

    enqueue_args = [user, req, binary, '20.0000 GPU', 1]
    ec, out = cleos.push_action(
        'telos.gpu', 'enqueue', enqueue_args, f'{user}@active')
    assert ec == 0

    queue = cleos.get_table('telos.gpu', 'telos.gpu', 'queue')
    assert len(queue) == 1

    # the single queued row must mirror what was submitted
    req_on_chain = queue[0]
    assert req_on_chain['user'] == user
    assert req_on_chain['body'] == req
    assert req_on_chain['binary_data'] == binary

    run_worker = partial(
        open_dgpu_node,
        worker,
        'active',
        cleos,
        DEFAULT_IPFS_REMOTE,
        cleos.private_keys[worker],
        initial_algos=['midj']
    )
    trio.run(run_worker)

    queue = cleos.get_table('telos.gpu', 'telos.gpu', 'queue')
    assert len(queue) == 0
def test_enqueue_dequeue(cleos):
    """Enqueue one diffuse request as ``telegram``, then dequeue it by id.

    The request id is taken from the part of the enqueue action's collected
    stdout before the ``:``; the queue table must hold one row after the
    enqueue and none after the dequeue.
    """
    user = 'telegram'
    authority = f'{user}@active'

    params = {
        'algo': 'midj',
        'prompt': 'skynet terminator dystopic',
        'width': 512,
        'height': 512,
        'guidance': 10,
        'step': 28,
        'seed': 420,
        'upscaler': 'x4'
    }
    req = json.dumps({'method': 'diffuse', 'params': params})
    binary = ''

    ec, out = cleos.push_action(
        'telos.gpu', 'enqueue', [user, req, binary, '20.0000 GPU', 1], authority)
    assert ec == 0

    # stdout is expected to split into exactly two parts around ':'
    raw_id, _ = collect_stdout(out).split(':')
    request_id = int(raw_id)

    queue = cleos.get_table('telos.gpu', 'telos.gpu', 'queue')
    assert len(queue) == 1

    ec, out = cleos.push_action(
        'telos.gpu', 'dequeue', [user, request_id], authority)
    assert ec == 0

    queue = cleos.get_table('telos.gpu', 'telos.gpu', 'queue')
    assert len(queue) == 0

18
uv.lock
View File

@ -1730,8 +1730,8 @@ wheels = [
[[package]]
name = "py-leap"
version = "0.1a32"
source = { git = "https://github.com/guilledk/py-leap.git?rev=v0.1a32#c8137dec3d0a7d1e883cafe5212d58a8e9db9b84" }
version = "0.1a34"
source = { git = "https://github.com/guilledk/py-leap.git?rev=v0.1a34#6055ca7063c1eb32644e855c6726c29a1d7ac7e9" }
dependencies = [
{ name = "base58" },
{ name = "cryptos" },
@ -1832,6 +1832,16 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/51/ff/f6e8b8f39e08547faece4bd80f89d5a8de68a38b2d179cc1c4490ffa3286/pytest-7.4.4-py3-none-any.whl", hash = "sha256:b090cdf5ed60bf4c45261be03239c2c1c22df034fbffe691abe93cd80cea01d8", size = 325287 },
]
[[package]]
name = "pytest-dockerctl"
version = "0.2a0"
source = { git = "https://github.com/pikers/pytest-dockerctl.git?branch=g_update#d58e9317b55954f05f139730a62d55e1acb5f5d1" }
dependencies = [
{ name = "docker" },
{ name = "pytest" },
{ name = "pytest-trio" },
]
[[package]]
name = "pytest-trio"
version = "0.8.0"
@ -2268,6 +2278,7 @@ cuda = [
dev = [
{ name = "pdbpp" },
{ name = "pytest" },
{ name = "pytest-dockerctl" },
{ name = "pytest-trio" },
]
frontend = [
@ -2288,7 +2299,7 @@ requires-dist = [
{ name = "outcome", specifier = ">=1.3.0.post0" },
{ name = "pillow", specifier = ">=10.0.1,<11" },
{ name = "protobuf", specifier = ">=5.29.3,<6" },
{ name = "py-leap", git = "https://github.com/guilledk/py-leap.git?rev=v0.1a32" },
{ name = "py-leap", git = "https://github.com/guilledk/py-leap.git?rev=v0.1a34" },
{ name = "pytz", specifier = "~=2023.3.post1" },
{ name = "toml", specifier = ">=0.10.2,<0.11" },
{ name = "trio", specifier = ">=0.22.2,<0.23" },
@ -2320,6 +2331,7 @@ cuda = [
dev = [
{ name = "pdbpp", specifier = ">=0.10.3,<0.11" },
{ name = "pytest", specifier = ">=7.4.2,<8" },
{ name = "pytest-dockerctl", git = "https://github.com/pikers/pytest-dockerctl.git?branch=g_update" },
{ name = "pytest-trio", specifier = ">=0.8.0,<0.9" },
]
frontend = [