Run autopep8, add default case for message stream match case

py311
jaredgoldman 2023-04-21 21:16:14 -04:00
parent ae3f6696a7
commit 3836f7d458
1 changed files with 22 additions and 13 deletions

View File

@ -197,10 +197,12 @@ class Client:
''' '''
if not self._config: if not self._config:
raise ValueError('No config found when trying to send authenticated request') raise ValueError(
'No config found when trying to send authenticated request')
str_to_sign = ( str_to_sign = (
str(int(time.time() * 1000)) + action + f'/api/{api_v}{endpoint}' str(int(time.time() * 1000))
+ action + f'/api/{api_v}{endpoint}'
) )
signature = base64.b64encode( signature = base64.b64encode(
@ -240,7 +242,8 @@ class Client:
''' '''
if self._config: if self._config:
headers = self._gen_auth_req_headers(action, endpoint, api_v) headers = self._gen_auth_req_headers(
action, endpoint, api_v)
api_url = f'https://api.kucoin.com/api/{api_v}{endpoint}' api_url = f'https://api.kucoin.com/api/{api_v}{endpoint}'
@ -271,7 +274,8 @@ class Client:
'POST', f'/bullet-{token_type}', 'v1' 'POST', f'/bullet-{token_type}', 'v1'
) )
except Exception as e: except Exception as e:
log.error(f'Error making request for Kucoin ws token -> {str(e)}') log.error(
f'Error making request for Kucoin ws token -> {str(e)}')
return None return None
if data and 'token' in data: if data and 'token' in data:
@ -371,7 +375,8 @@ class Client:
end_dt = pendulum.now('UTC').add(minutes=1) end_dt = pendulum.now('UTC').add(minutes=1)
if start_dt is None: if start_dt is None:
start_dt = end_dt.start_of('minute').subtract(minutes=limit) start_dt = end_dt.start_of(
'minute').subtract(minutes=limit)
start_dt = int(start_dt.timestamp()) start_dt = int(start_dt.timestamp())
end_dt = int(end_dt.timestamp()) end_dt = int(end_dt.timestamp())
@ -429,7 +434,8 @@ class Client:
) )
) )
array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars array = np.array(
new_bars, dtype=_ohlc_dtype) if as_np else bars
return array return array
@ -497,7 +503,8 @@ async def stream_quotes(
feed_is_live: trio.Event, feed_is_live: trio.Event,
loglevel: str = '', loglevel: str = '',
# startup sync # startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[tuple[dict, dict]
] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
''' '''
Required piker api to stream real-time data. Required piker api to stream real-time data.
@ -556,7 +563,7 @@ async def stream_quotes(
@acm @acm
async def subscribe(ws: wsproto.WSConnection, connect_id, sym): async def subscribe(ws: wsproto.WSConnection, connect_id, sym) -> AsyncGenerator[None, None]:
# level 2 sub # level 2 sub
await ws.send_msg( await ws.send_msg(
{ {
@ -608,7 +615,8 @@ async def stream_messages(
if cs.cancelled_caught: if cs.cancelled_caught:
timeouts += 1 timeouts += 1
if timeouts > 2: if timeouts > 2:
log.error('kucoin feed is sh**ing the bed... rebooting...') log.error(
'kucoin feed is sh**ing the bed... rebooting...')
await ws._connect() await ws._connect()
continue continue
@ -670,7 +678,8 @@ async def stream_messages(
], ],
} }
case _:
log.warn(f'Unhandled message: {msg}')
@acm @acm
@ -685,9 +694,9 @@ async def open_history_client(
end_dt: datetime | None = None, end_dt: datetime | None = None,
start_dt: datetime | None = None, start_dt: datetime | None = None,
) -> tuple[ ) -> tuple[
np.ndarray, datetime | np.ndarray, datetime
None, datetime | | None, datetime
None | None
]: # start # end ]: # start # end
if timeframe != 60: if timeframe != 60:
raise DataUnavailable('Only 1m bars are supported') raise DataUnavailable('Only 1m bars are supported')