diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 85ee7de6..743a78c2 100755 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -197,10 +197,12 @@ class Client: ''' if not self._config: - raise ValueError('No config found when trying to send authenticated request') + raise ValueError( + 'No config found when trying to send authenticated request') str_to_sign = ( - str(int(time.time() * 1000)) + action + f'/api/{api_v}{endpoint}' + str(int(time.time() * 1000)) + + action + f'/api/{api_v}{endpoint}' ) signature = base64.b64encode( @@ -240,7 +242,8 @@ class Client: ''' if self._config: - headers = self._gen_auth_req_headers(action, endpoint, api_v) + headers = self._gen_auth_req_headers( + action, endpoint, api_v) api_url = f'https://api.kucoin.com/api/{api_v}{endpoint}' @@ -271,7 +274,8 @@ class Client: 'POST', f'/bullet-{token_type}', 'v1' ) except Exception as e: - log.error(f'Error making request for Kucoin ws token -> {str(e)}') + log.error( + f'Error making request for Kucoin ws token -> {str(e)}') return None if data and 'token' in data: @@ -371,7 +375,8 @@ class Client: end_dt = pendulum.now('UTC').add(minutes=1) if start_dt is None: - start_dt = end_dt.start_of('minute').subtract(minutes=limit) + start_dt = end_dt.start_of( + 'minute').subtract(minutes=limit) start_dt = int(start_dt.timestamp()) end_dt = int(end_dt.timestamp()) @@ -429,7 +434,8 @@ class Client: ) ) - array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars + array = np.array( + new_bars, dtype=_ohlc_dtype) if as_np else bars return array @@ -497,7 +503,8 @@ async def stream_quotes( feed_is_live: trio.Event, loglevel: str = '', # startup sync - task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[tuple[dict, dict] + ] = trio.TASK_STATUS_IGNORED, ) -> None: ''' Required piker api to stream real-time data. @@ -556,7 +563,7 @@ async def stream_quotes( @acm -async def subscribe(ws: wsproto.WSConnection, connect_id, sym): +async def subscribe(ws: wsproto.WSConnection, connect_id, sym) -> AsyncGenerator[None, None]: # level 2 sub await ws.send_msg( { @@ -608,7 +615,8 @@ async def stream_messages( if cs.cancelled_caught: timeouts += 1 if timeouts > 2: - log.error('kucoin feed is sh**ing the bed... rebooting...') + log.error( + 'kucoin feed is sh**ing the bed... rebooting...') await ws._connect() continue @@ -670,7 +678,8 @@ async def stream_messages( ], } - + case _: + log.warn(f'Unhandled message: {msg}') @acm @@ -685,9 +694,9 @@ async def open_history_client( end_dt: datetime | None = None, start_dt: datetime | None = None, ) -> tuple[ - np.ndarray, datetime | - None, datetime | - None + np.ndarray, datetime + | None, datetime + | None ]: # start # end if timeframe != 60: raise DataUnavailable('Only 1m bars are supported')