Implement `open_history_client()` correctly for `kraken`

m4_corrections
Tyler Goodlet 2022-04-29 11:26:15 -04:00
parent b8704e1b7f
commit 260b632f07
1 changed files with 72 additions and 9 deletions

View File

@ -20,7 +20,8 @@ Kraken backend.
''' '''
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from dataclasses import asdict, field from dataclasses import asdict, field
from typing import Any, Optional, AsyncIterator, Callable from datetime import datetime
from typing import Any, Optional, AsyncIterator, Callable, Union
import time import time
from trio_typing import TaskStatus from trio_typing import TaskStatus
@ -40,7 +41,13 @@ import base64
from .. import config from .. import config
from .._cacheables import open_cached_client from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound, BrokerError from ._util import (
resproc,
SymbolNotFound,
BrokerError,
DataThrottle,
DataUnavailable,
)
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import ShmArray from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws, NoBsWs from ..data._web_bs import open_autorecon_ws, NoBsWs
@ -391,17 +398,26 @@ class Client:
async def bars( async def bars(
self, self,
symbol: str = 'XBTUSD', symbol: str = 'XBTUSD',
# UTC 2017-07-02 12:53:20 # UTC 2017-07-02 12:53:20
since: int = None, since: Optional[Union[int, datetime]] = None,
count: int = 720, # <- max allowed per query count: int = 720, # <- max allowed per query
as_np: bool = True, as_np: bool = True,
) -> dict: ) -> dict:
if since is None: if since is None:
since = pendulum.now('UTC').start_of('minute').subtract( since = pendulum.now('UTC').start_of('minute').subtract(
minutes=count).timestamp() minutes=count).timestamp()
elif isinstance(since, int):
since = pendulum.from_timestamp(since).timestamp()
else: # presumably a pendulum datetime
since = since.timestamp()
# UTC 2017-07-02 12:53:20 is oldest seconds value # UTC 2017-07-02 12:53:20 is oldest seconds value
since = str(max(1499000000, since)) since = str(max(1499000000, int(since)))
json = await self._public( json = await self._public(
'OHLC', 'OHLC',
data={ data={
@ -445,7 +461,16 @@ class Client:
array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
return array return array
except KeyError: except KeyError:
raise SymbolNotFound(json['error'][0] + f': {symbol}') errmsg = json['error'][0]
if 'not found' in errmsg:
raise SymbolNotFound(errmsg + f': {symbol}')
elif 'Too many requests' in errmsg:
raise DataThrottle(f'{symbol}')
else:
raise BrokerError(errmsg)
@acm @acm
@ -668,8 +693,8 @@ async def handle_order_requests(
oid=msg.oid, oid=msg.oid,
reqid=msg.reqid, reqid=msg.reqid,
symbol=msg.symbol, symbol=msg.symbol,
# TODO: maybe figure out if pending cancels will # TODO: maybe figure out if pending
# eventually get cancelled # cancels will eventually get cancelled
reason="Order cancel is still pending?", reason="Order cancel is still pending?",
broker_details=resp broker_details=resp
).dict() ).dict()
@ -1003,7 +1028,45 @@ async def open_history_client(
# TODO implement history getter for the new storage layer. # TODO implement history getter for the new storage layer.
async with open_cached_client('kraken') as client: async with open_cached_client('kraken') as client:
yield client
# lol, kraken won't send any more then the "last"
# 720 1m bars.. so we have to just ignore further
# requests of this type..
queries: int = 0
async def get_ohlc(
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
) -> tuple[
np.ndarray,
datetime, # start
datetime, # end
]:
nonlocal queries
if queries > 0:
raise DataUnavailable
count = 0
while count <= 3:
try:
array = await client.bars(
symbol,
since=end_dt,
)
count += 1
queries += 1
break
except DataThrottle:
log.warning(f'kraken OHLC throttle for {symbol}')
await trio.sleep(1)
start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time'])
return array, start_dt, end_dt
yield get_ohlc
async def backfill_bars( async def backfill_bars(