Remove breakpoints, simplify backoff logic

emit_clear_ticks_only_on_ts_change
jaredgoldman 2023-03-27 22:01:44 -04:00
parent ff22f2d240
commit 8af5fe3c64
1 changed files with 20 additions and 20 deletions

View File

@ -181,6 +181,7 @@ class Client:
self._key_id = config.key_id self._key_id = config.key_id
self._key_secret = config.key_secret self._key_secret = config.key_secret
self._key_passphrase = config.key_passphrase self._key_passphrase = config.key_passphrase
log.info('User credentials added')
def _gen_auth_req_headers( def _gen_auth_req_headers(
self, self,
@ -193,7 +194,6 @@ class Client:
https://docs.kucoin.com/#authentication https://docs.kucoin.com/#authentication
''' '''
breakpoint()
now = int(time.time() * 1000) now = int(time.time() * 1000)
path = f'/api/{api_v}{endpoint}' path = f'/api/{api_v}{endpoint}'
str_to_sign = str(now) + action + path str_to_sign = str(now) + action + path
@ -273,6 +273,8 @@ class Client:
entries = await self._request('GET', '/symbols') entries = await self._request('GET', '/symbols')
syms = {item['name']: KucoinMktPair(**item) for item in entries} syms = {item['name']: KucoinMktPair(**item) for item in entries}
log.info('Kucoin market pairs fetches')
return syms return syms
async def cache_pairs( async def cache_pairs(
@ -358,7 +360,7 @@ class Client:
if not isinstance(data, list): if not isinstance(data, list):
# Do a gradual backoff if Kucoin is rate limiting us # Do a gradual backoff if Kucoin is rate limiting us
backoff_interval = i + (randint(0, 1000) / 1000) backoff_interval = i
log.warn(f'History call failed, backing off for {backoff_interval}s') log.warn(f'History call failed, backing off for {backoff_interval}s')
await trio.sleep(backoff_interval) await trio.sleep(backoff_interval)
else: else:
@ -388,7 +390,6 @@ class Client:
case 'index': case 'index':
row.append(int(value)) row.append(int(value))
case 'time': case 'time':
# row.append(int(value) + (3600 * 4))
row.append(value) row.append(value)
case _: case _:
row.append(float(value)) row.append(float(value))
@ -399,10 +400,6 @@ class Client:
return array return array
def kucoin_timestamp(dt: datetime):
return math.trunc(time.mktime(dt.timetuple()))
def fqsn_to_kucoin_sym(fqsn: str, pairs: dict[str, KucoinMktPair]) -> str: def fqsn_to_kucoin_sym(fqsn: str, pairs: dict[str, KucoinMktPair]) -> str:
pair_data = pairs[fqsn] pair_data = pairs[fqsn]
return pair_data.baseCurrency + '-' + pair_data.quoteCurrency return pair_data.baseCurrency + '-' + pair_data.quoteCurrency
@ -483,12 +480,13 @@ async def stream_quotes(
''' '''
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
# TODO: cache this task so it's only called once
async def ping_server(): async def ping_server():
while True: while True:
await trio.sleep((ping_interval - 1000) / 1000) await trio.sleep((ping_interval - 1000) / 1000)
await ws.send_msg({'id': connect_id, 'type': 'ping'}) await ws.send_msg({'id': connect_id, 'type': 'ping'})
log.info(f'Starting ping task for {sym}')
n.start_soon(ping_server) n.start_soon(ping_server)
yield ws yield ws
@ -497,12 +495,13 @@ async def stream_quotes(
# Spawn the ping task here # Spawn the ping task here
async with open_ping_task(ws) as ws: async with open_ping_task(ws) as ws:
# subscribe to market feedz here tasks = []
log.info(f'Subscribing to {kucoin_sym} feed') tasks.append(make_sub(kucoin_sym, connect_id, level='l3'))
trade_sub = make_sub(kucoin_sym, connect_id, level='l3') tasks.append(make_sub(kucoin_sym, connect_id, level='l1'))
l1_sub = make_sub(kucoin_sym, connect_id, level='l1')
await ws.send_msg(trade_sub) for task in tasks:
await ws.send_msg(l1_sub) log.info(f'Subscribing to {task.level} feed for {sym}')
await ws.send_msg(task)
yield yield
@ -547,6 +546,7 @@ def make_sub(sym, connect_id, level='l1') -> dict[str, str | bool]:
'privateChannel': False, 'privateChannel': False,
'response': True, 'response': True,
} }
case 'l3': case 'l3':
return { return {
'id': connect_id, 'id': connect_id,
@ -555,8 +555,6 @@ def make_sub(sym, connect_id, level='l1') -> dict[str, str | bool]:
'privateChannel': False, 'privateChannel': False,
'response': True, 'response': True,
} }
case _:
return {}
async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]: async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]:
@ -630,6 +628,7 @@ async def open_history_client(
type: str = '1m', type: str = '1m',
) -> AsyncGenerator[Callable, None]: ) -> AsyncGenerator[Callable, None]:
async with open_cached_client('kucoin') as client: async with open_cached_client('kucoin') as client:
log.info('Attempting to open kucoin history client') log.info('Attempting to open kucoin history client')
async def get_ohlc_history( async def get_ohlc_history(
@ -637,6 +636,7 @@ async def open_history_client(
end_dt: datetime | None = None, end_dt: datetime | None = None,
start_dt: datetime | None = None, start_dt: datetime | None = None,
) -> tuple[np.ndarray, datetime | None, datetime | None]: # start # end ) -> tuple[np.ndarray, datetime | None, datetime | None]: # start # end
if timeframe != 60: if timeframe != 60:
raise DataUnavailable('Only 1m bars are supported') raise DataUnavailable('Only 1m bars are supported')
@ -647,6 +647,7 @@ async def open_history_client(
) )
times = array['time'] times = array['time']
if end_dt is None: if end_dt is None:
inow = round(time.time()) inow = round(time.time())
@ -656,13 +657,12 @@ async def open_history_client(
if (inow - times[-1]) > 60: if (inow - times[-1]) > 60:
await tractor.breakpoint() await tractor.breakpoint()
start_dt = pendulum.from_timestamp(times[0]) start_dt = pendulum.from_timestamp(times[0])
end_dt = pendulum.from_timestamp(times[-1]) end_dt = pendulum.from_timestamp(times[-1])
log.info('History succesfully fetched baby') log.info('History succesfully fetched baby')
# breakpoint()
# print(f'OUTPUTTED END TIME: {time.ctime(kucoin_timestamp(end_dt))}')
# print(f'OUTPUTTED START TIME: {time.ctime(kucoin_timestamp(start_dt))}')
# print(f'DIFFERENCE IN MINUTES {(end_dt - start_dt).in_minutes()}')
return array, start_dt, end_dt return array, start_dt, end_dt
yield get_ohlc_history, {} yield get_ohlc_history, {}