From 13c6e85ac96fe7f90f44b94143f755c99bbcc147 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Sat, 3 Jun 2023 12:22:24 -0300 Subject: [PATCH] Make pinner less spammy lool, and gpu more resiliant --- skynet/cli.py | 12 +++++++++--- skynet/dgpu.py | 16 ++++++++++------ 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/skynet/cli.py b/skynet/cli.py index 09bb781..8270b20 100644 --- a/skynet/cli.py +++ b/skynet/cli.py @@ -418,6 +418,7 @@ class IPFSHTTP: @click.option('--loglevel', '-l', default='INFO', help='logging level') @click.option('--name', '-n', default='skynet-ipfs', help='container name') def ipfs(loglevel, name): + logging.basicConfig(level=loglevel) with open_ipfs_node(name=name): ... @@ -432,6 +433,7 @@ def pinner(loglevel, ipfs_rpc, hyperion_url): ipfs_node = IPFSHTTP(ipfs_rpc) hyperion = HyperionAPI(hyperion_url) + pinned = set() async def _async_main(): async def capture_enqueues(after: datetime): @@ -448,7 +450,8 @@ def pinner(loglevel, ipfs_rpc, hyperion_url): cids = [] for action in enqueues['actions']: cid = action['act']['data']['binary_data'] - if cid: + if cid and cid not in pinned: + pinned.add(cid) cids.append(cid) return cids @@ -467,11 +470,14 @@ def pinner(loglevel, ipfs_rpc, hyperion_url): cids = [] for action in submits['actions']: cid = action['act']['data']['ipfs_hash'] - cids.append(cid) + if cid and cid not in pinned: + pinned.add(cid) + cids.append(cid) return cids async def task_pin(cid: str): + logging.info(f'pinning {cid}...') resp = await ipfs_node.a_pin(cid) if resp.status_code != 200: logging.error(f'error pinning {cid}:\n{resp.text}') @@ -483,7 +489,7 @@ def pinner(loglevel, ipfs_rpc, hyperion_url): async with trio.open_nursery() as n: while True: now = datetime.now() - prev_second = now - timedelta(seconds=1) + prev_second = now - timedelta(seconds=10) # filter for the ones not already pinned cids = [ diff --git a/skynet/dgpu.py b/skynet/dgpu.py index 67ef251..e3b882d 100644 --- a/skynet/dgpu.py +++ b/skynet/dgpu.py @@ -172,12 +172,16 @@ async def open_dgpu_node( async def get_work_requests_last_hour(): logging.info('get_work_requests_last_hour') - return await cleos.aget_table( - 'telos.gpu', 'telos.gpu', 'queue', - index_position=2, - key_type='i64', - lower_bound=int(time.time()) - 3600 - ) + try: + return await cleos.aget_table( + 'telos.gpu', 'telos.gpu', 'queue', + index_position=2, + key_type='i64', + lower_bound=int(time.time()) - 3600 + ) + + except asks.errors.RequestTimeout: + return [] async def get_status_by_request_id(request_id: int): logging.info('get_status_by_request_id')