From 399299c62b43dac7ae340cbe750e36d653cc9d7e Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Mon, 3 Feb 2025 20:07:45 -0300 Subject: [PATCH] Further improvements in indentation and logic in daemons maybe_serve_one, also might have fixed a bug related to using id instead of request_id in the search existing results phase, and add way more logging --- skynet/dgpu/__init__.py | 7 ++ skynet/dgpu/compute.py | 14 ++-- skynet/dgpu/daemon.py | 171 ++++++++++++++++++++++------------ skynet/dgpu/network.py | 53 ++++++------- 4 files changed, 130 insertions(+), 115 deletions(-) diff --git a/skynet/dgpu/__init__.py b/skynet/dgpu/__init__.py index eb93697..4371f83 100755 --- a/skynet/dgpu/__init__.py +++ b/skynet/dgpu/__init__.py @@ -1,3 +1,5 @@ +import logging + import trio from hypercorn.config import Config @@ -16,6 +18,10 @@ async def open_dgpu_node(config: dict) -> None: and *maybe* serve a `hypercorn` web API. ''' + + # suppress logs from httpx (logs url + status after every query) + logging.getLogger("httpx").setLevel(logging.WARNING) + conn = NetConnector(config) mm = ModelMngr(config) daemon = WorkerDaemon(mm, conn, config) @@ -33,6 +39,7 @@ async def open_dgpu_node(config: dict) -> None: # TODO, consider a more explicit `as hypercorn_serve` # to clarify? 
if api: + logging.info(f'serving api @ {config["api_bind"]}') tn.start_soon(serve, api, api_conf) # block until cancelled diff --git a/skynet/dgpu/compute.py b/skynet/dgpu/compute.py index 085b2a3..8b10bd7 100755 --- a/skynet/dgpu/compute.py +++ b/skynet/dgpu/compute.py @@ -83,8 +83,8 @@ class ModelMngr: # self.load_model(DEFAULT_INITAL_MODEL, 'txt2img') def log_debug_info(self): - logging.info('memory summary:') - logging.info('\n' + torch.cuda.memory_summary()) + logging.debug('memory summary:') + logging.debug('\n' + torch.cuda.memory_summary()) def is_model_loaded(self, name: str, mode: str): if (name == self._model_name and @@ -114,6 +114,8 @@ class ModelMngr: name, mode, cache_dir=self.cache_dir) self._model_mode = mode self._model_name = name + logging.info(f'{name} loaded!') + self.log_debug_info() def compute_one( self, @@ -126,11 +128,7 @@ class ModelMngr: if self._should_cancel: should_raise = trio.from_thread.run(self._should_cancel, request_id) if should_raise: - logging.warn(f'cancelling work at step {step}') - - # ?TODO, this is never caught, so why is it - # raised specially? 
- raise DGPUInferenceCancelled() + logging.warn(f'CANCELLING work at step {step}') return {} @@ -206,8 +204,6 @@ class ModelMngr: raise DGPUComputeError('Unsupported compute method') except BaseException as err: - logging.error(err) - # to see the src exc in tb raise DGPUComputeError(str(err)) from err finally: diff --git a/skynet/dgpu/daemon.py b/skynet/dgpu/daemon.py index 1e7cdb5..137259b 100755 --- a/skynet/dgpu/daemon.py +++ b/skynet/dgpu/daemon.py @@ -105,7 +105,11 @@ class WorkerDaemon: for status in self._snap['requests'][request_id] if status['worker'] != self.account ]) - return bool(self.non_compete & competitors) + logging.info('should cancel work?') + logging.info(f'competitors: {competitors}') + should_cancel = bool(self.non_compete & competitors) + logging.info(f'cancel: {should_cancel}') + return should_cancel async def snap_updater_task(self): @@ -150,6 +154,7 @@ class WorkerDaemon: req: dict, ): rid = req['id'] + logging.info(f'maybe serve request #{rid}') # parse request body = json.loads(req['body']) @@ -161,7 +166,7 @@ class WorkerDaemon: and model not in MODELS ): - logging.warning(f'Unknown model {model}') + logging.warning(f'unknown model {model}!, skip...') return False # only handle whitelisted models @@ -170,98 +175,110 @@ class WorkerDaemon: and model not in self.model_whitelist ): + logging.warning('model not whitelisted!, skip...') return False # if blacklist contains model skip if model in self.model_blacklist: + logging.warning('model blacklisted!, skip...') return False - results = [res['id'] for res in self._snap['results']] + results = [res['request_id'] for res in self._snap['results']] - # if worker is already on that request or - # if worker has a stale status for that request - if rid in results or rid not in self._snap['requests']: - logging.info(f'request {rid} already beign worked on, skip...') - return + # if worker already produced a result for this request + if rid in results: + logging.info(f'worker already 
submitted a result for request #{rid}, skip...') + return False statuses = self._snap['requests'][rid] - if len(statuses) == 0: - inputs = [] - for _input in req['binary_data'].split(','): - if _input: - for _ in range(3): - try: - # user `GPUConnector` to IO with - # storage layer to seed the compute - # task. - img = await self.conn.get_input_data(_input) - inputs.append(img) - break - except BaseException: - logging.exception( - 'Model input error !?!\n' + # skip if workers in non_compete already on it + competitors = set((status['worker'] for status in statuses)) + if bool(self.non_compete & competitors): + logging.info('worker in configured non_compete list already working on request, skip...') + return False + + # resolve the ipfs hashes into the actual data behind them + inputs = [] + raw_inputs = req['binary_data'].split(',') + if raw_inputs: + logging.info(f'fetching IPFS inputs: {raw_inputs}') + + retry = 3 + for _input in req['binary_data'].split(','): + if _input: + for r in range(retry): + try: + # user `GPUConnector` to IO with + # storage layer to seed the compute + # task. + img = await self.conn.get_input_data(_input) + inputs.append(img) + logging.info(f'retrieved {_input}!') + break + + except BaseException: + logging.exception( + f'IPFS fetch input error !?! retries left {retry - r - 1}\n' + ) + + # compute unique request hash used on submit + hash_str = ( + str(req['nonce']) + + + req['body'] + + + req['binary_data'] + ) + logging.debug(f'hashing: {hash_str}') + request_hash = sha256(hash_str.encode('utf-8')).hexdigest() + logging.info(f'calculated request hash: {request_hash}') + + # TODO: validate request + + resp = await self.conn.begin_work(rid) + if not resp or 'code' in resp: + logging.info('begin_work error, probably being worked on already... 
skip.') + + else: + try: + output_type = 'png' + if 'output_type' in body['params']: + output_type = body['params']['output_type'] + + output = None + output_hash = None + match self.backend: + case 'sync-on-thread': + self.mm._should_cancel = self.should_cancel_work + output_hash, output = await trio.to_thread.run_sync( + partial( + self.mm.compute_one, + rid, + body['method'], body['params'], + inputs=inputs ) + ) - hash_str = ( - str(req['nonce']) - + - req['body'] - + - req['binary_data'] - ) - logging.info(f'hashing: {hash_str}') - request_hash = sha256(hash_str.encode('utf-8')).hexdigest() + case _: + raise DGPUComputeError( + f'Unsupported backend {self.backend}' + ) - # TODO: validate request + self._last_generation_ts: str = datetime.now().isoformat() + self._last_benchmark: list[float] = self._benchmark + self._benchmark: list[float] = [] - # perform work - logging.info(f'working on {body}') + ipfs_hash = await self.conn.publish_on_ipfs(output, typ=output_type) - resp = await self.conn.begin_work(rid) - if not resp or 'code' in resp: - logging.info('probably being worked on already... 
skip.') + await self.conn.submit_work(rid, request_hash, output_hash, ipfs_hash) - else: - try: - output_type = 'png' - if 'output_type' in body['params']: - output_type = body['params']['output_type'] + except BaseException as err: + logging.exception('Failed to serve model request !?\n') + await self.conn.cancel_work(rid, str(err)) - output = None - output_hash = None - match self.backend: - case 'sync-on-thread': - self.mm._should_cancel = self.should_cancel_work - output_hash, output = await trio.to_thread.run_sync( - partial( - self.mm.compute_one, - rid, - body['method'], body['params'], - inputs=inputs - ) - ) - - case _: - raise DGPUComputeError( - f'Unsupported backend {self.backend}' - ) - - self._last_generation_ts: str = datetime.now().isoformat() - self._last_benchmark: list[float] = self._benchmark - self._benchmark: list[float] = [] - - ipfs_hash = await self.conn.publish_on_ipfs(output, typ=output_type) - - await self.conn.submit_work(rid, request_hash, output_hash, ipfs_hash) - - except BaseException as err: - logging.exception('Failed to serve model request !?\n') - # traceback.print_exc() # TODO? <- replaced by above ya? 
- await self.conn.cancel_work(rid, str(err)) - - finally: - return True + finally: + return True # TODO, as per above on `.maybe_serve_one()`, it's likely a bit # more *trionic* to define this all as a module level task-func diff --git a/skynet/dgpu/network.py b/skynet/dgpu/network.py index 0f95b65..03fc303 100755 --- a/skynet/dgpu/network.py +++ b/skynet/dgpu/network.py @@ -72,9 +72,6 @@ class NetConnector: self.cleos = CLEOS(endpoint=self.node_url) self.cleos.load_abi('gpu.scd', GPU_CONTRACT_ABI) - self.ipfs_gateway_url = None - if 'ipfs_gateway_url' in config: - self.ipfs_gateway_url = config['ipfs_gateway_url'] self.ipfs_url = config['ipfs_url'] self.ipfs_client = AsyncIPFSHTTP(self.ipfs_url) @@ -89,7 +86,7 @@ class NetConnector: async def get_work_requests_last_hour(self): logging.info('get_work_requests_last_hour') - return await failable( + rows = await failable( partial( self.cleos.aget_table, 'gpu.scd', 'gpu.scd', 'queue', @@ -98,13 +95,19 @@ class NetConnector: lower_bound=int(time.time()) - 3600 ), ret_fail=[]) + logging.info(f'found {len(rows)} requests on queue') + return rows + async def get_status_by_request_id(self, request_id: int): logging.info('get_status_by_request_id') - return await failable( + rows = await failable( partial( self.cleos.aget_table, 'gpu.scd', request_id, 'status'), ret_fail=[]) + logging.info(f'found status for workers: {[r["worker"] for r in rows]}') + return rows + async def get_global_config(self): logging.info('get_global_config') rows = await failable( @@ -113,8 +116,11 @@ class NetConnector: 'gpu.scd', 'gpu.scd', 'config')) if rows: - return rows[0] + cfg = rows[0] + logging.info(f'config found: {cfg}') + return cfg else: + logging.error('global config not found, is the contract initialized?') return None async def get_worker_balance(self): @@ -130,20 +136,13 @@ class NetConnector: )) if rows: - return rows[0]['balance'] + b = rows[0]['balance'] + logging.info(f'balance: {b}') + return b else: + logging.info('no balance 
info found') return None - async def get_competitors_for_req(self, request_id: int) -> set: - competitors = [ - status['worker'] - for status in - (await self.get_status_by_request_id(request_id)) - if status['worker'] != self.account - ] - logging.info(f'competitors: {competitors}') - return set(competitors) - # TODO, considery making this a NON-method and instead # handing in the `snap['queue']` output beforehand? # -> since that call is the only usage of `self`? @@ -172,7 +171,7 @@ class NetConnector: step. ''' - logging.info('begin_work') + logging.info(f'begin_work on #{request_id}') return await failable( partial( self.cleos.a_push_action, @@ -189,7 +188,7 @@ class NetConnector: ) async def cancel_work(self, request_id: int, reason: str): - logging.info('cancel_work') + logging.info(f'cancel_work on #{request_id}') return await failable( partial( self.cleos.a_push_action, @@ -229,7 +228,7 @@ class NetConnector: async def find_results(self): logging.info('find_results') - return await failable( + rows = await failable( partial( self.cleos.aget_table, 'gpu.scd', 'gpu.scd', 'results', @@ -239,6 +238,7 @@ class NetConnector: upper_bound=self.account ) ) + return rows async def submit_work( self, @@ -247,7 +247,7 @@ class NetConnector: result_hash: str, ipfs_hash: str ): - logging.info('submit_work') + logging.info(f'submit_work #{request_id}') return await failable( partial( self.cleos.a_push_action, @@ -280,17 +280,12 @@ class NetConnector: case _: raise ValueError(f'Unsupported output type: {typ}') - if self.ipfs_gateway_url: - # check peer connections, reconnect to skynet gateway if not - gateway_id = Path(self.ipfs_gateway_url).name - peers = await self.ipfs_client.peers() - if gateway_id not in [p['Peer'] for p in peers]: - await self.ipfs_client.connect(self.ipfs_gateway_url) - file_info = await self.ipfs_client.add(Path(target_file)) file_cid = file_info['Hash'] + logging.info(f'added file to ipfs, CID: {file_cid}') await self.ipfs_client.pin(file_cid) + 
logging.info(f'pinned {file_cid}') return file_cid @@ -306,11 +301,11 @@ class NetConnector: link = f'https://{self.ipfs_domain}/ipfs/{ipfs_hash}' res = await get_ipfs_file(link, timeout=1) - logging.info(f'got response from {link}') if not res or res.status_code != 200: logging.warning(f'couldn\'t get ipfs binary data at {link}!') # attempt to decode as image input_data = Image.open(io.BytesIO(res.raw)) + logging.info('decoded as image successfully') return input_data