Add historical bars retreival

its_happening
Tyler Goodlet 2020-07-15 08:20:03 -04:00
parent cf8a5d04ce
commit 7395b22e3d
1 changed files with 123 additions and 6 deletions

View File

@ -1,29 +1,137 @@
"""
Kraken backend.
"""
from contextlib import asynccontextmanager
from dataclasses import dataclass, asdict
from typing import List
from itertools import starmap
from typing import List, Dict, Any
import json
import tractor
from trio_websocket import open_websocket_url
import arrow
import asks
import numpy as np
import tractor
from ._util import resproc, SymbolNotFound, BrokerError
from ..log import get_logger
log = get_logger(__name__)
# <uri>/<version>/
_url = 'https://api.kraken.com/0'
# conversion to numpy worthy types
ohlc_dtype = [
('index', int),
('time', int),
('open', float),
('high', float),
('low', float),
('close', float),
('vwap', float),
('volume', float),
('count', int)
]
class Client:
def __init__(self) -> None:
self._sesh = asks.Session(connections=4)
self._sesh.base_location = _url
self._sesh.headers.update({
'User-Agent':
'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
})
async def _public(
self,
method: str,
data: dict,
) -> Dict[str, Any]:
resp = await self._sesh.post(
path=f'/public/{method}',
json=data,
timeout=float('inf')
)
return resproc(resp, log)
async def symbol_info(
self,
pair: str = 'all',
):
resp = await self._public('AssetPairs', {'pair': pair})
assert not resp['error']
true_pair_key, data = next(iter(resp['result'].items()))
return data
async def bars(
self,
symbol: str = 'XBTUSD',
# UTC 2017-07-02 12:53:20
since: int = None,
count: int = 720, # <- max allowed per query
as_np: bool = True,
) -> dict:
if since is None:
since = arrow.utcnow().floor('minute').shift(
minutes=-count).timestamp
# UTC 2017-07-02 12:53:20 is oldest seconds value
since = str(max(1499000000, since))
json = await self._public(
'OHLC',
data={
'pair': symbol,
'since': since,
},
)
try:
res = json['result']
res.pop('last')
bars = next(iter(res.values()))
# convert all fields to native types
bars = list(starmap(
lambda i, bar:
(i,) + tuple(
ftype(bar[i]) for i, (name, ftype)
in enumerate(ohlc_dtype[1:])
),
enumerate(bars))
)
return np.array(bars, dtype=ohlc_dtype) if as_np else bars
except KeyError:
raise SymbolNotFound(json['error'][0] + f': {symbol}')
@asynccontextmanager
async def get_client() -> Client:
yield Client()
async def stream_quotes(
pairs: List[str] = ['BTC/USD', 'XRP/USD'],
symbols: List[str] = ['BTC/USD', 'XRP/USD'],
sub_type: str = 'ohlc',
) -> None:
"""Subscribe for ohlc stream of quotes for ``pairs``.
``pairs`` must be formatted <crypto_symbol>/<fiat_symbol>.
"""
ws_pairs = {}
async with get_client() as client:
for sym in symbols:
ws_pairs[sym] = (await client.symbol_info(sym))['wsname']
async with open_websocket_url(
'wss://ws.kraken.com',
) as ws:
# setup subs
# see: https://docs.kraken.com/websockets/#message-subscribe
subs = {
'pair': pairs,
'pair': list(ws_pairs.values()),
'event': 'subscribe',
'subscription': {
'name': sub_type,
@ -50,18 +158,27 @@ async def stream_quotes(
low: float # Low price within interval
close: float # Close price of interval
vwap: float # Volume weighted average price within interval
volume: int # Accumulated volume within interval
volume: float # Accumulated volume within interval
count: int # Number of trades within interval
# XXX: ugh, super hideous.. why doesn't
def __post_init__(self):
for field, val in self.__dataclass_fields__.items():
setattr(self, field, val.type(getattr(self, field)))
while True:
msg = await recv()
if isinstance(msg, dict):
if msg.get('event') == 'heartbeat':
continue
err = msg.get('errorMessage')
if err:
raise BrokerError(err)
else:
chan_id, ohlc_array, chan_name, pair = msg
ohlc = OHLC(chan_id, chan_name, pair, *ohlc_array)
yield ohlc
print(ohlc)
yield asdict(ohlc)
if __name__ == '__main__':