From 33d2ca281b8673c5cb4fbf6b4e770efb64e23b80 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Mon, 29 May 2023 19:22:56 -0300 Subject: [PATCH] Pin all the things --- skynet/cli.py | 123 ++++++++++++++++++++++++++++---------------------- 1 file changed, 69 insertions(+), 54 deletions(-) diff --git a/skynet/cli.py b/skynet/cli.py index d2dab73..7248c6e 100644 --- a/skynet/cli.py +++ b/skynet/cli.py @@ -11,6 +11,7 @@ from datetime import datetime, timedelta from functools import partial import trio +import asks import click import docker import asyncio @@ -287,7 +288,7 @@ def nodeos(): @click.option( '--node-url', '-n', default='http://skynet.ancap.tech') @click.option( - '--ipfs-url', '-n', default='/ip4/169.197.142.4/tcp/4001/p2p/12D3KooWKHKPFuqJPeqYgtUJtfZTHvEArRX2qvThYBrjuTuPg2Nx') + '--ipfs-url', '-n', default='/ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKHKPFuqJPeqYgtUJtfZTHvEArRX2qvThYBrjuTuPg2Nx') @click.option( '--algos', '-A', default=json.dumps(['midj'])) def dgpu( @@ -395,6 +396,12 @@ class IPFSHTTP: params={'arg': cid} ) + async def a_pin(self, cid: str): + return await asks.post( + f'{self.endpoint}/api/v0/pin/add', + params={'arg': cid} + ) + @run.command() @click.option('--loglevel', '-l', default='INFO', help='logging level') @@ -414,71 +421,79 @@ def pinner(loglevel, ipfs_rpc, hyperion_url): ipfs_node = IPFSHTTP(ipfs_rpc) hyperion = HyperionAPI(hyperion_url) - last_pinned: dict[str, datetime] = {} + already_pinned: set[str] = set() - def capture_enqueues(half_min_ago: datetime): - # get all enqueues with binary data - # in the last minute - enqueues = hyperion.get_actions( - account='telos.gpu', - filter='telos.gpu:enqueue', - sort='desc', - after=half_min_ago.isoformat() - ) + async def _async_main(): - cids = [] - for action in enqueues['actions']: - cid = action['act']['data']['binary_data'] - if cid and cid not in last_pinned: - cids.append(cid) + async def capture_enqueues(last_hour: datetime): + # get all enqueuesin the last hour + enqueues = await hyperion.aget_actions( + account='telos.gpu', + filter='telos.gpu:enqueue', + sort='desc', + after=last_hour.isoformat(), + limit=1000 + ) - return cids + logging.info(f'got {len(enqueues)} enqueue actions.') - def capture_submits(half_min_ago: datetime): - # get all submits in the last minute - submits = hyperion.get_actions( - account='telos.gpu', - filter='telos.gpu:submit', - sort='desc', - after=half_min_ago.isoformat() - ) + cids = [] + for action in enqueues['actions']: + cid = action['act']['data']['binary_data'] + if cid and cid not in already_pinned: + cids.append(cid) - cids = [] - for action in submits['actions']: - cid = action['act']['data']['ipfs_hash'] - if cid and cid not in last_pinned: - cids.append(cid) + return cids - return cids + async def capture_submits(last_hour: datetime): + # get all submits in the last hour + submits = await hyperion.aget_actions( + account='telos.gpu', + filter='telos.gpu:submit', + sort='desc', + after=last_hour.isoformat(), + limit=1000 + ) - def cleanup_pinned(now: datetime): - for cid in set(last_pinned.keys()): - ts = last_pinned[cid] - if now - ts > timedelta(minutes=1): - del last_pinned[cid] + logging.info(f'got {len(submits)} submits actions.') - try: - while True: - now = datetime.now() - half_min_ago = now - timedelta(seconds=30) + cids = [] + for action in submits['actions']: + cid = action['act']['data']['ipfs_hash'] + if cid and cid not in already_pinned: + cids.append(cid) - # filter for the ones not already pinned - cids = [*capture_enqueues(half_min_ago), *capture_submits(half_min_ago)] + return cids - # pin and remember - for cid in cids: - last_pinned[cid] = now + async def task_pin(cid: str): + already_pinned.add(cid) - resp = ipfs_node.pin(cid) - if resp.status_code != 200: - logging.error(f'error pinning {cid}:\n{resp.text}') + resp = await ipfs_node.a_pin(cid) + if resp.status_code != 200: + logging.error(f'error pinning {cid}:\n{resp.text}') - else: - logging.info(f'pinned {cid}') + else: + logging.info(f'pinned {cid}') - cleanup_pinned(now) + try: + async with trio.open_nursery() as n: + while True: + now = datetime.now() + last_hour = now - timedelta(hours=1) - time.sleep(0.1) + # filter for the ones not already pinned + cids = [ + *(await capture_enqueues(last_hour)), + *(await capture_submits(last_hour)) + ] - except KeyboardInterrupt: - ... + # pin and remember (in parallel) + for cid in cids: + n.start_soon(task_pin, cid) + + await trio.sleep(1) + + except KeyboardInterrupt: + ... + + trio.run(_async_main)