More linting fixes

kucoin_backend
jaredgoldman 2023-04-18 10:39:47 -04:00
parent 37ce04ca9a
commit 9fcfb8d780
1 changed files with 194 additions and 180 deletions

View File

@ -11,20 +11,14 @@
# GNU Affero General Public License for more details. # GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see # along with this program. If not, see <https://www.gnu.org/licenses/>.
# <https://www.gnu.org/licenses/>.
''' """
Kucoin broker backend Kucoin broker backend
''' """
from typing import ( from typing import Any, Callable, Literal, AsyncGenerator
Any,
Callable,
Literal,
AsyncGenerator
)
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from datetime import datetime from datetime import datetime
import time import time
@ -56,23 +50,24 @@ from ..data._web_bs import (
log = get_logger(__name__) log = get_logger(__name__)
_ohlc_dtype = [ _ohlc_dtype = [
('index', int), ("index", int),
('time', int), ("time", int),
('open', float), ("open", float),
('high', float), ("high", float),
('low', float), ("low", float),
('close', float), ("close", float),
('volume', float), ("volume", float),
('bar_wap', float), # will be zeroed by sampler if not filled ("bar_wap", float), # will be zeroed by sampler if not filled
] ]
class KucoinMktPair(Struct, frozen=True): class KucoinMktPair(Struct, frozen=True):
''' """
Kucoin's pair format: Kucoin's pair format:
https://docs.kucoin.com/#get-symbols-list https://docs.kucoin.com/#get-symbols-list
''' """
baseCurrency: str baseCurrency: str
baseIncrement: float baseIncrement: float
baseMaxSize: float baseMaxSize: float
@ -93,11 +88,12 @@ class KucoinMktPair(Struct, frozen=True):
class AccountTrade(Struct, frozen=True): class AccountTrade(Struct, frozen=True):
''' """
Historical trade format: Historical trade format:
https://docs.kucoin.com/#get-account-ledgers https://docs.kucoin.com/#get-account-ledgers
''' """
id: str id: str
currency: str currency: str
amount: float amount: float
@ -105,16 +101,17 @@ class AccountTrade(Struct, frozen=True):
balance: float balance: float
accountType: str accountType: str
bizType: str bizType: str
direction: Literal['in', 'out'] direction: Literal["in", "out"]
createdAt: float createdAt: float
context: list[str] context: list[str]
class AccountResponse(Struct, frozen=True): class AccountResponse(Struct, frozen=True):
''' """
https://docs.kucoin.com/#get-account-ledgers https://docs.kucoin.com/#get-account-ledgers
''' """
currentPage: int currentPage: int
pageSize: int pageSize: int
totalNum: int totalNum: int
@ -123,11 +120,12 @@ class AccountResponse(Struct, frozen=True):
class KucoinTrade(Struct, frozen=True): class KucoinTrade(Struct, frozen=True):
''' """
Real-time trade format: Real-time trade format:
https://docs.kucoin.com/#symbol-ticker https://docs.kucoin.com/#symbol-ticker
''' """
bestAsk: float bestAsk: float
bestAskSize: float bestAskSize: float
bestBid: float bestBid: float
@ -139,21 +137,23 @@ class KucoinTrade(Struct, frozen=True):
class KucoinL2(Struct, frozen=True): class KucoinL2(Struct, frozen=True):
''' """
Real-time L2 order book format: Real-time L2 order book format:
https://docs.kucoin.com/#level2-5-best-ask-bid-orders https://docs.kucoin.com/#level2-5-best-ask-bid-orders
''' """
asks: list[list[float]] asks: list[list[float]]
bids: list[list[float]] bids: list[list[float]]
timestamp: float timestamp: float
class KucoinMsg(Struct, frozen=True): class KucoinMsg(Struct, frozen=True):
''' """
Generic outer-wrapper for any Kucoin ws msg Generic outer-wrapper for any Kucoin ws msg
''' """
type: str type: str
topic: str topic: str
subject: str subject: str
@ -169,10 +169,10 @@ class BrokerConfig(Struct, frozen=True):
def get_config() -> BrokerConfig | None: def get_config() -> BrokerConfig | None:
conf, _ = config.load() conf, _ = config.load()
section = conf.get('kucoin') section = conf.get("kucoin")
if section is None: if section is None:
log.warning('No config section found for kucoin in config') log.warning("No config section found for kucoin in config")
return None return None
return BrokerConfig(**section).copy() return BrokerConfig(**section).copy()
@ -186,118 +186,119 @@ class Client:
def _gen_auth_req_headers( def _gen_auth_req_headers(
self, self,
action: Literal['POST', 'GET'], action: Literal["POST", "GET"],
endpoint: str, endpoint: str,
api_v: str = 'v2', api_v: str = "v2",
) -> dict[str, str | bytes]: ) -> dict[str, str | bytes]:
''' """
Generate authenticated request headers Generate authenticated request headers
https://docs.kucoin.com/#authentication https://docs.kucoin.com/#authentication
''' """
str_to_sign = str(int(time.time() * 1000)) + \ str_to_sign = (
action + f'/api/{api_v}{endpoint}' str(int(time.time() * 1000)) + action + f"/api/{api_v}{endpoint}"
)
signature = base64.b64encode( signature = base64.b64encode(
hmac.new( hmac.new(
self._config.key_secret.encode('utf-8'), self._config.key_secret.encode("utf-8"),
str_to_sign.encode('utf-8'), str_to_sign.encode("utf-8"),
hashlib.sha256, hashlib.sha256,
).digest() ).digest()
) )
passphrase = base64.b64encode( passphrase = base64.b64encode(
hmac.new( hmac.new(
self._config.key_secret.encode('utf-8'), self._config.key_secret.encode("utf-8"),
self._config.key_passphrase.encode('utf-8'), self._config.key_passphrase.encode("utf-8"),
hashlib.sha256, hashlib.sha256,
).digest() ).digest()
) )
return { return {
'KC-API-SIGN': signature, "KC-API-SIGN": signature,
'KC-API-TIMESTAMP': str(pendulum.now().int_timestamp * 1000), "KC-API-TIMESTAMP": str(pendulum.now().int_timestamp * 1000),
'KC-API-KEY': self._config.key_id, "KC-API-KEY": self._config.key_id,
'KC-API-PASSPHRASE': passphrase, "KC-API-PASSPHRASE": passphrase,
# XXX: Even if using the v1 api - this stays the same # XXX: Even if using the v1 api - this stays the same
'KC-API-KEY-VERSION': '2', "KC-API-KEY-VERSION": "2",
} }
async def _request( async def _request(
self, self,
action: Literal['POST', 'GET'], action: Literal["POST", "GET"],
endpoint: str, endpoint: str,
api_v: str = 'v2', api_v: str = "v2",
headers: dict = {}, headers: dict = {},
) -> Any: ) -> Any:
''' """
Generic request wrapper for Kucoin API Generic request wrapper for Kucoin API
''' """
if self._config: if self._config:
headers = self._gen_auth_req_headers( headers = self._gen_auth_req_headers(action, endpoint, api_v)
action, endpoint, api_v)
api_url = f'https://api.kucoin.com/api/{api_v}{endpoint}' api_url = f"https://api.kucoin.com/api/{api_v}{endpoint}"
res = await asks.request(action, api_url, headers=headers) res = await asks.request(action, api_url, headers=headers)
if 'data' in res.json(): if "data" in res.json():
return res.json()['data'] return res.json()["data"]
else: else:
log.error( log.error(
f'Error making request to {api_url} -> {res.json()["msg"]}') f'Error making request to {api_url} -> {res.json()["msg"]}'
return res.json()['msg'] )
return res.json()["msg"]
async def _get_ws_token( async def _get_ws_token(
self, self,
private: bool = False, private: bool = False,
) -> tuple[str, int] | None: ) -> tuple[str, int] | None:
''' """
Fetch ws token needed for sub access: Fetch ws token needed for sub access:
https://docs.kucoin.com/#apply-connect-token https://docs.kucoin.com/#apply-connect-token
returns a token and the interval we must ping returns a token and the interval we must ping
the server at to keep the connection alive the server at to keep the connection alive
''' """
token_type = 'private' if private else 'public' token_type = "private" if private else "public"
try: try:
data: dict[str, Any] | None = await self._request( data: dict[str, Any] | None = await self._request(
'POST', f'/bullet-{token_type}', 'v1' "POST", f"/bullet-{token_type}", "v1"
) )
except Exception as e: except Exception as e:
log.error( log.error(f"Error making request for Kucoin ws token -> {str(e)}")
f'Error making request for Kucoin ws token -> {str(e)}')
return None return None
if data and 'token' in data: if data and "token" in data:
# ping_interval is in ms # ping_interval is in ms
ping_interval: int = data['instanceServers'][0]['pingInterval'] ping_interval: int = data["instanceServers"][0]["pingInterval"]
return data['token'], ping_interval return data["token"], ping_interval
elif data: elif data:
log.error( log.error(
f'Error making request for Kucoin ws token -> {data.json()["msg"]}' 'Error making request for Kucoin ws token'
f'{data.json()["msg"]}'
) )
async def _get_pairs( async def _get_pairs(
self, self,
) -> dict[str, KucoinMktPair]: ) -> dict[str, KucoinMktPair]:
entries = await self._request('GET', '/symbols') entries = await self._request("GET", "/symbols")
syms = { syms = {
kucoin_sym_to_fqsn( kucoin_sym_to_fqsn(item["name"]): KucoinMktPair(**item)
item['name']): KucoinMktPair( for item in entries
**item) for item in entries} }
log.info(f' {len(syms)} Kucoin market pairs fetched') log.info(f" {len(syms)} Kucoin market pairs fetched")
return syms return syms
async def cache_pairs( async def cache_pairs(
self, self,
) -> dict[str, KucoinMktPair]: ) -> dict[str, KucoinMktPair]:
''' """
Get cached pairs and convert keyed symbols into fqsns if ya want Get cached pairs and convert keyed symbols into fqsns if ya want
''' """
if not self._pairs: if not self._pairs:
self._pairs = await self._get_pairs() self._pairs = await self._get_pairs()
@ -311,12 +312,15 @@ class Client:
data = await self.cache_pairs() data = await self.cache_pairs()
matches = fuzzy.extractBests( matches = fuzzy.extractBests(
pattern, data, score_cutoff=35, limit=limit) pattern, data, score_cutoff=35, limit=limit
)
# repack in dict form # repack in dict form
return {item[0].name: item[0] for item in matches} return {item[0].name: item[0] for item in matches}
async def last_trades(self, sym: str) -> list[AccountTrade]: async def last_trades(self, sym: str) -> list[AccountTrade]:
trades = await self._request('GET', f'/accounts/ledgers?currency={sym}', 'v1') trades = await self._request(
"GET", f"/accounts/ledgers?currency={sym}", "v1"
)
trades = AccountResponse(**trades) trades = AccountResponse(**trades)
return trades.items return trades.items
@ -327,9 +331,9 @@ class Client:
end_dt: datetime | None = None, end_dt: datetime | None = None,
limit: int = 1000, limit: int = 1000,
as_np: bool = True, as_np: bool = True,
type: str = '1min', type: str = "1min",
) -> np.ndarray: ) -> np.ndarray:
''' """
Get OHLC data and convert to numpy array for perffff: Get OHLC data and convert to numpy array for perffff:
https://docs.kucoin.com/#get-klines https://docs.kucoin.com/#get-klines
@ -356,35 +360,40 @@ class Client:
('bar_wap', float), # will be zeroed by sampler if not filled ('bar_wap', float), # will be zeroed by sampler if not filled
] ]
''' """
# Generate generic end and start time if values not passed # Generate generic end and start time if values not passed
# Currently gives us 12hrs of data # Currently gives us 12hrs of data
if end_dt is None: if end_dt is None:
end_dt = pendulum.now('UTC').add(minutes=1) end_dt = pendulum.now("UTC").add(minutes=1)
if start_dt is None: if start_dt is None:
start_dt = end_dt.start_of( start_dt = end_dt.start_of("minute").subtract(minutes=limit)
'minute').subtract(minutes=limit)
start_dt = int(start_dt.timestamp()) start_dt = int(start_dt.timestamp())
end_dt = int(end_dt.timestamp()) end_dt = int(end_dt.timestamp())
kucoin_sym = fqsn_to_kucoin_sym(fqsn, self._pairs) kucoin_sym = fqsn_to_kucoin_sym(fqsn, self._pairs)
url = f'/market/candles?type={type}&symbol={kucoin_sym}&startAt={start_dt}&endAt={end_dt}' url = (
f"/market/candles?type={type}"
f"&symbol={kucoin_sym}"
f"&startAt={start_dt}"
f"&endAt={end_dt}"
)
for i in range(10): for i in range(10):
data = await self._request( data = await self._request(
'GET', "GET",
url, url,
api_v='v1', api_v="v1",
) )
if not isinstance(data, list): if not isinstance(data, list):
# Do a gradual backoff if Kucoin is rate limiting us # Do a gradual backoff if Kucoin is rate limiting us
backoff_interval = i backoff_interval = i
log.warn( log.warn(
f'History call failed, backing off for {backoff_interval}s') f"History call failed, backing off for {backoff_interval}s"
)
await trio.sleep(backoff_interval) await trio.sleep(backoff_interval)
else: else:
bars: list[list[str]] = data bars: list[list[str]] = data
@ -416,20 +425,17 @@ class Client:
) )
) )
array = np.array( array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
new_bars,
dtype=_ohlc_dtype) if as_np else bars
return array return array
def fqsn_to_kucoin_sym( def fqsn_to_kucoin_sym(fqsn: str, pairs: dict[str, KucoinMktPair]) -> str:
fqsn: str, pairs: dict[str, KucoinMktPair]) -> str:
pair_data = pairs[fqsn] pair_data = pairs[fqsn]
return pair_data.baseCurrency + '-' + pair_data.quoteCurrency return pair_data.baseCurrency + "-" + pair_data.quoteCurrency
def kucoin_sym_to_fqsn(sym: str) -> str: def kucoin_sym_to_fqsn(sym: str) -> str:
return sym.lower().replace('-', '') return sym.lower().replace("-", "")
@acm @acm
@ -444,7 +450,7 @@ async def get_client() -> AsyncGenerator[Client, None]:
async def open_symbol_search( async def open_symbol_search(
ctx: tractor.Context, ctx: tractor.Context,
) -> None: ) -> None:
async with open_cached_client('kucoin') as client: async with open_cached_client("kucoin") as client:
# load all symbols locally for fast search # load all symbols locally for fast search
await client.cache_pairs() await client.cache_pairs()
await ctx.started() await ctx.started()
@ -452,25 +458,25 @@ async def open_symbol_search(
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
async for pattern in stream: async for pattern in stream:
await stream.send(await client.search_symbols(pattern)) await stream.send(await client.search_symbols(pattern))
log.info('Kucoin symbol search opened') log.info("Kucoin symbol search opened")
@acm @acm
async def open_ping_task(ws: wsproto.WSConnection, ping_interval, connect_id): async def open_ping_task(ws: wsproto.WSConnection, ping_interval, connect_id):
''' """
Spawn a non-blocking task that pings the ws Spawn a non-blocking task that pings the ws
server every ping_interval so Kucoin doesn't drop server every ping_interval so Kucoin doesn't drop
our connection our connection
''' """
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
# TODO: cache this task so it's only called once # TODO: cache this task so it's only called once
async def ping_server(): async def ping_server():
while True: while True:
await trio.sleep((ping_interval - 1000) / 1000) await trio.sleep((ping_interval - 1000) / 1000)
await ws.send_msg({'id': connect_id, 'type': 'ping'}) await ws.send_msg({"id": connect_id, "type": "ping"})
log.info(f'Starting ping task for kucoin ws connection') log.info("Starting ping task for kucoin ws connection")
n.start_soon(ping_server) n.start_soon(ping_server)
yield yield
@ -482,29 +488,30 @@ async def stream_quotes(
send_chan: trio.abc.SendChannel, send_chan: trio.abc.SendChannel,
symbols: list[str], symbols: list[str],
feed_is_live: trio.Event, feed_is_live: trio.Event,
loglevel: str = '', loglevel: str = "",
# startup sync # startup sync
task_status: TaskStatus[tuple[dict, dict] task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
''' """
Required piker api to stream real-time data. Required piker api to stream real-time data.
Where the rubber hits the road baby Where the rubber hits the road baby
''' """
async with open_cached_client('kucoin') as client: async with open_cached_client("kucoin") as client:
token, ping_interval = await client._get_ws_token() token, ping_interval = await client._get_ws_token()
connect_id = str(uuid4()) connect_id = str(uuid4())
pairs = await client.cache_pairs() pairs = await client.cache_pairs()
ws_url = (
f"wss://ws-api-spot.kucoin.com/?"
f"token={token}&[connectId={connect_id}]"
)
# open ping task # open ping task
async with ( async with (
open_autorecon_ws( open_autorecon_ws(ws_url) as ws,
f'wss://ws-api-spot.kucoin.com/?token={token}&[connectId={connect_id}]'
) as ws,
open_ping_task(ws, ping_interval, connect_id), open_ping_task(ws, ping_interval, connect_id),
): ):
log.info('Starting up quote stream') log.info("Starting up quote stream")
# loop through symbols and sub to feedz # loop through symbols and sub to feedz
for sym in symbols: for sym in symbols:
pair: KucoinMktPair = pairs[sym] pair: KucoinMktPair = pairs[sym]
@ -514,13 +521,13 @@ async def stream_quotes(
# pass back token, and bool, signalling if we're the writer # pass back token, and bool, signalling if we're the writer
# and that history has been written # and that history has been written
sym: { sym: {
'symbol_info': { "symbol_info": {
'asset_type': 'crypto', "asset_type": "crypto",
'price_tick_size': float(pair.baseIncrement), "price_tick_size": float(pair.baseIncrement),
'lot_tick_size': float(pair.baseMinSize), "lot_tick_size": float(pair.baseMinSize),
}, },
'shm_write_opts': {'sum_tick_vml': False}, "shm_write_opts": {"sum_tick_vml": False},
'fqsn': sym, "fqsn": sym,
} }
} }
@ -529,7 +536,7 @@ async def stream_quotes(
stream_messages(ws, sym) as msg_gen, stream_messages(ws, sym) as msg_gen,
): ):
typ, quote = await anext(msg_gen) typ, quote = await anext(msg_gen)
while typ != 'trade': while typ != "trade":
# take care to not unblock here until we get a real # take care to not unblock here until we get a real
# trade quote # trade quote
typ, quote = await anext(msg_gen) typ, quote = await anext(msg_gen)
@ -544,41 +551,47 @@ async def stream_quotes(
@acm @acm
async def subscribe(ws: wsproto.WSConnection, connect_id, sym): async def subscribe(ws: wsproto.WSConnection, connect_id, sym):
# level 2 sub # level 2 sub
await ws.send_msg({ await ws.send_msg(
'id': connect_id, {
'type': 'subscribe', "id": connect_id,
'topic': f'/spotMarket/level2Depth5:{sym}', "type": "subscribe",
'privateChannel': False, "topic": f"/spotMarket/level2Depth5:{sym}",
'response': True, "privateChannel": False,
}) "response": True,
}
)
# watch trades # watch trades
await ws.send_msg({ await ws.send_msg(
'id': connect_id, {
'type': 'subscribe', "id": connect_id,
'topic': f'/market/ticker:{sym}', "type": "subscribe",
'privateChannel': False, "topic": f"/market/ticker:{sym}",
'response': True, "privateChannel": False,
}) "response": True,
}
)
yield yield
# unsub # unsub
if ws.connected(): if ws.connected():
log.info(f'Unsubscribing to {sym} feed') log.info(f"Unsubscribing to {sym} feed")
await ws.send_msg( await ws.send_msg(
{ {
'id': connect_id, "id": connect_id,
'type': 'unsubscribe', "type": "unsubscribe",
'topic': f'/market/ticker:{sym}', "topic": f"/market/ticker:{sym}",
'privateChannel': False, "privateChannel": False,
'response': True, "response": True,
} }
) )
@trio_async_generator @trio_async_generator
async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]: async def stream_messages(
ws: NoBsWs, sym: str
) -> AsyncGenerator[NoBsWs, dict]:
timeouts = 0 timeouts = 0
last_trade_ts = 0 last_trade_ts = 0
@ -588,65 +601,65 @@ async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]:
if cs.cancelled_caught: if cs.cancelled_caught:
timeouts += 1 timeouts += 1
if timeouts > 2: if timeouts > 2:
log.error( log.error("kucoin feed is sh**ing the bed... rebooting...")
'kucoin feed is sh**ing the bed... rebooting...')
await ws._connect() await ws._connect()
continue continue
if msg.get('subject'): if msg.get("subject"):
msg = KucoinMsg(**msg) msg = KucoinMsg(**msg)
match msg.subject: match msg.subject:
case 'trade.ticker': case "trade.ticker":
trade_data = KucoinTrade(**msg.data) trade_data = KucoinTrade(**msg.data)
# XXX: Filter for duplicate messages as ws feed will send duplicate market state # XXX: Filter for duplicate messages as ws feed will
# send duplicate market state
# https://docs.kucoin.com/#level2-5-best-ask-bid-orders # https://docs.kucoin.com/#level2-5-best-ask-bid-orders
if trade_data.time == last_trade_ts: if trade_data.time == last_trade_ts:
continue continue
last_trade_ts = trade_data.time last_trade_ts = trade_data.time
yield 'trade', { yield "trade", {
'symbol': sym, "symbol": sym,
'last': trade_data.price, "last": trade_data.price,
'brokerd_ts': last_trade_ts, "brokerd_ts": last_trade_ts,
'ticks': [ "ticks": [
{ {
'type': 'trade', "type": "trade",
'price': float(trade_data.price), "price": float(trade_data.price),
'size': float(trade_data.size), "size": float(trade_data.size),
'broker_ts': last_trade_ts, "broker_ts": last_trade_ts,
} }
], ],
} }
case 'level2': case "level2":
l2_data = KucoinL2(**msg.data) l2_data = KucoinL2(**msg.data)
first_ask = l2_data.asks[0] first_ask = l2_data.asks[0]
first_bid = l2_data.bids[0] first_bid = l2_data.bids[0]
yield 'l1', { yield "l1", {
'symbol': sym, "symbol": sym,
'ticks': [ "ticks": [
{ {
'type': 'bid', "type": "bid",
'price': float(first_bid[0]), "price": float(first_bid[0]),
'size': float(first_bid[1]), "size": float(first_bid[1]),
}, },
{ {
'type': 'bsize', "type": "bsize",
'price': float(first_bid[0]), "price": float(first_bid[0]),
'size': float(first_bid[1]), "size": float(first_bid[1]),
}, },
{ {
'type': 'ask', "type": "ask",
'price': float(first_ask[0]), "price": float(first_ask[0]),
'size': float(first_ask[1]), "size": float(first_ask[1]),
}, },
{ {
'type': 'asize', "type": "asize",
'price': float(first_ask[0]), "price": float(first_ask[0]),
'size': float(first_ask[1]), "size": float(first_ask[1]),
}, },
], ],
} }
@ -655,20 +668,20 @@ async def stream_messages(ws: NoBsWs, sym: str) -> AsyncGenerator[NoBsWs, dict]:
@acm @acm
async def open_history_client( async def open_history_client(
symbol: str, symbol: str,
type: str = '1m', type: str = "1m",
) -> AsyncGenerator[Callable, None]: ) -> AsyncGenerator[Callable, None]:
async with open_cached_client('kucoin') as client: async with open_cached_client("kucoin") as client:
log.info("Attempting to open kucoin history client")
log.info('Attempting to open kucoin history client')
async def get_ohlc_history( async def get_ohlc_history(
timeframe: float, timeframe: float,
end_dt: datetime | None = None, end_dt: datetime | None = None,
start_dt: datetime | None = None, start_dt: datetime | None = None,
) -> tuple[np.ndarray, datetime | None, datetime | None]: # start # end ) -> tuple[
np.ndarray, datetime | None, datetime | None
]: # start # end
if timeframe != 60: if timeframe != 60:
raise DataUnavailable('Only 1m bars are supported') raise DataUnavailable("Only 1m bars are supported")
array = await client._get_bars( array = await client._get_bars(
symbol, symbol,
@ -676,13 +689,14 @@ async def open_history_client(
end_dt=end_dt, end_dt=end_dt,
) )
times = array['time'] times = array["time"]
if end_dt is None: if end_dt is None:
inow = round(time.time()) inow = round(time.time())
print( print(
f'difference in time between load and processing {inow - times[-1]}' f"difference in time between load and processing"
f"{inow - times[-1]}"
) )
if (inow - times[-1]) > 60: if (inow - times[-1]) > 60:
@ -691,7 +705,7 @@ async def open_history_client(
start_dt = pendulum.from_timestamp(times[0]) start_dt = pendulum.from_timestamp(times[0])
end_dt = pendulum.from_timestamp(times[-1]) end_dt = pendulum.from_timestamp(times[-1])
log.info('History succesfully fetched baby') log.info("History succesfully fetched baby")
return array, start_dt, end_dt return array, start_dt, end_dt