mirror of https://github.com/skygpu/skynet.git
Drop hyperion on telegram frontend
parent
868e2489b3
commit
0d76953782
|
|
@ -3,6 +3,9 @@ import random
|
||||||
import logging
|
import logging
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
|
import time
|
||||||
|
import httpx
|
||||||
|
|
||||||
from PIL import Image, UnidentifiedImageError
|
from PIL import Image, UnidentifiedImageError
|
||||||
from json import JSONDecodeError
|
from json import JSONDecodeError
|
||||||
from decimal import Decimal
|
from decimal import Decimal
|
||||||
|
|
@ -13,7 +16,7 @@ from contextlib import asynccontextmanager as acm
|
||||||
|
|
||||||
from leap.cleos import CLEOS
|
from leap.cleos import CLEOS
|
||||||
from leap.protocol import Name, Asset
|
from leap.protocol import Name, Asset
|
||||||
from leap.hyperion import HyperionAPI
|
# from leap.hyperion import HyperionAPI
|
||||||
|
|
||||||
from telebot.types import InputMediaPhoto
|
from telebot.types import InputMediaPhoto
|
||||||
from telebot.async_telebot import AsyncTeleBot
|
from telebot.async_telebot import AsyncTeleBot
|
||||||
|
|
@ -60,7 +63,7 @@ class SkynetTelegramFrontend:
|
||||||
self.bot = AsyncTeleBot(token, exception_handler=SKYExceptionHandler)
|
self.bot = AsyncTeleBot(token, exception_handler=SKYExceptionHandler)
|
||||||
self.cleos = CLEOS(endpoint=node_url)
|
self.cleos = CLEOS(endpoint=node_url)
|
||||||
self.cleos.load_abi('gpu.scd', GPU_CONTRACT_ABI)
|
self.cleos.load_abi('gpu.scd', GPU_CONTRACT_ABI)
|
||||||
self.hyperion = HyperionAPI(hyperion_url)
|
# self.hyperion = HyperionAPI(hyperion_url)
|
||||||
self.ipfs_node = AsyncIPFSHTTP(ipfs_node)
|
self.ipfs_node = AsyncIPFSHTTP(ipfs_node)
|
||||||
|
|
||||||
self._async_exit_stack = AsyncExitStack()
|
self._async_exit_stack = AsyncExitStack()
|
||||||
|
|
@ -103,6 +106,80 @@ class SkynetTelegramFrontend:
|
||||||
**kwargs
|
**kwargs
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def _wait_for_submit_in_blocks(
|
||||||
|
self,
|
||||||
|
request_hash: str,
|
||||||
|
start_block: int,
|
||||||
|
timeout_seconds: int = 60 * 3,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Poll /v1/chain/get_block from start_block upwards until we see
|
||||||
|
gpu.scd::submit with the given request_hash, or we hit timeout.
|
||||||
|
|
||||||
|
Returns (tx_id, ipfs_hash, worker) or (None, None, None) on timeout.
|
||||||
|
"""
|
||||||
|
endpoint = self.node_url.rstrip('/')
|
||||||
|
|
||||||
|
next_block = start_block # inclusive
|
||||||
|
deadline = time.monotonic() + timeout_seconds
|
||||||
|
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
while time.monotonic() < deadline:
|
||||||
|
# Get current head block
|
||||||
|
info = (await client.post(
|
||||||
|
f'{endpoint}/v1/chain/get_info',
|
||||||
|
json={}
|
||||||
|
)).json()
|
||||||
|
head = info['head_block_num']
|
||||||
|
|
||||||
|
# No new blocks yet, wait a bit
|
||||||
|
if next_block > head:
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Walk all blocks we haven't seen yet
|
||||||
|
while next_block <= head:
|
||||||
|
try:
|
||||||
|
block = (await client.post(
|
||||||
|
f'{endpoint}/v1/chain/get_block',
|
||||||
|
json={'block_num_or_id': next_block}
|
||||||
|
)).json()
|
||||||
|
except (httpx.RequestError, ValueError):
|
||||||
|
logging.warning(f'failed to get block {next_block}, retrying...')
|
||||||
|
break # leave inner loop, re-fetch head
|
||||||
|
|
||||||
|
for tx in block.get('transactions', []):
|
||||||
|
trx = tx.get('trx')
|
||||||
|
# Sometimes trx can be just a string (id) — skip those.
|
||||||
|
if isinstance(trx, str):
|
||||||
|
continue
|
||||||
|
|
||||||
|
tx_id = trx.get('id')
|
||||||
|
tx_obj = trx.get('transaction') or {}
|
||||||
|
actions = tx_obj.get('actions', []) or []
|
||||||
|
|
||||||
|
for act in actions:
|
||||||
|
if (
|
||||||
|
act.get('account') == 'gpu.scd'
|
||||||
|
and act.get('name') == 'submit'
|
||||||
|
):
|
||||||
|
data = act.get('data') or {}
|
||||||
|
if data.get('request_hash') == request_hash:
|
||||||
|
ipfs_hash = data.get('ipfs_hash')
|
||||||
|
worker = data.get('worker')
|
||||||
|
logging.info(
|
||||||
|
f'Found matching submit in block {next_block}, '
|
||||||
|
f'tx {tx_id}'
|
||||||
|
)
|
||||||
|
return tx_id, ipfs_hash, worker
|
||||||
|
|
||||||
|
next_block += 1
|
||||||
|
|
||||||
|
# Caught up with head and still nothing; wait for more blocks
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
|
|
||||||
|
return None, None, None
|
||||||
|
|
||||||
async def work_request(
|
async def work_request(
|
||||||
self,
|
self,
|
||||||
user,
|
user,
|
||||||
|
|
@ -170,45 +247,36 @@ class SkynetTelegramFrontend:
|
||||||
parse_mode='HTML'
|
parse_mode='HTML'
|
||||||
)
|
)
|
||||||
|
|
||||||
out = res['processed']['action_traces'][0]['console']
|
|
||||||
|
out = res['processed']['action_traces'][0]['console']
|
||||||
|
|
||||||
request_id, nonce = out.split(':')
|
request_id, nonce = out.split(':')
|
||||||
|
|
||||||
request_hash = sha256(
|
request_hash = sha256(
|
||||||
(nonce + body + ','.join(inputs)).encode('utf-8')).hexdigest().upper()
|
(nonce + body + ','.join(inputs)).encode('utf-8')
|
||||||
|
).hexdigest().upper()
|
||||||
|
|
||||||
request_id = int(request_id)
|
request_id = int(request_id)
|
||||||
|
|
||||||
logging.info(f'{request_id} enqueued.')
|
logging.info(f'{request_id} enqueued.')
|
||||||
|
|
||||||
tx_hash = None
|
# Prefer the block number from the push_transaction response
|
||||||
ipfs_hash = None
|
enqueue_block_num = res.get('processed', {}).get('block_num')
|
||||||
for i in range(60 * 3):
|
if not enqueue_block_num:
|
||||||
try:
|
# Fallback: start from current head if block_num is missing
|
||||||
submits = await self.hyperion.aget_actions(
|
async with httpx.AsyncClient() as client:
|
||||||
account=self.account,
|
info = (await client.post(
|
||||||
filter='gpu.scd:submit',
|
f'{self.node_url.rstrip("/")}/v1/chain/get_info',
|
||||||
sort='desc',
|
json={}
|
||||||
after=request_time
|
)).json()
|
||||||
)
|
enqueue_block_num = info['head_block_num']
|
||||||
actions = [
|
|
||||||
action
|
|
||||||
for action in submits['actions']
|
|
||||||
if action[
|
|
||||||
'act']['data']['request_hash'] == request_hash
|
|
||||||
]
|
|
||||||
if len(actions) > 0:
|
|
||||||
tx_hash = actions[0]['trx_id']
|
|
||||||
data = actions[0]['act']['data']
|
|
||||||
ipfs_hash = data['ipfs_hash']
|
|
||||||
worker = data['worker']
|
|
||||||
logging.info('Found matching submit!')
|
|
||||||
break
|
|
||||||
|
|
||||||
except JSONDecodeError:
|
# Wait for submit via block polling
|
||||||
logging.error(f'network error while getting actions, retry..')
|
tx_hash, ipfs_hash, worker = await self._wait_for_submit_in_blocks(
|
||||||
|
request_hash=request_hash,
|
||||||
await asyncio.sleep(1)
|
start_block=enqueue_block_num,
|
||||||
|
timeout_seconds=60 * 3,
|
||||||
|
)
|
||||||
|
|
||||||
if not ipfs_hash:
|
if not ipfs_hash:
|
||||||
await self.update_status_message(
|
await self.update_status_message(
|
||||||
|
|
@ -218,6 +286,7 @@ class SkynetTelegramFrontend:
|
||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
tx_link = hlink(
|
tx_link = hlink(
|
||||||
'Your result on Skynet Explorer',
|
'Your result on Skynet Explorer',
|
||||||
f'https://{self.explorer_domain}/v2/explore/transaction/{tx_hash}'
|
f'https://{self.explorer_domain}/v2/explore/transaction/{tx_hash}'
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue