Implement `open_history_client()` correctly for `kraken`

incr_update_backup
Tyler Goodlet 2022-04-29 11:26:15 -04:00
parent 8110c4c70d
commit 8e5f5b6be6
1 changed files with 72 additions and 9 deletions

View File

@ -20,7 +20,8 @@ Kraken backend.
'''
from contextlib import asynccontextmanager as acm
from dataclasses import asdict, field
from typing import Any, Optional, AsyncIterator, Callable
from datetime import datetime
from typing import Any, Optional, AsyncIterator, Callable, Union
import time
from trio_typing import TaskStatus
@ -40,7 +41,13 @@ import base64
from .. import config
from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound, BrokerError
from ._util import (
resproc,
SymbolNotFound,
BrokerError,
DataThrottle,
DataUnavailable,
)
from ..log import get_logger, get_console_log
from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws, NoBsWs
@ -305,7 +312,7 @@ class Client:
action: str,
size: float,
reqid: str = None,
validate: bool = False # set True test call without a real submission
validate: bool = False # set True test call without a real submission
) -> dict:
'''
Place an order and return integer request id provided by client.
@ -391,17 +398,26 @@ class Client:
async def bars(
self,
symbol: str = 'XBTUSD',
# UTC 2017-07-02 12:53:20
since: int = None,
since: Optional[Union[int, datetime]] = None,
count: int = 720, # <- max allowed per query
as_np: bool = True,
) -> dict:
if since is None:
since = pendulum.now('UTC').start_of('minute').subtract(
minutes=count).timestamp()
elif isinstance(since, int):
since = pendulum.from_timestamp(since).timestamp()
else: # presumably a pendulum datetime
since = since.timestamp()
# UTC 2017-07-02 12:53:20 is oldest seconds value
since = str(max(1499000000, since))
since = str(max(1499000000, int(since)))
json = await self._public(
'OHLC',
data={
@ -445,7 +461,16 @@ class Client:
array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
return array
except KeyError:
raise SymbolNotFound(json['error'][0] + f': {symbol}')
errmsg = json['error'][0]
if 'not found' in errmsg:
raise SymbolNotFound(errmsg + f': {symbol}')
elif 'Too many requests' in errmsg:
raise DataThrottle(f'{symbol}')
else:
raise BrokerError(errmsg)
@acm
@ -668,8 +693,8 @@ async def handle_order_requests(
oid=msg.oid,
reqid=msg.reqid,
symbol=msg.symbol,
# TODO: maybe figure out if pending cancels will
# eventually get cancelled
# TODO: maybe figure out if pending
# cancels will eventually get cancelled
reason="Order cancel is still pending?",
broker_details=resp
).dict()
@ -1003,7 +1028,45 @@ async def open_history_client(
# TODO implement history getter for the new storage layer.
async with open_cached_client('kraken') as client:
yield client
# lol, kraken won't send any more then the "last"
# 720 1m bars.. so we have to just ignore further
# requests of this type..
queries: int = 0
async def get_ohlc(
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
) -> tuple[
np.ndarray,
datetime, # start
datetime, # end
]:
nonlocal queries
if queries > 0:
raise DataUnavailable
count = 0
while count <= 3:
try:
array = await client.bars(
symbol,
since=end_dt,
)
count += 1
queries += 1
break
except DataThrottle:
log.warning(f'kraken OHLC throttle for {symbol}')
await trio.sleep(1)
start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time'])
return array, start_dt, end_dt
yield get_ohlc
async def backfill_bars(