diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py
index cc04cbf1..22c001b4 100644
--- a/piker/brokers/kraken.py
+++ b/piker/brokers/kraken.py
@@ -1,5 +1,5 @@
# piker: trading gear for hackers
-# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
+# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@@ -15,10 +15,12 @@
# along with this program. If not, see .
"""
-Kraken backend.
+kraken buttz.
+
+Get da crypto bois pampin da btcccccssssss (and or da tezos)
+
"""
-import sys
from contextlib import asynccontextmanager
from dataclasses import dataclass, asdict, field
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator
@@ -30,6 +32,8 @@ from trio_websocket._impl import ConnectionClosed, DisconnectionTimeout
import arrow
import asks
import numpy as np
+from numba import njit, float64
+from numba import from_dtype
import trio
import tractor
@@ -46,8 +50,9 @@ log = get_logger(__name__)
# //
-_url = 'https://api.kraken.com/0'
+_url: str = 'https://api.kraken.com/0'
+_epoch_s: int = 1499000000
# Broker specific ohlc schema which includes a vwap field
_ohlc_dtype = [
@@ -62,13 +67,49 @@ _ohlc_dtype = [
('bar_wap', float),
]
+_trade_dtype = [
+ # ('index', int),
+ ('price', float),
+ ('volume', float),
+ ('time', int),
+ ('is_bid', bool),
+ ('is_limit', bool),
+ ('exchange', 'U16'),
+]
+
# UI components allow this to be declared such that additional
# (historical) fields can be exposed.
ohlc_dtype = np.dtype(_ohlc_dtype)
+trade_dtype = np.dtype(_trade_dtype)
+trade_ndtype = from_dtype(trade_dtype)
_show_wap_in_history = True
+# @njit
+def json2np(
+ trades: list,
+ out: np.ndarray,
+ # _dtype=_trade_dtype,
+) -> np.ndarray:
+
+ for i, trade in enumerate(trades):
+ price, volume, t, direction, typ, misc = trade
+ time_ns = float64(t) * 1e9
+ is_bid = {'s': False, 'b': True}[direction]
+ is_trade = True # do we care if it's a market vs. limti?
+ out[i] = (
+ float64(price),
+ float64(volume),
+ time_ns,
+ is_bid,
+ is_trade,
+ 'kraken',
+ )
+
+ return out
+
+
class Client:
def __init__(self) -> None:
@@ -102,19 +143,57 @@ class Client:
true_pair_key, data = next(iter(resp['result'].items()))
return data
+ async def trades(
+ self,
+ symbol: str = 'XBTUSD',
+ # UTC 2017-07-02 12:53:20
+ since: int = 0, # this is a special value indicating epoch of symbol
+ as_np: bool = True,
+ ) -> dict:
+
+ # UTC 2017-07-02 12:53:20 is oldest seconds value
+ # since_s = max(_epoch_s, int(since))
+
+ # pick a timestamp 1H ago
+ since_ns = time.time_ns() - 60*60*1e9
+
+ json = await self._public(
+ 'Trades',
+ data={
+ 'pair': symbol,
+ 'since': since_ns,
+ },
+ )
+ res = json['result']
+ last_ns = res.pop('last')
+ trades = next(iter(res.values()))
+
+ out = np.zeros(1000, dtype=trade_ndtype)
+ array = json2np(trades, out)
+ return array
+
async def bars(
self,
symbol: str = 'XBTUSD',
# UTC 2017-07-02 12:53:20
- since: int = None,
count: int = 720, # <- max allowed per query
as_np: bool = True,
) -> dict:
- if since is None:
- since = arrow.utcnow().floor('minute').shift(
- minutes=-count).timestamp
- # UTC 2017-07-02 12:53:20 is oldest seconds value
- since = str(max(1499000000, since))
+ """Retreive OHLC bars.
+
+ Note only a max of 720 candles for each sampling interval
+ can be retreived. To acquire longer term history use .`trades()`
+ above. See here:
+ https://support.kraken.com/hc/en-us/articles/218198197-How-to-retrieve-historical-time-and-sales-trading-history-using-the-REST-API-Trades-endpoint-
+
+ We're mostly just keeping this method for bookkeeping
+ but we can probably just remove it eventually.
+
+ """
+ # member 720 is farthest back they'll go
+ since = arrow.utcnow().floor('minute').shift(
+ minutes=-720).timestamp
+
json = await self._public(
'OHLC',
data={
@@ -131,12 +210,14 @@ class Client:
first = bars[0]
last_nz_vwap = first[-3]
+
if last_nz_vwap == 0:
# use close if vwap is zero
last_nz_vwap = first[-4]
# convert all fields to native types
for i, bar in enumerate(bars):
+
# normalize weird zero-ed vwap values..cmon kraken..
# indicates vwap didn't change since last bar
vwap = float(bar.pop(-3))
@@ -295,31 +376,34 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]:
async def fill_bars(
first_bars,
shm,
+ client,
symbol: str,
count: int = 75
) -> None:
- async with get_client() as client:
+ # async with get_client() as client:
- next_dt = first_bars[0][1]
- i = 0
- while i < count:
-
- try:
- bars_array = await client.bars(
- symbol=symbol,
- since=arrow.get(next_dt).floor('minute')
- .shift(minutes=-720).timestamp
- )
- shm.push(bars_array, prepend=True)
- i += 1
- next_dt = bars_array[0][1]
+ next_dt = first_bars[0][1]
+ i = 0
+ while i < count:
- await trio.sleep(5)
+ # timestamp in seconds?
+ since = arrow.get(next_dt).floor(
+ 'minute').shift(minutes=-(2*720)).timestamp
- except BaseException as e:
- log.exception(e)
- await tractor.breakpoint()
+ try:
+ bars_array = await client.bars(symbol=symbol, since=since)
+
+ # push to shared mem
+ shm.push(bars_array, prepend=True)
+
+ i += 1
+
+ next_dt = bars_array[0][1]
+
+ except BaseException as e:
+ log.exception(e)
+ await tractor.breakpoint()
_local_buffer_writers = {}
@@ -372,17 +456,19 @@ async def stream_quotes(
# we are writer
readonly=False,
)
- bars = await client.bars(symbol=symbol)
+ trades = await client.trades(symbol=symbol)
+ await tractor.breakpoint()
shm.push(bars)
shm_token = shm.token
- ln.start_soon(fill_bars, bars, shm, symbol)
+ ln.start_soon(fill_bars, bars, shm, client, symbol)
+ #
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
subscribe_ohlc_for_increment(shm, delay_s)
-
+
# pass back token, and bool, signalling if we're the writer
await ctx.send_yield((shm_token, not writer_already_exists))
@@ -391,7 +477,7 @@ async def stream_quotes(
async with trio_websocket.open_websocket_url(
'wss://ws.kraken.com',
) as ws:
-
+
# XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe
# specific logic for this in kraken's shitty sync client: