Add support for Trades ep

kraken_trades_data
Tyler Goodlet 2021-01-11 08:34:59 -05:00
parent e56d065dbc
commit 4bd42b52c9
1 changed files with 118 additions and 32 deletions

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) # Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -15,10 +15,12 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
""" """
Kraken backend. kraken buttz.
Get da crypto bois pampin da btcccccssssss (and or da tezos)
""" """
import sys
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass, asdict, field from dataclasses import dataclass, asdict, field
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator from typing import List, Dict, Any, Tuple, Optional, AsyncIterator
@ -30,6 +32,8 @@ from trio_websocket._impl import ConnectionClosed, DisconnectionTimeout
import arrow import arrow
import asks import asks
import numpy as np import numpy as np
from numba import njit, float64
from numba import from_dtype
import trio import trio
import tractor import tractor
@ -46,8 +50,9 @@ log = get_logger(__name__)
# <uri>/<version>/ # <uri>/<version>/
_url = 'https://api.kraken.com/0' _url: str = 'https://api.kraken.com/0'
_epoch_s: int = 1499000000
# Broker specific ohlc schema which includes a vwap field # Broker specific ohlc schema which includes a vwap field
_ohlc_dtype = [ _ohlc_dtype = [
@ -62,13 +67,49 @@ _ohlc_dtype = [
('bar_wap', float), ('bar_wap', float),
] ]
_trade_dtype = [
# ('index', int),
('price', float),
('volume', float),
('time', int),
('is_bid', bool),
('is_limit', bool),
('exchange', 'U16'),
]
# UI components allow this to be declared such that additional # UI components allow this to be declared such that additional
# (historical) fields can be exposed. # (historical) fields can be exposed.
ohlc_dtype = np.dtype(_ohlc_dtype) ohlc_dtype = np.dtype(_ohlc_dtype)
trade_dtype = np.dtype(_trade_dtype)
trade_ndtype = from_dtype(trade_dtype)
_show_wap_in_history = True _show_wap_in_history = True
# @njit
def json2np(
trades: list,
out: np.ndarray,
# _dtype=_trade_dtype,
) -> np.ndarray:
for i, trade in enumerate(trades):
price, volume, t, direction, typ, misc = trade
time_ns = float64(t) * 1e9
is_bid = {'s': False, 'b': True}[direction]
is_trade = True # do we care if it's a market vs. limti?
out[i] = (
float64(price),
float64(volume),
time_ns,
is_bid,
is_trade,
'kraken',
)
return out
class Client: class Client:
def __init__(self) -> None: def __init__(self) -> None:
@ -102,19 +143,57 @@ class Client:
true_pair_key, data = next(iter(resp['result'].items())) true_pair_key, data = next(iter(resp['result'].items()))
return data return data
async def trades(
self,
symbol: str = 'XBTUSD',
# UTC 2017-07-02 12:53:20
since: int = 0, # this is a special value indicating epoch of symbol
as_np: bool = True,
) -> dict:
# UTC 2017-07-02 12:53:20 is oldest seconds value
# since_s = max(_epoch_s, int(since))
# pick a timestamp 1H ago
since_ns = time.time_ns() - 60*60*1e9
json = await self._public(
'Trades',
data={
'pair': symbol,
'since': since_ns,
},
)
res = json['result']
last_ns = res.pop('last')
trades = next(iter(res.values()))
out = np.zeros(1000, dtype=trade_ndtype)
array = json2np(trades, out)
return array
async def bars( async def bars(
self, self,
symbol: str = 'XBTUSD', symbol: str = 'XBTUSD',
# UTC 2017-07-02 12:53:20 # UTC 2017-07-02 12:53:20
since: int = None,
count: int = 720, # <- max allowed per query count: int = 720, # <- max allowed per query
as_np: bool = True, as_np: bool = True,
) -> dict: ) -> dict:
if since is None: """Retreive OHLC bars.
since = arrow.utcnow().floor('minute').shift(
minutes=-count).timestamp Note only a max of 720 candles for each sampling interval
# UTC 2017-07-02 12:53:20 is oldest seconds value can be retreived. To acquire longer term history use .`trades()`
since = str(max(1499000000, since)) above. See here:
https://support.kraken.com/hc/en-us/articles/218198197-How-to-retrieve-historical-time-and-sales-trading-history-using-the-REST-API-Trades-endpoint-
We're mostly just keeping this method for bookkeeping
but we can probably just remove it eventually.
"""
# member 720 is farthest back they'll go
since = arrow.utcnow().floor('minute').shift(
minutes=-720).timestamp
json = await self._public( json = await self._public(
'OHLC', 'OHLC',
data={ data={
@ -131,12 +210,14 @@ class Client:
first = bars[0] first = bars[0]
last_nz_vwap = first[-3] last_nz_vwap = first[-3]
if last_nz_vwap == 0: if last_nz_vwap == 0:
# use close if vwap is zero # use close if vwap is zero
last_nz_vwap = first[-4] last_nz_vwap = first[-4]
# convert all fields to native types # convert all fields to native types
for i, bar in enumerate(bars): for i, bar in enumerate(bars):
# normalize weird zero-ed vwap values..cmon kraken.. # normalize weird zero-ed vwap values..cmon kraken..
# indicates vwap didn't change since last bar # indicates vwap didn't change since last bar
vwap = float(bar.pop(-3)) vwap = float(bar.pop(-3))
@ -295,31 +376,34 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]:
async def fill_bars( async def fill_bars(
first_bars, first_bars,
shm, shm,
client,
symbol: str, symbol: str,
count: int = 75 count: int = 75
) -> None: ) -> None:
async with get_client() as client: # async with get_client() as client:
next_dt = first_bars[0][1] next_dt = first_bars[0][1]
i = 0 i = 0
while i < count: while i < count:
try:
bars_array = await client.bars(
symbol=symbol,
since=arrow.get(next_dt).floor('minute')
.shift(minutes=-720).timestamp
)
shm.push(bars_array, prepend=True)
i += 1
next_dt = bars_array[0][1]
await trio.sleep(5) # timestamp in seconds?
since = arrow.get(next_dt).floor(
'minute').shift(minutes=-(2*720)).timestamp
except BaseException as e: try:
log.exception(e) bars_array = await client.bars(symbol=symbol, since=since)
await tractor.breakpoint()
# push to shared mem
shm.push(bars_array, prepend=True)
i += 1
next_dt = bars_array[0][1]
except BaseException as e:
log.exception(e)
await tractor.breakpoint()
_local_buffer_writers = {} _local_buffer_writers = {}
@ -372,17 +456,19 @@ async def stream_quotes(
# we are writer # we are writer
readonly=False, readonly=False,
) )
bars = await client.bars(symbol=symbol) trades = await client.trades(symbol=symbol)
await tractor.breakpoint()
shm.push(bars) shm.push(bars)
shm_token = shm.token shm_token = shm.token
ln.start_soon(fill_bars, bars, shm, symbol) ln.start_soon(fill_bars, bars, shm, client, symbol)
#
times = shm.array['time'] times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1] delay_s = times[-1] - times[times != times[-1]][-1]
subscribe_ohlc_for_increment(shm, delay_s) subscribe_ohlc_for_increment(shm, delay_s)
# pass back token, and bool, signalling if we're the writer # pass back token, and bool, signalling if we're the writer
await ctx.send_yield((shm_token, not writer_already_exists)) await ctx.send_yield((shm_token, not writer_already_exists))
@ -391,7 +477,7 @@ async def stream_quotes(
async with trio_websocket.open_websocket_url( async with trio_websocket.open_websocket_url(
'wss://ws.kraken.com', 'wss://ws.kraken.com',
) as ws: ) as ws:
# XXX: setup subs # XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe # https://docs.kraken.com/websockets/#message-subscribe
# specific logic for this in kraken's shitty sync client: # specific logic for this in kraken's shitty sync client: