Add historical bars retreival

unleash_the_kraken
Tyler Goodlet 2020-07-15 08:20:03 -04:00
parent 7bccfc7b10
commit ffe47acf1d
1 changed files with 123 additions and 6 deletions

View File

@ -1,29 +1,137 @@
""" """
Kraken backend. Kraken backend.
""" """
from contextlib import asynccontextmanager
from dataclasses import dataclass, asdict from dataclasses import dataclass, asdict
from typing import List from itertools import starmap
from typing import List, Dict, Any
import json import json
import tractor
from trio_websocket import open_websocket_url from trio_websocket import open_websocket_url
import arrow
import asks
import numpy as np
import tractor
from ._util import resproc, SymbolNotFound, BrokerError
from ..log import get_logger
log = get_logger(__name__)
# <uri>/<version>/
_url = 'https://api.kraken.com/0'
# conversion to numpy worthy types
ohlc_dtype = [
('index', int),
('time', int),
('open', float),
('high', float),
('low', float),
('close', float),
('vwap', float),
('volume', float),
('count', int)
]
class Client:
def __init__(self) -> None:
self._sesh = asks.Session(connections=4)
self._sesh.base_location = _url
self._sesh.headers.update({
'User-Agent':
'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
})
async def _public(
self,
method: str,
data: dict,
) -> Dict[str, Any]:
resp = await self._sesh.post(
path=f'/public/{method}',
json=data,
timeout=float('inf')
)
return resproc(resp, log)
async def symbol_info(
self,
pair: str = 'all',
):
resp = await self._public('AssetPairs', {'pair': pair})
assert not resp['error']
true_pair_key, data = next(iter(resp['result'].items()))
return data
async def bars(
self,
symbol: str = 'XBTUSD',
# UTC 2017-07-02 12:53:20
since: int = None,
count: int = 720, # <- max allowed per query
as_np: bool = True,
) -> dict:
if since is None:
since = arrow.utcnow().floor('minute').shift(
minutes=-count).timestamp
# UTC 2017-07-02 12:53:20 is oldest seconds value
since = str(max(1499000000, since))
json = await self._public(
'OHLC',
data={
'pair': symbol,
'since': since,
},
)
try:
res = json['result']
res.pop('last')
bars = next(iter(res.values()))
# convert all fields to native types
bars = list(starmap(
lambda i, bar:
(i,) + tuple(
ftype(bar[i]) for i, (name, ftype)
in enumerate(ohlc_dtype[1:])
),
enumerate(bars))
)
return np.array(bars, dtype=ohlc_dtype) if as_np else bars
except KeyError:
raise SymbolNotFound(json['error'][0] + f': {symbol}')
@asynccontextmanager
async def get_client() -> Client:
yield Client()
async def stream_quotes( async def stream_quotes(
pairs: List[str] = ['BTC/USD', 'XRP/USD'], symbols: List[str] = ['BTC/USD', 'XRP/USD'],
sub_type: str = 'ohlc', sub_type: str = 'ohlc',
) -> None: ) -> None:
"""Subscribe for ohlc stream of quotes for ``pairs``. """Subscribe for ohlc stream of quotes for ``pairs``.
``pairs`` must be formatted <crypto_symbol>/<fiat_symbol>. ``pairs`` must be formatted <crypto_symbol>/<fiat_symbol>.
""" """
ws_pairs = {}
async with get_client() as client:
for sym in symbols:
ws_pairs[sym] = (await client.symbol_info(sym))['wsname']
async with open_websocket_url( async with open_websocket_url(
'wss://ws.kraken.com', 'wss://ws.kraken.com',
) as ws: ) as ws:
# setup subs # setup subs
# see: https://docs.kraken.com/websockets/#message-subscribe # see: https://docs.kraken.com/websockets/#message-subscribe
subs = { subs = {
'pair': pairs, 'pair': list(ws_pairs.values()),
'event': 'subscribe', 'event': 'subscribe',
'subscription': { 'subscription': {
'name': sub_type, 'name': sub_type,
@ -50,18 +158,27 @@ async def stream_quotes(
low: float # Low price within interval low: float # Low price within interval
close: float # Close price of interval close: float # Close price of interval
vwap: float # Volume weighted average price within interval vwap: float # Volume weighted average price within interval
volume: int # Accumulated volume within interval volume: float # Accumulated volume within interval
count: int # Number of trades within interval count: int # Number of trades within interval
# XXX: ugh, super hideous.. why doesn't
def __post_init__(self):
for field, val in self.__dataclass_fields__.items():
setattr(self, field, val.type(getattr(self, field)))
while True: while True:
msg = await recv() msg = await recv()
if isinstance(msg, dict): if isinstance(msg, dict):
if msg.get('event') == 'heartbeat': if msg.get('event') == 'heartbeat':
continue continue
err = msg.get('errorMessage')
if err:
raise BrokerError(err)
else: else:
chan_id, ohlc_array, chan_name, pair = msg chan_id, ohlc_array, chan_name, pair = msg
ohlc = OHLC(chan_id, chan_name, pair, *ohlc_array) ohlc = OHLC(chan_id, chan_name, pair, *ohlc_array)
yield ohlc print(ohlc)
yield asdict(ohlc)
if __name__ == '__main__': if __name__ == '__main__':