From 8af5fe3c64fe9e1605d86a5c35499e74fca51caf Mon Sep 17 00:00:00 2001 From: jaredgoldman Date: Mon, 27 Mar 2023 22:01:44 -0400 Subject: [PATCH] Remove breakpoints, simplify backoff logic --- piker/brokers/kucoin.py | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index e42ed37d..c851ed6d 100644 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -181,6 +181,7 @@ class Client: self._key_id = config.key_id self._key_secret = config.key_secret self._key_passphrase = config.key_passphrase + log.info('User credentials added') def _gen_auth_req_headers( self, @@ -193,7 +194,6 @@ class Client: https://docs.kucoin.com/#authentication ''' - breakpoint() now = int(time.time() * 1000) path = f'/api/{api_v}{endpoint}' str_to_sign = str(now) + action + path @@ -273,6 +273,8 @@ class Client: entries = await self._request('GET', '/symbols') syms = {item['name']: KucoinMktPair(**item) for item in entries} + + log.info('Kucoin market pairs fetches') return syms async def cache_pairs( @@ -358,7 +360,7 @@ class Client: if not isinstance(data, list): # Do a gradual backoff if Kucoin is rate limiting us - backoff_interval = i + (randint(0, 1000) / 1000) + backoff_interval = i log.warn(f'History call failed, backing off for {backoff_interval}s') await trio.sleep(backoff_interval) else: @@ -388,7 +390,6 @@ class Client: case 'index': row.append(int(value)) case 'time': - # row.append(int(value) + (3600 * 4)) row.append(value) case _: row.append(float(value)) @@ -399,10 +400,6 @@ class Client: return array -def kucoin_timestamp(dt: datetime): - return math.trunc(time.mktime(dt.timetuple())) - - def fqsn_to_kucoin_sym(fqsn: str, pairs: dict[str, KucoinMktPair]) -> str: pair_data = pairs[fqsn] return pair_data.baseCurrency + '-' + pair_data.quoteCurrency @@ -483,12 +480,13 @@ async def stream_quotes( ''' async with trio.open_nursery() as n: - + # TODO: cache this task so it's only called once async def ping_server(): while True: await trio.sleep((ping_interval - 1000) / 1000) await ws.send_msg({'id': connect_id, 'type': 'ping'}) + log.info(f'Starting ping task for {sym}') n.start_soon(ping_server) yield ws @@ -497,12 +495,13 @@ async def stream_quotes( # Spawn the ping task here async with open_ping_task(ws) as ws: - # subscribe to market feedz here - log.info(f'Subscribing to {kucoin_sym} feed') - trade_sub = make_sub(kucoin_sym, connect_id, level='l3') - l1_sub = make_sub(kucoin_sym, connect_id, level='l1') - await ws.send_msg(trade_sub) - await ws.send_msg(l1_sub) + tasks = [] + tasks.append(make_sub(kucoin_sym, connect_id, level='l3')) + tasks.append(make_sub(kucoin_sym, connect_id, level='l1')) + + for task in tasks: + log.info(f'Subscribing to {task.level} feed for {sym}') + await ws.send_msg(task) yield @@ -547,6 +546,7 @@ def make_sub(sym, connect_id, level='l1') -> dict[str, str | bool]: 'privateChannel': False, 'response': True, } + case 'l3': return { 'id': connect_id, @@ -555,8 +555,6 @@ def make_sub(sym, connect_id, level='l1') -> dict[str, str | bool]: 'privateChannel': False, 'response': True, } - case _: - return {} async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]: @@ -630,6 +628,7 @@ async def open_history_client( type: str = '1m', ) -> AsyncGenerator[Callable, None]: async with open_cached_client('kucoin') as client: + log.info('Attempting to open kucoin history client') async def get_ohlc_history( @@ -637,6 +636,7 @@ async def open_history_client( end_dt: datetime | None = None, start_dt: datetime | None = None, ) -> tuple[np.ndarray, datetime | None, datetime | None]: # start # end + if timeframe != 60: raise DataUnavailable('Only 1m bars are supported') @@ -647,6 +647,7 @@ async def open_history_client( ) times = array['time'] + if end_dt is None: inow = round(time.time()) @@ -656,13 +657,12 @@ async def open_history_client( if (inow - times[-1]) > 60: await tractor.breakpoint() + start_dt = pendulum.from_timestamp(times[0]) end_dt = pendulum.from_timestamp(times[-1]) + log.info('History succesfully fetched baby') - # breakpoint() - # print(f'OUTPUTTED END TIME: {time.ctime(kucoin_timestamp(end_dt))}') - # print(f'OUTPUTTED START TIME: {time.ctime(kucoin_timestamp(start_dt))}') - # print(f'DIFFERENCE IN MINUTES {(end_dt - start_dt).in_minutes()}') + return array, start_dt, end_dt yield get_ohlc_history, {}