diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 44e4e6b0..0f5e2f2a 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -20,7 +20,8 @@ Kraken backend. ''' from contextlib import asynccontextmanager as acm from dataclasses import asdict, field -from typing import Any, Optional, AsyncIterator, Callable +from datetime import datetime +from typing import Any, Optional, AsyncIterator, Callable, Union import time from trio_typing import TaskStatus @@ -40,7 +41,13 @@ import base64 from .. import config from .._cacheables import open_cached_client -from ._util import resproc, SymbolNotFound, BrokerError +from ._util import ( + resproc, + SymbolNotFound, + BrokerError, + DataThrottle, + DataUnavailable, +) from ..log import get_logger, get_console_log from ..data import ShmArray from ..data._web_bs import open_autorecon_ws, NoBsWs @@ -305,7 +312,7 @@ class Client: action: str, size: float, reqid: str = None, - validate: bool = False # set True test call without a real submission + validate: bool = False # set True test call without a real submission ) -> dict: ''' Place an order and return integer request id provided by client. @@ -391,17 +398,26 @@ class Client: async def bars( self, symbol: str = 'XBTUSD', + # UTC 2017-07-02 12:53:20 - since: int = None, + since: Optional[Union[int, datetime]] = None, count: int = 720, # <- max allowed per query as_np: bool = True, + ) -> dict: + if since is None: since = pendulum.now('UTC').start_of('minute').subtract( minutes=count).timestamp() + elif isinstance(since, int): + since = pendulum.from_timestamp(since).timestamp() + + else: # presumably a pendulum datetime + since = since.timestamp() + # UTC 2017-07-02 12:53:20 is oldest seconds value - since = str(max(1499000000, since)) + since = str(max(1499000000, int(since))) json = await self._public( 'OHLC', data={ @@ -445,7 +461,16 @@ class Client: array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars return array except KeyError: - raise SymbolNotFound(json['error'][0] + f': {symbol}') + errmsg = json['error'][0] + + if 'not found' in errmsg: + raise SymbolNotFound(errmsg + f': {symbol}') + + elif 'Too many requests' in errmsg: + raise DataThrottle(f'{symbol}') + + else: + raise BrokerError(errmsg) @acm @@ -668,8 +693,8 @@ async def handle_order_requests( oid=msg.oid, reqid=msg.reqid, symbol=msg.symbol, - # TODO: maybe figure out if pending cancels will - # eventually get cancelled + # TODO: maybe figure out if pending + # cancels will eventually get cancelled reason="Order cancel is still pending?", broker_details=resp ).dict() @@ -1003,7 +1028,45 @@ async def open_history_client( # TODO implement history getter for the new storage layer. async with open_cached_client('kraken') as client: - yield client + + # lol, kraken won't send any more then the "last" + # 720 1m bars.. so we have to just ignore further + # requests of this type.. + queries: int = 0 + + async def get_ohlc( + end_dt: Optional[datetime] = None, + start_dt: Optional[datetime] = None, + + ) -> tuple[ + np.ndarray, + datetime, # start + datetime, # end + ]: + + nonlocal queries + if queries > 0: + raise DataUnavailable + + count = 0 + while count <= 3: + try: + array = await client.bars( + symbol, + since=end_dt, + ) + count += 1 + queries += 1 + break + except DataThrottle: + log.warning(f'kraken OHLC throttle for {symbol}') + await trio.sleep(1) + + start_dt = pendulum.from_timestamp(array[0]['time']) + end_dt = pendulum.from_timestamp(array[-1]['time']) + return array, start_dt, end_dt + + yield get_ohlc async def backfill_bars(