Make pinner less spammy lool, and gpu more resiliant

add-txt2txt-models
Guillermo Rodriguez 2023-06-03 12:22:24 -03:00
parent 320f13260c
commit 13c6e85ac9
No known key found for this signature in database
GPG Key ID: EC3AB66D5D83B392
2 changed files with 19 additions and 9 deletions

View File

@ -418,6 +418,7 @@ class IPFSHTTP:
@click.option('--loglevel', '-l', default='INFO', help='logging level') @click.option('--loglevel', '-l', default='INFO', help='logging level')
@click.option('--name', '-n', default='skynet-ipfs', help='container name') @click.option('--name', '-n', default='skynet-ipfs', help='container name')
def ipfs(loglevel, name): def ipfs(loglevel, name):
logging.basicConfig(level=loglevel)
with open_ipfs_node(name=name): with open_ipfs_node(name=name):
... ...
@ -432,6 +433,7 @@ def pinner(loglevel, ipfs_rpc, hyperion_url):
ipfs_node = IPFSHTTP(ipfs_rpc) ipfs_node = IPFSHTTP(ipfs_rpc)
hyperion = HyperionAPI(hyperion_url) hyperion = HyperionAPI(hyperion_url)
pinned = set()
async def _async_main(): async def _async_main():
async def capture_enqueues(after: datetime): async def capture_enqueues(after: datetime):
@ -448,7 +450,8 @@ def pinner(loglevel, ipfs_rpc, hyperion_url):
cids = [] cids = []
for action in enqueues['actions']: for action in enqueues['actions']:
cid = action['act']['data']['binary_data'] cid = action['act']['data']['binary_data']
if cid: if cid and cid not in pinned:
pinned.add(cid)
cids.append(cid) cids.append(cid)
return cids return cids
@ -467,11 +470,14 @@ def pinner(loglevel, ipfs_rpc, hyperion_url):
cids = [] cids = []
for action in submits['actions']: for action in submits['actions']:
cid = action['act']['data']['ipfs_hash'] cid = action['act']['data']['ipfs_hash']
if cid and cid not in pinned:
pinned.add(cid)
cids.append(cid) cids.append(cid)
return cids return cids
async def task_pin(cid: str): async def task_pin(cid: str):
logging.info(f'pinning {cid}...')
resp = await ipfs_node.a_pin(cid) resp = await ipfs_node.a_pin(cid)
if resp.status_code != 200: if resp.status_code != 200:
logging.error(f'error pinning {cid}:\n{resp.text}') logging.error(f'error pinning {cid}:\n{resp.text}')
@ -483,7 +489,7 @@ def pinner(loglevel, ipfs_rpc, hyperion_url):
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
while True: while True:
now = datetime.now() now = datetime.now()
prev_second = now - timedelta(seconds=1) prev_second = now - timedelta(seconds=10)
# filter for the ones not already pinned # filter for the ones not already pinned
cids = [ cids = [

View File

@ -172,6 +172,7 @@ async def open_dgpu_node(
async def get_work_requests_last_hour(): async def get_work_requests_last_hour():
logging.info('get_work_requests_last_hour') logging.info('get_work_requests_last_hour')
try:
return await cleos.aget_table( return await cleos.aget_table(
'telos.gpu', 'telos.gpu', 'queue', 'telos.gpu', 'telos.gpu', 'queue',
index_position=2, index_position=2,
@ -179,6 +180,9 @@ async def open_dgpu_node(
lower_bound=int(time.time()) - 3600 lower_bound=int(time.time()) - 3600
) )
except asks.errors.RequestTimeout:
return []
async def get_status_by_request_id(request_id: int): async def get_status_by_request_id(request_id: int):
logging.info('get_status_by_request_id') logging.info('get_status_by_request_id')
return await cleos.aget_table( return await cleos.aget_table(