Further improvements to indentation and logic in the daemon's maybe_serve_one; also might have fixed a bug related to using id instead of request_id in the search-existing-results phase, and added way more logging
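
For context, the id vs request_id fix mentioned above comes down to building the "already has a result" lookup from the field that links a result row back to its queue request, rather than from the result row's own id. A minimal before/after sketch, assuming the snapshot row shape implied by the diff below (not verified against the contract schema):

    # sketch only: a 'results' snapshot row as implied by the diff below
    snap = {
        'results': [{'id': 7, 'request_id': 42, 'user': 'someworker'}],
    }
    rid = 42  # the queue request being considered

    # before: matched the result row's own id, so a result for request 42
    # was never detected and the worker could try to serve it again
    done_buggy = rid in [res['id'] for res in snap['results']]          # False

    # after: match on the request_id the result actually refers to
    done_fixed = rid in [res['request_id'] for res in snap['results']]  # True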

guilles_counter_review
Guillermo Rodriguez 2025-02-03 20:07:45 -03:00
parent b62cdd6802
commit 399299c62b
No known key found for this signature in database
GPG Key ID: 002CC5F1E6BDA53E
4 changed files with 130 additions and 115 deletions

View File

@@ -1,3 +1,5 @@
import logging
import trio
from hypercorn.config import Config
@@ -16,6 +18,10 @@ async def open_dgpu_node(config: dict) -> None:
and *maybe* serve a `hypercorn` web API.
'''
# suppress logs from httpx (logs url + status after every query)
logging.getLogger("httpx").setLevel(logging.WARNING)
conn = NetConnector(config)
mm = ModelMngr(config)
daemon = WorkerDaemon(mm, conn, config)
@@ -33,6 +39,7 @@ async def open_dgpu_node(config: dict) -> None:
# TODO, consider a more explicit `as hypercorn_serve`
# to clarify?
if api:
logging.info(f'serving api @ {config["api_bind"]}')
tn.start_soon(serve, api, api_conf)
# block until cancelled
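
For readers unfamiliar with the pattern above, this is roughly how hypercorn's trio worker runs inside a nursery; a standalone sketch with an invented trivial ASGI app and bind address (the real daemon passes its own `api` app and `config["api_bind"]`):

    import trio
    from hypercorn.config import Config
    from hypercorn.trio import serve

    async def app(scope, receive, send):
        # trivial ASGI app, used only to make the sketch runnable
        if scope['type'] != 'http':
            return
        await send({'type': 'http.response.start', 'status': 200, 'headers': []})
        await send({'type': 'http.response.body', 'body': b'ok'})

    async def main():
        conf = Config()
        conf.bind = ['127.0.0.1:8080']  # hypothetical bind, stands in for config['api_bind']
        async with trio.open_nursery() as tn:
            tn.start_soon(serve, app, conf)
            # nursery blocks here until cancelled, like the daemon above

    trio.run(main)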

View File

@@ -83,8 +83,8 @@ class ModelMngr:
# self.load_model(DEFAULT_INITAL_MODEL, 'txt2img')
def log_debug_info(self):
logging.info('memory summary:')
logging.info('\n' + torch.cuda.memory_summary())
logging.debug('memory summary:')
logging.debug('\n' + torch.cuda.memory_summary())
def is_model_loaded(self, name: str, mode: str):
if (name == self._model_name and
@@ -114,6 +114,8 @@ class ModelMngr:
name, mode, cache_dir=self.cache_dir)
self._model_mode = mode
self._model_name = name
logging.info(f'{name} loaded!')
self.log_debug_info()
def compute_one(
self,
@@ -126,11 +128,7 @@ class ModelMngr:
if self._should_cancel:
should_raise = trio.from_thread.run(self._should_cancel, request_id)
if should_raise:
logging.warn(f'cancelling work at step {step}')
# ?TODO, this is never caught, so why is it
# raised specially?
raise DGPUInferenceCancelled()
logging.warn(f'CANCELLING work at step {step}')
return {}
@@ -206,8 +204,6 @@ class ModelMngr:
raise DGPUComputeError('Unsupported compute method')
except BaseException as err:
logging.error(err)
# to see the src exc in tb
raise DGPUComputeError(str(err)) from err
finally:

View File

@@ -105,7 +105,11 @@ class WorkerDaemon:
for status in self._snap['requests'][request_id]
if status['worker'] != self.account
])
return bool(self.non_compete & competitors)
logging.info('should cancel work?')
logging.info(f'competitors: {competitors}')
should_cancel = bool(self.non_compete & competitors)
logging.info(f'cancel: {should_cancel}')
return should_cancel
async def snap_updater_task(self):
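
For what it's worth, the cancellation decision logged above is just a set intersection between the configured `non_compete` accounts and the workers already reporting status on the request; a toy example with invented account names:

    non_compete = {'alice.gpu', 'bob.gpu'}           # hypothetical configured accounts
    competitors = {'bob.gpu', 'carol.gpu'}           # workers already on the request
    should_cancel = bool(non_compete & competitors)  # True, since 'bob.gpu' is in both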
@@ -150,6 +154,7 @@ class WorkerDaemon:
req: dict,
):
rid = req['id']
logging.info(f'maybe serve request #{rid}')
# parse request
body = json.loads(req['body'])
@@ -161,7 +166,7 @@ class WorkerDaemon:
and
model not in MODELS
):
logging.warning(f'Unknown model {model}')
logging.warning(f'unknown model {model}!, skip...')
return False
# only handle whitelisted models
@@ -170,98 +175,110 @@ class WorkerDaemon:
and
model not in self.model_whitelist
):
logging.warning('model not whitelisted!, skip...')
return False
# if blacklist contains model skip
if model in self.model_blacklist:
logging.warning('model is blacklisted!, skip...')
return False
results = [res['id'] for res in self._snap['results']]
results = [res['request_id'] for res in self._snap['results']]
# if worker is already on that request or
# if worker has a stale status for that request
if rid in results or rid not in self._snap['requests']:
logging.info(f'request {rid} already being worked on, skip...')
return
# if worker already produced a result for this request
if rid in results:
logging.info(f'worker already submitted a result for request #{rid}, skip...')
return False
statuses = self._snap['requests'][rid]
if len(statuses) == 0:
inputs = []
for _input in req['binary_data'].split(','):
if _input:
for _ in range(3):
try:
# use `GPUConnector` to IO with
# storage layer to seed the compute
# task.
img = await self.conn.get_input_data(_input)
inputs.append(img)
break
except BaseException:
logging.exception(
'Model input error !?!\n'
# skip if workers in non_compete already on it
competitors = set((status['worker'] for status in statuses))
if bool(self.non_compete & competitors):
logging.info('worker in configured non_compete list already working on request, skip...')
return False
# resolve the ipfs hashes into the actual data behind them
inputs = []
raw_inputs = req['binary_data'].split(',')
if raw_inputs:
logging.info(f'fetching IPFS inputs: {raw_inputs}')
retry = 3
for _input in req['binary_data'].split(','):
if _input:
for r in range(retry):
try:
# use `GPUConnector` to IO with
# storage layer to seed the compute
# task.
img = await self.conn.get_input_data(_input)
inputs.append(img)
logging.info(f'retrieved {_input}!')
break
except BaseException:
logging.exception(
f'IPFS fetch input error !?! retries left {retry - r - 1}\n'
)
# compute unique request hash used on submit
hash_str = (
str(req['nonce'])
+
req['body']
+
req['binary_data']
)
logging.debug(f'hashing: {hash_str}')
request_hash = sha256(hash_str.encode('utf-8')).hexdigest()
logging.info(f'calculated request hash: {request_hash}')
# TODO: validate request
resp = await self.conn.begin_work(rid)
if not resp or 'code' in resp:
logging.info('begin_work error, probably being worked on already... skip.')
else:
try:
output_type = 'png'
if 'output_type' in body['params']:
output_type = body['params']['output_type']
output = None
output_hash = None
match self.backend:
case 'sync-on-thread':
self.mm._should_cancel = self.should_cancel_work
output_hash, output = await trio.to_thread.run_sync(
partial(
self.mm.compute_one,
rid,
body['method'], body['params'],
inputs=inputs
)
)
hash_str = (
str(req['nonce'])
+
req['body']
+
req['binary_data']
)
logging.info(f'hashing: {hash_str}')
request_hash = sha256(hash_str.encode('utf-8')).hexdigest()
case _:
raise DGPUComputeError(
f'Unsupported backend {self.backend}'
)
# TODO: validate request
self._last_generation_ts: str = datetime.now().isoformat()
self._last_benchmark: list[float] = self._benchmark
self._benchmark: list[float] = []
# perform work
logging.info(f'working on {body}')
ipfs_hash = await self.conn.publish_on_ipfs(output, typ=output_type)
resp = await self.conn.begin_work(rid)
if not resp or 'code' in resp:
logging.info('probably being worked on already... skip.')
await self.conn.submit_work(rid, request_hash, output_hash, ipfs_hash)
else:
try:
output_type = 'png'
if 'output_type' in body['params']:
output_type = body['params']['output_type']
except BaseException as err:
logging.exception('Failed to serve model request !?\n')
await self.conn.cancel_work(rid, str(err))
output = None
output_hash = None
match self.backend:
case 'sync-on-thread':
self.mm._should_cancel = self.should_cancel_work
output_hash, output = await trio.to_thread.run_sync(
partial(
self.mm.compute_one,
rid,
body['method'], body['params'],
inputs=inputs
)
)
case _:
raise DGPUComputeError(
f'Unsupported backend {self.backend}'
)
self._last_generation_ts: str = datetime.now().isoformat()
self._last_benchmark: list[float] = self._benchmark
self._benchmark: list[float] = []
ipfs_hash = await self.conn.publish_on_ipfs(output, typ=output_type)
await self.conn.submit_work(rid, request_hash, output_hash, ipfs_hash)
except BaseException as err:
logging.exception('Failed to serve model request !?\n')
# traceback.print_exc() # TODO? <- replaced by above ya?
await self.conn.cancel_work(rid, str(err))
finally:
return True
finally:
return True
# TODO, as per above on `.maybe_serve_one()`, it's likely a bit
# more *trionic* to define this all as a module level task-func
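
A rough illustration of that TODO (the function name, polling shape and interval are invented here, not the project's API): the loop could become a module-level task that takes the daemon as an argument and is started with `tn.start_soon(...)`, just like the hypercorn server:

    import trio

    async def serve_requests(daemon, poll_interval: float = 1.0):
        # module-level task: walk the latest queue snapshot and try to
        # serve each request, instead of hiding this loop inside a method
        while True:
            for req in daemon._snap['queue']:
                handled = await daemon.maybe_serve_one(req)
                if handled:
                    break
            await trio.sleep(poll_interval)

    # usage sketch, inside open_dgpu_node's nursery:
    #   tn.start_soon(serve_requests, daemon)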

View File

@@ -72,9 +72,6 @@ class NetConnector:
self.cleos = CLEOS(endpoint=self.node_url)
self.cleos.load_abi('gpu.scd', GPU_CONTRACT_ABI)
self.ipfs_gateway_url = None
if 'ipfs_gateway_url' in config:
self.ipfs_gateway_url = config['ipfs_gateway_url']
self.ipfs_url = config['ipfs_url']
self.ipfs_client = AsyncIPFSHTTP(self.ipfs_url)
@@ -89,7 +86,7 @@ class NetConnector:
async def get_work_requests_last_hour(self):
logging.info('get_work_requests_last_hour')
return await failable(
rows = await failable(
partial(
self.cleos.aget_table,
'gpu.scd', 'gpu.scd', 'queue',
@@ -98,13 +95,19 @@ class NetConnector:
lower_bound=int(time.time()) - 3600
), ret_fail=[])
logging.info(f'found {len(rows)} requests on queue')
return rows
async def get_status_by_request_id(self, request_id: int):
logging.info('get_status_by_request_id')
return await failable(
rows = await failable(
partial(
self.cleos.aget_table,
'gpu.scd', request_id, 'status'), ret_fail=[])
logging.info(f'found status for workers: {[r["worker"] for r in rows]}')
return rows
async def get_global_config(self):
logging.info('get_global_config')
rows = await failable(
@@ -113,8 +116,11 @@ class NetConnector:
'gpu.scd', 'gpu.scd', 'config'))
if rows:
return rows[0]
cfg = rows[0]
logging.info(f'config found: {cfg}')
return cfg
else:
logging.error('global config not found, is the contract initialized?')
return None
async def get_worker_balance(self):
@@ -130,20 +136,13 @@ class NetConnector:
))
if rows:
return rows[0]['balance']
b = rows[0]['balance']
logging.info(f'balance: {b}')
return b
else:
logging.info('no balance info found')
return None
async def get_competitors_for_req(self, request_id: int) -> set:
competitors = [
status['worker']
for status in
(await self.get_status_by_request_id(request_id))
if status['worker'] != self.account
]
logging.info(f'competitors: {competitors}')
return set(competitors)
# TODO, consider making this a NON-method and instead
# handing in the `snap['queue']` output beforehand?
# -> since that call is the only usage of `self`?
@@ -172,7 +171,7 @@ class NetConnector:
step.
'''
logging.info('begin_work')
logging.info(f'begin_work on #{request_id}')
return await failable(
partial(
self.cleos.a_push_action,
@@ -189,7 +188,7 @@ class NetConnector:
)
async def cancel_work(self, request_id: int, reason: str):
logging.info('cancel_work')
logging.info(f'cancel_work on #{request_id}')
return await failable(
partial(
self.cleos.a_push_action,
@@ -229,7 +228,7 @@ class NetConnector:
async def find_results(self):
logging.info('find_results')
return await failable(
rows = await failable(
partial(
self.cleos.aget_table,
'gpu.scd', 'gpu.scd', 'results',
@@ -239,6 +238,7 @@ class NetConnector:
upper_bound=self.account
)
)
return rows
async def submit_work(
self,
@@ -247,7 +247,7 @@ class NetConnector:
result_hash: str,
ipfs_hash: str
):
logging.info('submit_work')
logging.info(f'submit_work #{request_id}')
return await failable(
partial(
self.cleos.a_push_action,
@@ -280,17 +280,12 @@ class NetConnector:
case _:
raise ValueError(f'Unsupported output type: {typ}')
if self.ipfs_gateway_url:
# check peer connections, reconnect to skynet gateway if not
gateway_id = Path(self.ipfs_gateway_url).name
peers = await self.ipfs_client.peers()
if gateway_id not in [p['Peer'] for p in peers]:
await self.ipfs_client.connect(self.ipfs_gateway_url)
file_info = await self.ipfs_client.add(Path(target_file))
file_cid = file_info['Hash']
logging.info(f'added file to ipfs, CID: {file_cid}')
await self.ipfs_client.pin(file_cid)
logging.info(f'pinned {file_cid}')
return file_cid
@@ -306,11 +301,11 @@ class NetConnector:
link = f'https://{self.ipfs_domain}/ipfs/{ipfs_hash}'
res = await get_ipfs_file(link, timeout=1)
logging.info(f'got response from {link}')
if not res or res.status_code != 200:
logging.warning(f'couldn\'t get ipfs binary data at {link}!')
# attempt to decode as image
input_data = Image.open(io.BytesIO(res.raw))
logging.info('decoded as image successfully')
return input_data