diff --git a/skynet/frontend/telegram/__init__.py b/skynet/frontend/telegram/__init__.py index 540a240..f900bd6 100644 --- a/skynet/frontend/telegram/__init__.py +++ b/skynet/frontend/telegram/__init__.py @@ -3,6 +3,9 @@ import random import logging import asyncio +import time +import httpx + from PIL import Image, UnidentifiedImageError from json import JSONDecodeError from decimal import Decimal @@ -13,7 +16,7 @@ from contextlib import asynccontextmanager as acm from leap.cleos import CLEOS from leap.protocol import Name, Asset -from leap.hyperion import HyperionAPI +# from leap.hyperion import HyperionAPI from telebot.types import InputMediaPhoto from telebot.async_telebot import AsyncTeleBot @@ -60,7 +63,7 @@ class SkynetTelegramFrontend: self.bot = AsyncTeleBot(token, exception_handler=SKYExceptionHandler) self.cleos = CLEOS(endpoint=node_url) self.cleos.load_abi('gpu.scd', GPU_CONTRACT_ABI) - self.hyperion = HyperionAPI(hyperion_url) + # self.hyperion = HyperionAPI(hyperion_url) self.ipfs_node = AsyncIPFSHTTP(ipfs_node) self._async_exit_stack = AsyncExitStack() @@ -103,6 +106,80 @@ class SkynetTelegramFrontend: **kwargs ) + async def _wait_for_submit_in_blocks( + self, + request_hash: str, + start_block: int, + timeout_seconds: int = 60 * 3, + ): + """ + Poll /v1/chain/get_block from start_block upwards until we see + gpu.scd::submit with the given request_hash, or we hit timeout. + + Returns (tx_id, ipfs_hash, worker) or (None, None, None) on timeout. + """ + endpoint = self.node_url.rstrip('/') + + next_block = start_block # inclusive + deadline = time.monotonic() + timeout_seconds + + async with httpx.AsyncClient() as client: + while time.monotonic() < deadline: + # Get current head block + info = (await client.post( + f'{endpoint}/v1/chain/get_info', + json={} + )).json() + head = info['head_block_num'] + + # No new blocks yet, wait a bit + if next_block > head: + await asyncio.sleep(0.5) + continue + + # Walk all blocks we haven't seen yet + while next_block <= head: + try: + block = (await client.post( + f'{endpoint}/v1/chain/get_block', + json={'block_num_or_id': next_block} + )).json() + except (httpx.RequestError, ValueError): + logging.warning(f'failed to get block {next_block}, retrying...') + break # leave inner loop, re-fetch head + + for tx in block.get('transactions', []): + trx = tx.get('trx') + # Sometimes trx can be just a string (id) — skip those. + if isinstance(trx, str): + continue + + tx_id = trx.get('id') + tx_obj = trx.get('transaction') or {} + actions = tx_obj.get('actions', []) or [] + + for act in actions: + if ( + act.get('account') == 'gpu.scd' + and act.get('name') == 'submit' + ): + data = act.get('data') or {} + if data.get('request_hash') == request_hash: + ipfs_hash = data.get('ipfs_hash') + worker = data.get('worker') + logging.info( + f'Found matching submit in block {next_block}, ' + f'tx {tx_id}' + ) + return tx_id, ipfs_hash, worker + + next_block += 1 + + # Caught up with head and still nothing; wait for more blocks + await asyncio.sleep(0.5) + + return None, None, None + async def work_request( self, user, @@ -170,45 +247,36 @@ class SkynetTelegramFrontend: parse_mode='HTML' ) - out = res['processed']['action_traces'][0]['console'] + + out = res['processed']['action_traces'][0]['console'] request_id, nonce = out.split(':') request_hash = sha256( - (nonce + body + ','.join(inputs)).encode('utf-8')).hexdigest().upper() + (nonce + body + ','.join(inputs)).encode('utf-8') + ).hexdigest().upper() request_id = int(request_id) logging.info(f'{request_id} enqueued.') - tx_hash = None - ipfs_hash = None - for i in range(60 * 3): - try: - submits = await self.hyperion.aget_actions( - account=self.account, - filter='gpu.scd:submit', - sort='desc', - after=request_time - ) - actions = [ - action - for action in submits['actions'] - if action[ - 'act']['data']['request_hash'] == request_hash - ] - if len(actions) > 0: - tx_hash = actions[0]['trx_id'] - data = actions[0]['act']['data'] - ipfs_hash = data['ipfs_hash'] - worker = data['worker'] - logging.info('Found matching submit!') - break + # Prefer the block number from the push_transaction response + enqueue_block_num = res.get('processed', {}).get('block_num') + if not enqueue_block_num: + # Fallback: start from current head if block_num is missing + async with httpx.AsyncClient() as client: + info = (await client.post( + f'{self.node_url.rstrip("/")}/v1/chain/get_info', + json={} + )).json() + enqueue_block_num = info['head_block_num'] - except JSONDecodeError: - logging.error(f'network error while getting actions, retry..') - - await asyncio.sleep(1) + # Wait for submit via block polling + tx_hash, ipfs_hash, worker = await self._wait_for_submit_in_blocks( + request_hash=request_hash, + start_block=enqueue_block_num, + timeout_seconds=60 * 3, + ) if not ipfs_hash: await self.update_status_message( @@ -218,6 +286,7 @@ class SkynetTelegramFrontend: ) return False + tx_link = hlink( 'Your result on Skynet Explorer', f'https://{self.explorer_domain}/v2/explore/transaction/{tx_hash}'