Use singlequotes
							parent
							
								
									9fcfb8d780
								
							
						
					
					
						commit
						fcdddadec1
					
				|  | @ -13,10 +13,10 @@ | |||
| # You should have received a copy of the GNU Affero General Public License | ||||
| # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||
| 
 | ||||
| """ | ||||
| ''' | ||||
| Kucoin broker backend | ||||
| 
 | ||||
| """ | ||||
| ''' | ||||
| 
 | ||||
| from typing import Any, Callable, Literal, AsyncGenerator | ||||
| from contextlib import asynccontextmanager as acm | ||||
|  | @ -50,23 +50,23 @@ from ..data._web_bs import ( | |||
| log = get_logger(__name__) | ||||
| 
 | ||||
| _ohlc_dtype = [ | ||||
|     ("index", int), | ||||
|     ("time", int), | ||||
|     ("open", float), | ||||
|     ("high", float), | ||||
|     ("low", float), | ||||
|     ("close", float), | ||||
|     ("volume", float), | ||||
|     ("bar_wap", float),  # will be zeroed by sampler if not filled | ||||
|     ('index', int), | ||||
|     ('time', int), | ||||
|     ('open', float), | ||||
|     ('high', float), | ||||
|     ('low', float), | ||||
|     ('close', float), | ||||
|     ('volume', float), | ||||
|     ('bar_wap', float),  # will be zeroed by sampler if not filled | ||||
| ] | ||||
| 
 | ||||
| 
 | ||||
| class KucoinMktPair(Struct, frozen=True): | ||||
|     """ | ||||
|     ''' | ||||
|     Kucoin's pair format: | ||||
|     https://docs.kucoin.com/#get-symbols-list | ||||
| 
 | ||||
|     """ | ||||
|     ''' | ||||
| 
 | ||||
|     baseCurrency: str | ||||
|     baseIncrement: float | ||||
|  | @ -88,11 +88,11 @@ class KucoinMktPair(Struct, frozen=True): | |||
| 
 | ||||
| 
 | ||||
| class AccountTrade(Struct, frozen=True): | ||||
|     """ | ||||
|     ''' | ||||
|     Historical trade format: | ||||
|     https://docs.kucoin.com/#get-account-ledgers | ||||
| 
 | ||||
|     """ | ||||
|     ''' | ||||
| 
 | ||||
|     id: str | ||||
|     currency: str | ||||
|  | @ -101,16 +101,16 @@ class AccountTrade(Struct, frozen=True): | |||
|     balance: float | ||||
|     accountType: str | ||||
|     bizType: str | ||||
|     direction: Literal["in", "out"] | ||||
|     direction: Literal['in', 'out'] | ||||
|     createdAt: float | ||||
|     context: list[str] | ||||
| 
 | ||||
| 
 | ||||
| class AccountResponse(Struct, frozen=True): | ||||
|     """ | ||||
|     ''' | ||||
|     https://docs.kucoin.com/#get-account-ledgers | ||||
| 
 | ||||
|     """ | ||||
|     ''' | ||||
| 
 | ||||
|     currentPage: int | ||||
|     pageSize: int | ||||
|  | @ -120,11 +120,11 @@ class AccountResponse(Struct, frozen=True): | |||
| 
 | ||||
| 
 | ||||
| class KucoinTrade(Struct, frozen=True): | ||||
|     """ | ||||
|     ''' | ||||
|     Real-time trade format: | ||||
|     https://docs.kucoin.com/#symbol-ticker | ||||
| 
 | ||||
|     """ | ||||
|     ''' | ||||
| 
 | ||||
|     bestAsk: float | ||||
|     bestAskSize: float | ||||
|  | @ -137,11 +137,11 @@ class KucoinTrade(Struct, frozen=True): | |||
| 
 | ||||
| 
 | ||||
| class KucoinL2(Struct, frozen=True): | ||||
|     """ | ||||
|     ''' | ||||
|     Real-time L2 order book format: | ||||
|     https://docs.kucoin.com/#level2-5-best-ask-bid-orders | ||||
| 
 | ||||
|     """ | ||||
|     ''' | ||||
| 
 | ||||
|     asks: list[list[float]] | ||||
|     bids: list[list[float]] | ||||
|  | @ -149,10 +149,10 @@ class KucoinL2(Struct, frozen=True): | |||
| 
 | ||||
| 
 | ||||
| class KucoinMsg(Struct, frozen=True): | ||||
|     """ | ||||
|     ''' | ||||
|     Generic outer-wrapper for any Kucoin ws msg | ||||
| 
 | ||||
|     """ | ||||
|     ''' | ||||
| 
 | ||||
|     type: str | ||||
|     topic: str | ||||
|  | @ -169,10 +169,10 @@ class BrokerConfig(Struct, frozen=True): | |||
| def get_config() -> BrokerConfig | None: | ||||
|     conf, _ = config.load() | ||||
| 
 | ||||
|     section = conf.get("kucoin") | ||||
|     section = conf.get('kucoin') | ||||
| 
 | ||||
|     if section is None: | ||||
|         log.warning("No config section found for kucoin in config") | ||||
|         log.warning('No config section found for kucoin in config') | ||||
|         return None | ||||
| 
 | ||||
|     return BrokerConfig(**section).copy() | ||||
|  | @ -186,94 +186,94 @@ class Client: | |||
| 
 | ||||
|     def _gen_auth_req_headers( | ||||
|         self, | ||||
|         action: Literal["POST", "GET"], | ||||
|         action: Literal['POST', 'GET'], | ||||
|         endpoint: str, | ||||
|         api_v: str = "v2", | ||||
|         api_v: str = 'v2', | ||||
|     ) -> dict[str, str | bytes]: | ||||
|         """ | ||||
|         ''' | ||||
|         Generate authenticated request headers | ||||
|         https://docs.kucoin.com/#authentication | ||||
| 
 | ||||
|         """ | ||||
|         ''' | ||||
|         str_to_sign = ( | ||||
|             str(int(time.time() * 1000)) + action + f"/api/{api_v}{endpoint}" | ||||
|             str(int(time.time() * 1000)) + action + f'/api/{api_v}{endpoint}' | ||||
|         ) | ||||
| 
 | ||||
|         signature = base64.b64encode( | ||||
|             hmac.new( | ||||
|                 self._config.key_secret.encode("utf-8"), | ||||
|                 str_to_sign.encode("utf-8"), | ||||
|                 self._config.key_secret.encode('utf-8'), | ||||
|                 str_to_sign.encode('utf-8'), | ||||
|                 hashlib.sha256, | ||||
|             ).digest() | ||||
|         ) | ||||
| 
 | ||||
|         passphrase = base64.b64encode( | ||||
|             hmac.new( | ||||
|                 self._config.key_secret.encode("utf-8"), | ||||
|                 self._config.key_passphrase.encode("utf-8"), | ||||
|                 self._config.key_secret.encode('utf-8'), | ||||
|                 self._config.key_passphrase.encode('utf-8'), | ||||
|                 hashlib.sha256, | ||||
|             ).digest() | ||||
|         ) | ||||
| 
 | ||||
|         return { | ||||
|             "KC-API-SIGN": signature, | ||||
|             "KC-API-TIMESTAMP": str(pendulum.now().int_timestamp * 1000), | ||||
|             "KC-API-KEY": self._config.key_id, | ||||
|             "KC-API-PASSPHRASE": passphrase, | ||||
|             'KC-API-SIGN': signature, | ||||
|             'KC-API-TIMESTAMP': str(pendulum.now().int_timestamp * 1000), | ||||
|             'KC-API-KEY': self._config.key_id, | ||||
|             'KC-API-PASSPHRASE': passphrase, | ||||
|             # XXX: Even if using the v1 api - this stays the same | ||||
|             "KC-API-KEY-VERSION": "2", | ||||
|             'KC-API-KEY-VERSION': '2', | ||||
|         } | ||||
| 
 | ||||
|     async def _request( | ||||
|         self, | ||||
|         action: Literal["POST", "GET"], | ||||
|         action: Literal['POST', 'GET'], | ||||
|         endpoint: str, | ||||
|         api_v: str = "v2", | ||||
|         api_v: str = 'v2', | ||||
|         headers: dict = {}, | ||||
|     ) -> Any: | ||||
|         """ | ||||
|         ''' | ||||
|         Generic request wrapper for Kucoin API | ||||
| 
 | ||||
|         """ | ||||
|         ''' | ||||
|         if self._config: | ||||
|             headers = self._gen_auth_req_headers(action, endpoint, api_v) | ||||
| 
 | ||||
|         api_url = f"https://api.kucoin.com/api/{api_v}{endpoint}" | ||||
|         api_url = f'https://api.kucoin.com/api/{api_v}{endpoint}' | ||||
| 
 | ||||
|         res = await asks.request(action, api_url, headers=headers) | ||||
| 
 | ||||
|         if "data" in res.json(): | ||||
|             return res.json()["data"] | ||||
|         if 'data' in res.json(): | ||||
|             return res.json()['data'] | ||||
|         else: | ||||
|             log.error( | ||||
|                 f'Error making request to {api_url} -> {res.json()["msg"]}' | ||||
|             ) | ||||
|             return res.json()["msg"] | ||||
|             return res.json()['msg'] | ||||
| 
 | ||||
|     async def _get_ws_token( | ||||
|         self, | ||||
|         private: bool = False, | ||||
|     ) -> tuple[str, int] | None: | ||||
|         """ | ||||
|         ''' | ||||
|         Fetch ws token needed for sub access: | ||||
|         https://docs.kucoin.com/#apply-connect-token | ||||
|         returns a token and the interval we must ping | ||||
|         the server at to keep the connection alive | ||||
| 
 | ||||
|         """ | ||||
|         token_type = "private" if private else "public" | ||||
|         ''' | ||||
|         token_type = 'private' if private else 'public' | ||||
|         try: | ||||
|             data: dict[str, Any] | None = await self._request( | ||||
|                 "POST", f"/bullet-{token_type}", "v1" | ||||
|                 'POST', f'/bullet-{token_type}', 'v1' | ||||
|             ) | ||||
|         except Exception as e: | ||||
|             log.error(f"Error making request for Kucoin ws token -> {str(e)}") | ||||
|             log.error(f'Error making request for Kucoin ws token -> {str(e)}') | ||||
|             return None | ||||
| 
 | ||||
|         if data and "token" in data: | ||||
|         if data and 'token' in data: | ||||
|             # ping_interval is in ms | ||||
|             ping_interval: int = data["instanceServers"][0]["pingInterval"] | ||||
|             return data["token"], ping_interval | ||||
|             ping_interval: int = data['instanceServers'][0]['pingInterval'] | ||||
|             return data['token'], ping_interval | ||||
|         elif data: | ||||
|             log.error( | ||||
|                 'Error making request for Kucoin ws token' | ||||
|  | @ -283,22 +283,22 @@ class Client: | |||
|     async def _get_pairs( | ||||
|         self, | ||||
|     ) -> dict[str, KucoinMktPair]: | ||||
|         entries = await self._request("GET", "/symbols") | ||||
|         entries = await self._request('GET', '/symbols') | ||||
|         syms = { | ||||
|             kucoin_sym_to_fqsn(item["name"]): KucoinMktPair(**item) | ||||
|             kucoin_sym_to_fqsn(item['name']): KucoinMktPair(**item) | ||||
|             for item in entries | ||||
|         } | ||||
| 
 | ||||
|         log.info(f" {len(syms)} Kucoin market pairs fetched") | ||||
|         log.info(f' {len(syms)} Kucoin market pairs fetched') | ||||
|         return syms | ||||
| 
 | ||||
|     async def cache_pairs( | ||||
|         self, | ||||
|     ) -> dict[str, KucoinMktPair]: | ||||
|         """ | ||||
|         ''' | ||||
|         Get cached pairs and convert keyed symbols into fqsns if ya want | ||||
| 
 | ||||
|         """ | ||||
|         ''' | ||||
|         if not self._pairs: | ||||
|             self._pairs = await self._get_pairs() | ||||
| 
 | ||||
|  | @ -319,7 +319,7 @@ class Client: | |||
| 
 | ||||
|     async def last_trades(self, sym: str) -> list[AccountTrade]: | ||||
|         trades = await self._request( | ||||
|             "GET", f"/accounts/ledgers?currency={sym}", "v1" | ||||
|             'GET', f'/accounts/ledgers?currency={sym}', 'v1' | ||||
|         ) | ||||
|         trades = AccountResponse(**trades) | ||||
|         return trades.items | ||||
|  | @ -331,21 +331,21 @@ class Client: | |||
|         end_dt: datetime | None = None, | ||||
|         limit: int = 1000, | ||||
|         as_np: bool = True, | ||||
|         type: str = "1min", | ||||
|         type: str = '1min', | ||||
|     ) -> np.ndarray: | ||||
|         """ | ||||
|         ''' | ||||
|         Get OHLC data and convert to numpy array for perffff: | ||||
|         https://docs.kucoin.com/#get-klines | ||||
| 
 | ||||
|         Kucoin bar data format: | ||||
|         [ | ||||
|              "1545904980",             //Start time of the candle cycle 0 | ||||
|              "0.058",                  //opening price 1 | ||||
|              "0.049",                  //closing price 2 | ||||
|              "0.058",                  //highest price 3 | ||||
|              "0.049",                  //lowest price 4 | ||||
|              "0.018",                  //Transaction volume 5 | ||||
|              "0.000945"                //Transaction amount 6 | ||||
|              '1545904980',             //Start time of the candle cycle 0 | ||||
|              '0.058',                  //opening price 1 | ||||
|              '0.049',                  //closing price 2 | ||||
|              '0.058',                  //highest price 3 | ||||
|              '0.049',                  //lowest price 4 | ||||
|              '0.018',                  //Transaction volume 5 | ||||
|              '0.000945'                //Transaction amount 6 | ||||
|         ], | ||||
| 
 | ||||
|         piker ohlc numpy array format: | ||||
|  | @ -360,14 +360,14 @@ class Client: | |||
|             ('bar_wap', float),  # will be zeroed by sampler if not filled | ||||
|         ] | ||||
| 
 | ||||
|         """ | ||||
|         ''' | ||||
|         # Generate generic end and start time if values not passed | ||||
|         # Currently gives us 12hrs of data | ||||
|         if end_dt is None: | ||||
|             end_dt = pendulum.now("UTC").add(minutes=1) | ||||
|             end_dt = pendulum.now('UTC').add(minutes=1) | ||||
| 
 | ||||
|         if start_dt is None: | ||||
|             start_dt = end_dt.start_of("minute").subtract(minutes=limit) | ||||
|             start_dt = end_dt.start_of('minute').subtract(minutes=limit) | ||||
| 
 | ||||
|         start_dt = int(start_dt.timestamp()) | ||||
|         end_dt = int(end_dt.timestamp()) | ||||
|  | @ -375,24 +375,24 @@ class Client: | |||
|         kucoin_sym = fqsn_to_kucoin_sym(fqsn, self._pairs) | ||||
| 
 | ||||
|         url = ( | ||||
|             f"/market/candles?type={type}" | ||||
|             f"&symbol={kucoin_sym}" | ||||
|             f"&startAt={start_dt}" | ||||
|             f"&endAt={end_dt}" | ||||
|             f'/market/candles?type={type}' | ||||
|             f'&symbol={kucoin_sym}' | ||||
|             f'&startAt={start_dt}' | ||||
|             f'&endAt={end_dt}' | ||||
|         ) | ||||
| 
 | ||||
|         for i in range(10): | ||||
|             data = await self._request( | ||||
|                 "GET", | ||||
|                 'GET', | ||||
|                 url, | ||||
|                 api_v="v1", | ||||
|                 api_v='v1', | ||||
|             ) | ||||
| 
 | ||||
|             if not isinstance(data, list): | ||||
|                 # Do a gradual backoff if Kucoin is rate limiting us | ||||
|                 backoff_interval = i | ||||
|                 log.warn( | ||||
|                     f"History call failed, backing off for {backoff_interval}s" | ||||
|                     f'History call failed, backing off for {backoff_interval}s' | ||||
|                 ) | ||||
|                 await trio.sleep(backoff_interval) | ||||
|             else: | ||||
|  | @ -431,11 +431,11 @@ class Client: | |||
| 
 | ||||
| def fqsn_to_kucoin_sym(fqsn: str, pairs: dict[str, KucoinMktPair]) -> str: | ||||
|     pair_data = pairs[fqsn] | ||||
|     return pair_data.baseCurrency + "-" + pair_data.quoteCurrency | ||||
|     return pair_data.baseCurrency + '-' + pair_data.quoteCurrency | ||||
| 
 | ||||
| 
 | ||||
| def kucoin_sym_to_fqsn(sym: str) -> str: | ||||
|     return sym.lower().replace("-", "") | ||||
|     return sym.lower().replace('-', '') | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
|  | @ -450,7 +450,7 @@ async def get_client() -> AsyncGenerator[Client, None]: | |||
| async def open_symbol_search( | ||||
|     ctx: tractor.Context, | ||||
| ) -> None: | ||||
|     async with open_cached_client("kucoin") as client: | ||||
|     async with open_cached_client('kucoin') as client: | ||||
|         # load all symbols locally for fast search | ||||
|         await client.cache_pairs() | ||||
|         await ctx.started() | ||||
|  | @ -458,25 +458,25 @@ async def open_symbol_search( | |||
|         async with ctx.open_stream() as stream: | ||||
|             async for pattern in stream: | ||||
|                 await stream.send(await client.search_symbols(pattern)) | ||||
|                 log.info("Kucoin symbol search opened") | ||||
|                 log.info('Kucoin symbol search opened') | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def open_ping_task(ws: wsproto.WSConnection, ping_interval, connect_id): | ||||
|     """ | ||||
|     ''' | ||||
|     Spawn a non-blocking task that pings the ws | ||||
|     server every ping_interval so Kucoin doesn't drop | ||||
|     our connection | ||||
| 
 | ||||
|     """ | ||||
|     ''' | ||||
|     async with trio.open_nursery() as n: | ||||
|         # TODO: cache this task so it's only called once | ||||
|         async def ping_server(): | ||||
|             while True: | ||||
|                 await trio.sleep((ping_interval - 1000) / 1000) | ||||
|                 await ws.send_msg({"id": connect_id, "type": "ping"}) | ||||
|                 await ws.send_msg({'id': connect_id, 'type': 'ping'}) | ||||
| 
 | ||||
|         log.info("Starting ping task for kucoin ws connection") | ||||
|         log.info('Starting ping task for kucoin ws connection') | ||||
|         n.start_soon(ping_server) | ||||
| 
 | ||||
|         yield | ||||
|  | @ -488,22 +488,22 @@ async def stream_quotes( | |||
|     send_chan: trio.abc.SendChannel, | ||||
|     symbols: list[str], | ||||
|     feed_is_live: trio.Event, | ||||
|     loglevel: str = "", | ||||
|     loglevel: str = '', | ||||
|     # startup sync | ||||
|     task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, | ||||
| ) -> None: | ||||
|     """ | ||||
|     ''' | ||||
|     Required piker api to stream real-time data. | ||||
|     Where the rubber hits the road baby | ||||
| 
 | ||||
|     """ | ||||
|     async with open_cached_client("kucoin") as client: | ||||
|     ''' | ||||
|     async with open_cached_client('kucoin') as client: | ||||
|         token, ping_interval = await client._get_ws_token() | ||||
|         connect_id = str(uuid4()) | ||||
|         pairs = await client.cache_pairs() | ||||
|         ws_url = ( | ||||
|             f"wss://ws-api-spot.kucoin.com/?" | ||||
|             f"token={token}&[connectId={connect_id}]" | ||||
|             f'wss://ws-api-spot.kucoin.com/?' | ||||
|             f'token={token}&[connectId={connect_id}]' | ||||
|         ) | ||||
| 
 | ||||
|         # open ping task | ||||
|  | @ -511,7 +511,7 @@ async def stream_quotes( | |||
|             open_autorecon_ws(ws_url) as ws, | ||||
|             open_ping_task(ws, ping_interval, connect_id), | ||||
|         ): | ||||
|             log.info("Starting up quote stream") | ||||
|             log.info('Starting up quote stream') | ||||
|             # loop through symbols and sub to feedz | ||||
|             for sym in symbols: | ||||
|                 pair: KucoinMktPair = pairs[sym] | ||||
|  | @ -521,13 +521,13 @@ async def stream_quotes( | |||
|                     # pass back token, and bool, signalling if we're the writer | ||||
|                     # and that history has been written | ||||
|                     sym: { | ||||
|                         "symbol_info": { | ||||
|                             "asset_type": "crypto", | ||||
|                             "price_tick_size": float(pair.baseIncrement), | ||||
|                             "lot_tick_size": float(pair.baseMinSize), | ||||
|                         'symbol_info': { | ||||
|                             'asset_type': 'crypto', | ||||
|                             'price_tick_size': float(pair.baseIncrement), | ||||
|                             'lot_tick_size': float(pair.baseMinSize), | ||||
|                         }, | ||||
|                         "shm_write_opts": {"sum_tick_vml": False}, | ||||
|                         "fqsn": sym, | ||||
|                         'shm_write_opts': {'sum_tick_vml': False}, | ||||
|                         'fqsn': sym, | ||||
|                     } | ||||
|                 } | ||||
| 
 | ||||
|  | @ -536,7 +536,7 @@ async def stream_quotes( | |||
|                     stream_messages(ws, sym) as msg_gen, | ||||
|                 ): | ||||
|                     typ, quote = await anext(msg_gen) | ||||
|                     while typ != "trade": | ||||
|                     while typ != 'trade': | ||||
|                         # take care to not unblock here until we get a real | ||||
|                         # trade quote | ||||
|                         typ, quote = await anext(msg_gen) | ||||
|  | @ -553,22 +553,22 @@ async def subscribe(ws: wsproto.WSConnection, connect_id, sym): | |||
|     # level 2 sub | ||||
|     await ws.send_msg( | ||||
|         { | ||||
|             "id": connect_id, | ||||
|             "type": "subscribe", | ||||
|             "topic": f"/spotMarket/level2Depth5:{sym}", | ||||
|             "privateChannel": False, | ||||
|             "response": True, | ||||
|             'id': connect_id, | ||||
|             'type': 'subscribe', | ||||
|             'topic': f'/spotMarket/level2Depth5:{sym}', | ||||
|             'privateChannel': False, | ||||
|             'response': True, | ||||
|         } | ||||
|     ) | ||||
| 
 | ||||
|     # watch trades | ||||
|     await ws.send_msg( | ||||
|         { | ||||
|             "id": connect_id, | ||||
|             "type": "subscribe", | ||||
|             "topic": f"/market/ticker:{sym}", | ||||
|             "privateChannel": False, | ||||
|             "response": True, | ||||
|             'id': connect_id, | ||||
|             'type': 'subscribe', | ||||
|             'topic': f'/market/ticker:{sym}', | ||||
|             'privateChannel': False, | ||||
|             'response': True, | ||||
|         } | ||||
|     ) | ||||
| 
 | ||||
|  | @ -576,14 +576,14 @@ async def subscribe(ws: wsproto.WSConnection, connect_id, sym): | |||
| 
 | ||||
|     # unsub | ||||
|     if ws.connected(): | ||||
|         log.info(f"Unsubscribing to {sym} feed") | ||||
|         log.info(f'Unsubscribing to {sym} feed') | ||||
|         await ws.send_msg( | ||||
|             { | ||||
|                 "id": connect_id, | ||||
|                 "type": "unsubscribe", | ||||
|                 "topic": f"/market/ticker:{sym}", | ||||
|                 "privateChannel": False, | ||||
|                 "response": True, | ||||
|                 'id': connect_id, | ||||
|                 'type': 'unsubscribe', | ||||
|                 'topic': f'/market/ticker:{sym}', | ||||
|                 'privateChannel': False, | ||||
|                 'response': True, | ||||
|             } | ||||
|         ) | ||||
| 
 | ||||
|  | @ -601,15 +601,15 @@ async def stream_messages( | |||
|         if cs.cancelled_caught: | ||||
|             timeouts += 1 | ||||
|             if timeouts > 2: | ||||
|                 log.error("kucoin feed is sh**ing the bed... rebooting...") | ||||
|                 log.error('kucoin feed is sh**ing the bed... rebooting...') | ||||
|                 await ws._connect() | ||||
| 
 | ||||
|             continue | ||||
| 
 | ||||
|         if msg.get("subject"): | ||||
|         if msg.get('subject'): | ||||
|             msg = KucoinMsg(**msg) | ||||
|             match msg.subject: | ||||
|                 case "trade.ticker": | ||||
|                 case 'trade.ticker': | ||||
|                     trade_data = KucoinTrade(**msg.data) | ||||
| 
 | ||||
|                     # XXX: Filter for duplicate messages as ws feed will | ||||
|  | @ -620,46 +620,46 @@ async def stream_messages( | |||
| 
 | ||||
|                     last_trade_ts = trade_data.time | ||||
| 
 | ||||
|                     yield "trade", { | ||||
|                         "symbol": sym, | ||||
|                         "last": trade_data.price, | ||||
|                         "brokerd_ts": last_trade_ts, | ||||
|                         "ticks": [ | ||||
|                     yield 'trade', { | ||||
|                         'symbol': sym, | ||||
|                         'last': trade_data.price, | ||||
|                         'brokerd_ts': last_trade_ts, | ||||
|                         'ticks': [ | ||||
|                             { | ||||
|                                 "type": "trade", | ||||
|                                 "price": float(trade_data.price), | ||||
|                                 "size": float(trade_data.size), | ||||
|                                 "broker_ts": last_trade_ts, | ||||
|                                 'type': 'trade', | ||||
|                                 'price': float(trade_data.price), | ||||
|                                 'size': float(trade_data.size), | ||||
|                                 'broker_ts': last_trade_ts, | ||||
|                             } | ||||
|                         ], | ||||
|                     } | ||||
| 
 | ||||
|                 case "level2": | ||||
|                 case 'level2': | ||||
|                     l2_data = KucoinL2(**msg.data) | ||||
|                     first_ask = l2_data.asks[0] | ||||
|                     first_bid = l2_data.bids[0] | ||||
|                     yield "l1", { | ||||
|                         "symbol": sym, | ||||
|                         "ticks": [ | ||||
|                     yield 'l1', { | ||||
|                         'symbol': sym, | ||||
|                         'ticks': [ | ||||
|                             { | ||||
|                                 "type": "bid", | ||||
|                                 "price": float(first_bid[0]), | ||||
|                                 "size": float(first_bid[1]), | ||||
|                                 'type': 'bid', | ||||
|                                 'price': float(first_bid[0]), | ||||
|                                 'size': float(first_bid[1]), | ||||
|                             }, | ||||
|                             { | ||||
|                                 "type": "bsize", | ||||
|                                 "price": float(first_bid[0]), | ||||
|                                 "size": float(first_bid[1]), | ||||
|                                 'type': 'bsize', | ||||
|                                 'price': float(first_bid[0]), | ||||
|                                 'size': float(first_bid[1]), | ||||
|                             }, | ||||
|                             { | ||||
|                                 "type": "ask", | ||||
|                                 "price": float(first_ask[0]), | ||||
|                                 "size": float(first_ask[1]), | ||||
|                                 'type': 'ask', | ||||
|                                 'price': float(first_ask[0]), | ||||
|                                 'size': float(first_ask[1]), | ||||
|                             }, | ||||
|                             { | ||||
|                                 "type": "asize", | ||||
|                                 "price": float(first_ask[0]), | ||||
|                                 "size": float(first_ask[1]), | ||||
|                                 'type': 'asize', | ||||
|                                 'price': float(first_ask[0]), | ||||
|                                 'size': float(first_ask[1]), | ||||
|                             }, | ||||
|                         ], | ||||
|                     } | ||||
|  | @ -668,10 +668,10 @@ async def stream_messages( | |||
| @acm | ||||
| async def open_history_client( | ||||
|     symbol: str, | ||||
|     type: str = "1m", | ||||
|     type: str = '1m', | ||||
| ) -> AsyncGenerator[Callable, None]: | ||||
|     async with open_cached_client("kucoin") as client: | ||||
|         log.info("Attempting to open kucoin history client") | ||||
|     async with open_cached_client('kucoin') as client: | ||||
|         log.info('Attempting to open kucoin history client') | ||||
| 
 | ||||
|         async def get_ohlc_history( | ||||
|             timeframe: float, | ||||
|  | @ -681,7 +681,7 @@ async def open_history_client( | |||
|             np.ndarray, datetime | None, datetime | None | ||||
|         ]:  # start  # end | ||||
|             if timeframe != 60: | ||||
|                 raise DataUnavailable("Only 1m bars are supported") | ||||
|                 raise DataUnavailable('Only 1m bars are supported') | ||||
| 
 | ||||
|             array = await client._get_bars( | ||||
|                 symbol, | ||||
|  | @ -689,14 +689,14 @@ async def open_history_client( | |||
|                 end_dt=end_dt, | ||||
|             ) | ||||
| 
 | ||||
|             times = array["time"] | ||||
|             times = array['time'] | ||||
| 
 | ||||
|             if end_dt is None: | ||||
|                 inow = round(time.time()) | ||||
| 
 | ||||
|                 print( | ||||
|                     f"difference in time between load and processing" | ||||
|                     f"{inow - times[-1]}" | ||||
|                     f'difference in time between load and processing' | ||||
|                     f'{inow - times[-1]}' | ||||
|                 ) | ||||
| 
 | ||||
|                 if (inow - times[-1]) > 60: | ||||
|  | @ -705,7 +705,7 @@ async def open_history_client( | |||
|             start_dt = pendulum.from_timestamp(times[0]) | ||||
|             end_dt = pendulum.from_timestamp(times[-1]) | ||||
| 
 | ||||
|             log.info("History succesfully fetched baby") | ||||
|             log.info('History succesfully fetched baby') | ||||
| 
 | ||||
|             return array, start_dt, end_dt | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue