From 4bd42b52c90c2d8dfb521d119711cd8ae5195e1d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 11 Jan 2021 08:34:59 -0500 Subject: [PATCH] Add support for Trades ep --- piker/brokers/kraken.py | 150 +++++++++++++++++++++++++++++++--------- 1 file changed, 118 insertions(+), 32 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index cc04cbf1..22c001b4 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) +# Copyright (C) Tyler Goodlet (in stewardship for piker0) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -15,10 +15,12 @@ # along with this program. If not, see . """ -Kraken backend. +kraken buttz. + +Get da crypto bois pampin da btcccccssssss (and or da tezos) + """ -import sys from contextlib import asynccontextmanager from dataclasses import dataclass, asdict, field from typing import List, Dict, Any, Tuple, Optional, AsyncIterator @@ -30,6 +32,8 @@ from trio_websocket._impl import ConnectionClosed, DisconnectionTimeout import arrow import asks import numpy as np +from numba import njit, float64 +from numba import from_dtype import trio import tractor @@ -46,8 +50,9 @@ log = get_logger(__name__) # // -_url = 'https://api.kraken.com/0' +_url: str = 'https://api.kraken.com/0' +_epoch_s: int = 1499000000 # Broker specific ohlc schema which includes a vwap field _ohlc_dtype = [ @@ -62,13 +67,49 @@ _ohlc_dtype = [ ('bar_wap', float), ] +_trade_dtype = [ + # ('index', int), + ('price', float), + ('volume', float), + ('time', int), + ('is_bid', bool), + ('is_limit', bool), + ('exchange', 'U16'), +] + # UI components allow this to be declared such that additional # (historical) fields can be exposed. ohlc_dtype = np.dtype(_ohlc_dtype) +trade_dtype = np.dtype(_trade_dtype) +trade_ndtype = from_dtype(trade_dtype) _show_wap_in_history = True +# @njit +def json2np( + trades: list, + out: np.ndarray, + # _dtype=_trade_dtype, +) -> np.ndarray: + + for i, trade in enumerate(trades): + price, volume, t, direction, typ, misc = trade + time_ns = float64(t) * 1e9 + is_bid = {'s': False, 'b': True}[direction] + is_trade = True # do we care if it's a market vs. limti? + out[i] = ( + float64(price), + float64(volume), + time_ns, + is_bid, + is_trade, + 'kraken', + ) + + return out + + class Client: def __init__(self) -> None: @@ -102,19 +143,57 @@ class Client: true_pair_key, data = next(iter(resp['result'].items())) return data + async def trades( + self, + symbol: str = 'XBTUSD', + # UTC 2017-07-02 12:53:20 + since: int = 0, # this is a special value indicating epoch of symbol + as_np: bool = True, + ) -> dict: + + # UTC 2017-07-02 12:53:20 is oldest seconds value + # since_s = max(_epoch_s, int(since)) + + # pick a timestamp 1H ago + since_ns = time.time_ns() - 60*60*1e9 + + json = await self._public( + 'Trades', + data={ + 'pair': symbol, + 'since': since_ns, + }, + ) + res = json['result'] + last_ns = res.pop('last') + trades = next(iter(res.values())) + + out = np.zeros(1000, dtype=trade_ndtype) + array = json2np(trades, out) + return array + async def bars( self, symbol: str = 'XBTUSD', # UTC 2017-07-02 12:53:20 - since: int = None, count: int = 720, # <- max allowed per query as_np: bool = True, ) -> dict: - if since is None: - since = arrow.utcnow().floor('minute').shift( - minutes=-count).timestamp - # UTC 2017-07-02 12:53:20 is oldest seconds value - since = str(max(1499000000, since)) + """Retreive OHLC bars. + + Note only a max of 720 candles for each sampling interval + can be retreived. To acquire longer term history use .`trades()` + above. See here: + https://support.kraken.com/hc/en-us/articles/218198197-How-to-retrieve-historical-time-and-sales-trading-history-using-the-REST-API-Trades-endpoint- + + We're mostly just keeping this method for bookkeeping + but we can probably just remove it eventually. + + """ + # member 720 is farthest back they'll go + since = arrow.utcnow().floor('minute').shift( + minutes=-720).timestamp + json = await self._public( 'OHLC', data={ @@ -131,12 +210,14 @@ class Client: first = bars[0] last_nz_vwap = first[-3] + if last_nz_vwap == 0: # use close if vwap is zero last_nz_vwap = first[-4] # convert all fields to native types for i, bar in enumerate(bars): + # normalize weird zero-ed vwap values..cmon kraken.. # indicates vwap didn't change since last bar vwap = float(bar.pop(-3)) @@ -295,31 +376,34 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]: async def fill_bars( first_bars, shm, + client, symbol: str, count: int = 75 ) -> None: - async with get_client() as client: + # async with get_client() as client: - next_dt = first_bars[0][1] - i = 0 - while i < count: - - try: - bars_array = await client.bars( - symbol=symbol, - since=arrow.get(next_dt).floor('minute') - .shift(minutes=-720).timestamp - ) - shm.push(bars_array, prepend=True) - i += 1 - next_dt = bars_array[0][1] + next_dt = first_bars[0][1] + i = 0 + while i < count: - await trio.sleep(5) + # timestamp in seconds? + since = arrow.get(next_dt).floor( + 'minute').shift(minutes=-(2*720)).timestamp - except BaseException as e: - log.exception(e) - await tractor.breakpoint() + try: + bars_array = await client.bars(symbol=symbol, since=since) + + # push to shared mem + shm.push(bars_array, prepend=True) + + i += 1 + + next_dt = bars_array[0][1] + + except BaseException as e: + log.exception(e) + await tractor.breakpoint() _local_buffer_writers = {} @@ -372,17 +456,19 @@ async def stream_quotes( # we are writer readonly=False, ) - bars = await client.bars(symbol=symbol) + trades = await client.trades(symbol=symbol) + await tractor.breakpoint() shm.push(bars) shm_token = shm.token - ln.start_soon(fill_bars, bars, shm, symbol) + ln.start_soon(fill_bars, bars, shm, client, symbol) + # times = shm.array['time'] delay_s = times[-1] - times[times != times[-1]][-1] subscribe_ohlc_for_increment(shm, delay_s) - + # pass back token, and bool, signalling if we're the writer await ctx.send_yield((shm_token, not writer_already_exists)) @@ -391,7 +477,7 @@ async def stream_quotes( async with trio_websocket.open_websocket_url( 'wss://ws.kraken.com', ) as ws: - + # XXX: setup subs # https://docs.kraken.com/websockets/#message-subscribe # specific logic for this in kraken's shitty sync client: