mirror of https://github.com/skygpu/skynet.git
Pin all the things
parent
1494e47b34
commit
33d2ca281b
123
skynet/cli.py
123
skynet/cli.py
|
@ -11,6 +11,7 @@ from datetime import datetime, timedelta
|
|||
from functools import partial
|
||||
|
||||
import trio
|
||||
import asks
|
||||
import click
|
||||
import docker
|
||||
import asyncio
|
||||
|
@ -287,7 +288,7 @@ def nodeos():
|
|||
@click.option(
|
||||
'--node-url', '-n', default='http://skynet.ancap.tech')
|
||||
@click.option(
|
||||
'--ipfs-url', '-n', default='/ip4/169.197.142.4/tcp/4001/p2p/12D3KooWKHKPFuqJPeqYgtUJtfZTHvEArRX2qvThYBrjuTuPg2Nx')
|
||||
'--ipfs-url', '-n', default='/ip4/169.197.140.154/tcp/4001/p2p/12D3KooWKHKPFuqJPeqYgtUJtfZTHvEArRX2qvThYBrjuTuPg2Nx')
|
||||
@click.option(
|
||||
'--algos', '-A', default=json.dumps(['midj']))
|
||||
def dgpu(
|
||||
|
@ -395,6 +396,12 @@ class IPFSHTTP:
|
|||
params={'arg': cid}
|
||||
)
|
||||
|
||||
async def a_pin(self, cid: str):
|
||||
return await asks.post(
|
||||
f'{self.endpoint}/api/v0/pin/add',
|
||||
params={'arg': cid}
|
||||
)
|
||||
|
||||
|
||||
@run.command()
|
||||
@click.option('--loglevel', '-l', default='INFO', help='logging level')
|
||||
|
@ -414,71 +421,79 @@ def pinner(loglevel, ipfs_rpc, hyperion_url):
|
|||
ipfs_node = IPFSHTTP(ipfs_rpc)
|
||||
hyperion = HyperionAPI(hyperion_url)
|
||||
|
||||
last_pinned: dict[str, datetime] = {}
|
||||
already_pinned: set[str] = set()
|
||||
|
||||
def capture_enqueues(half_min_ago: datetime):
|
||||
# get all enqueues with binary data
|
||||
# in the last minute
|
||||
enqueues = hyperion.get_actions(
|
||||
account='telos.gpu',
|
||||
filter='telos.gpu:enqueue',
|
||||
sort='desc',
|
||||
after=half_min_ago.isoformat()
|
||||
)
|
||||
async def _async_main():
|
||||
|
||||
cids = []
|
||||
for action in enqueues['actions']:
|
||||
cid = action['act']['data']['binary_data']
|
||||
if cid and cid not in last_pinned:
|
||||
cids.append(cid)
|
||||
async def capture_enqueues(last_hour: datetime):
|
||||
# get all enqueuesin the last hour
|
||||
enqueues = await hyperion.aget_actions(
|
||||
account='telos.gpu',
|
||||
filter='telos.gpu:enqueue',
|
||||
sort='desc',
|
||||
after=last_hour.isoformat(),
|
||||
limit=1000
|
||||
)
|
||||
|
||||
return cids
|
||||
logging.info(f'got {len(enqueues)} enqueue actions.')
|
||||
|
||||
def capture_submits(half_min_ago: datetime):
|
||||
# get all submits in the last minute
|
||||
submits = hyperion.get_actions(
|
||||
account='telos.gpu',
|
||||
filter='telos.gpu:submit',
|
||||
sort='desc',
|
||||
after=half_min_ago.isoformat()
|
||||
)
|
||||
cids = []
|
||||
for action in enqueues['actions']:
|
||||
cid = action['act']['data']['binary_data']
|
||||
if cid and cid not in already_pinned:
|
||||
cids.append(cid)
|
||||
|
||||
cids = []
|
||||
for action in submits['actions']:
|
||||
cid = action['act']['data']['ipfs_hash']
|
||||
if cid and cid not in last_pinned:
|
||||
cids.append(cid)
|
||||
return cids
|
||||
|
||||
return cids
|
||||
async def capture_submits(last_hour: datetime):
|
||||
# get all submits in the last hour
|
||||
submits = await hyperion.aget_actions(
|
||||
account='telos.gpu',
|
||||
filter='telos.gpu:submit',
|
||||
sort='desc',
|
||||
after=last_hour.isoformat(),
|
||||
limit=1000
|
||||
)
|
||||
|
||||
def cleanup_pinned(now: datetime):
|
||||
for cid in set(last_pinned.keys()):
|
||||
ts = last_pinned[cid]
|
||||
if now - ts > timedelta(minutes=1):
|
||||
del last_pinned[cid]
|
||||
logging.info(f'got {len(submits)} submits actions.')
|
||||
|
||||
try:
|
||||
while True:
|
||||
now = datetime.now()
|
||||
half_min_ago = now - timedelta(seconds=30)
|
||||
cids = []
|
||||
for action in submits['actions']:
|
||||
cid = action['act']['data']['ipfs_hash']
|
||||
if cid and cid not in already_pinned:
|
||||
cids.append(cid)
|
||||
|
||||
# filter for the ones not already pinned
|
||||
cids = [*capture_enqueues(half_min_ago), *capture_submits(half_min_ago)]
|
||||
return cids
|
||||
|
||||
# pin and remember
|
||||
for cid in cids:
|
||||
last_pinned[cid] = now
|
||||
async def task_pin(cid: str):
|
||||
already_pinned.add(cid)
|
||||
|
||||
resp = ipfs_node.pin(cid)
|
||||
if resp.status_code != 200:
|
||||
logging.error(f'error pinning {cid}:\n{resp.text}')
|
||||
resp = await ipfs_node.a_pin(cid)
|
||||
if resp.status_code != 200:
|
||||
logging.error(f'error pinning {cid}:\n{resp.text}')
|
||||
|
||||
else:
|
||||
logging.info(f'pinned {cid}')
|
||||
else:
|
||||
logging.info(f'pinned {cid}')
|
||||
|
||||
cleanup_pinned(now)
|
||||
try:
|
||||
async with trio.open_nursery() as n:
|
||||
while True:
|
||||
now = datetime.now()
|
||||
last_hour = now - timedelta(hours=1)
|
||||
|
||||
time.sleep(0.1)
|
||||
# filter for the ones not already pinned
|
||||
cids = [
|
||||
*(await capture_enqueues(last_hour)),
|
||||
*(await capture_submits(last_hour))
|
||||
]
|
||||
|
||||
except KeyboardInterrupt:
|
||||
...
|
||||
# pin and remember (in parallel)
|
||||
for cid in cids:
|
||||
n.start_soon(task_pin, cid)
|
||||
|
||||
await trio.sleep(1)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
...
|
||||
|
||||
trio.run(_async_main)
|
||||
|
|
Loading…
Reference in New Issue