Compare commits

...

4 Commits

Author SHA1 Message Date
Tyler Goodlet 513646c802 Use @njit throughout 2021-01-11 08:40:13 -05:00
Tyler Goodlet 4bd42b52c9 Add support for Trades ep 2021-01-11 08:34:59 -05:00
Guillermo Rodriguez e56d065dbc
Add fill_bars function to kraken 2021-01-04 08:30:05 -03:00
Guillermo Rodriguez b1fd986a3a
Change kraken.stream_quotes to use tractor.stream api 2021-01-02 12:35:38 -03:00
5 changed files with 282 additions and 151 deletions

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) # Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -15,11 +15,15 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
""" """
Kraken backend. kraken buttz.
Get da crypto bois pampin da btcccccssssss (and or da tezos)
""" """
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass, asdict, field from dataclasses import dataclass, asdict, field
from typing import List, Dict, Any, Tuple, Optional from typing import List, Dict, Any, Tuple, Optional, AsyncIterator
import json import json
import time import time
@ -28,6 +32,8 @@ from trio_websocket._impl import ConnectionClosed, DisconnectionTimeout
import arrow import arrow
import asks import asks
import numpy as np import numpy as np
from numba import njit, float64
from numba import from_dtype
import trio import trio
import tractor import tractor
@ -44,8 +50,9 @@ log = get_logger(__name__)
# <uri>/<version>/ # <uri>/<version>/
_url = 'https://api.kraken.com/0' _url: str = 'https://api.kraken.com/0'
_epoch_s: int = 1499000000
# Broker specific ohlc schema which includes a vwap field # Broker specific ohlc schema which includes a vwap field
_ohlc_dtype = [ _ohlc_dtype = [
@ -60,13 +67,49 @@ _ohlc_dtype = [
('bar_wap', float), ('bar_wap', float),
] ]
_trade_dtype = [
# ('index', int),
('price', float),
('volume', float),
('time', int),
('is_bid', bool),
('is_limit', bool),
('exchange', 'U16'),
]
# UI components allow this to be declared such that additional # UI components allow this to be declared such that additional
# (historical) fields can be exposed. # (historical) fields can be exposed.
ohlc_dtype = np.dtype(_ohlc_dtype) ohlc_dtype = np.dtype(_ohlc_dtype)
trade_dtype = np.dtype(_trade_dtype)
trade_ndtype = from_dtype(trade_dtype)
_show_wap_in_history = True _show_wap_in_history = True
# @njit
def json2np(
trades: list,
out: np.ndarray,
# _dtype=_trade_dtype,
) -> np.ndarray:
for i, trade in enumerate(trades):
price, volume, t, direction, typ, misc = trade
time_ns = float64(t) * 1e9
is_bid = {'s': False, 'b': True}[direction]
is_trade = True # do we care if it's a market vs. limti?
out[i] = (
float64(price),
float64(volume),
time_ns,
is_bid,
is_trade,
'kraken',
)
return out
class Client: class Client:
def __init__(self) -> None: def __init__(self) -> None:
@ -100,19 +143,57 @@ class Client:
true_pair_key, data = next(iter(resp['result'].items())) true_pair_key, data = next(iter(resp['result'].items()))
return data return data
async def trades(
self,
symbol: str = 'XBTUSD',
# UTC 2017-07-02 12:53:20
since: int = 0, # this is a special value indicating epoch of symbol
as_np: bool = True,
) -> dict:
# UTC 2017-07-02 12:53:20 is oldest seconds value
# since_s = max(_epoch_s, int(since))
# pick a timestamp 1H ago
since_ns = time.time_ns() - 60*60*1e9
json = await self._public(
'Trades',
data={
'pair': symbol,
'since': since_ns,
},
)
res = json['result']
last_ns = res.pop('last')
trades = next(iter(res.values()))
out = np.zeros(1000, dtype=trade_ndtype)
array = json2np(trades, out)
return array
async def bars( async def bars(
self, self,
symbol: str = 'XBTUSD', symbol: str = 'XBTUSD',
# UTC 2017-07-02 12:53:20 # UTC 2017-07-02 12:53:20
since: int = None,
count: int = 720, # <- max allowed per query count: int = 720, # <- max allowed per query
as_np: bool = True, as_np: bool = True,
) -> dict: ) -> dict:
if since is None: """Retreive OHLC bars.
since = arrow.utcnow().floor('minute').shift(
minutes=-count).timestamp Note only a max of 720 candles for each sampling interval
# UTC 2017-07-02 12:53:20 is oldest seconds value can be retreived. To acquire longer term history use .`trades()`
since = str(max(1499000000, since)) above. See here:
https://support.kraken.com/hc/en-us/articles/218198197-How-to-retrieve-historical-time-and-sales-trading-history-using-the-REST-API-Trades-endpoint-
We're mostly just keeping this method for bookkeeping
but we can probably just remove it eventually.
"""
# member 720 is farthest back they'll go
since = arrow.utcnow().floor('minute').shift(
minutes=-720).timestamp
json = await self._public( json = await self._public(
'OHLC', 'OHLC',
data={ data={
@ -129,12 +210,14 @@ class Client:
first = bars[0] first = bars[0]
last_nz_vwap = first[-3] last_nz_vwap = first[-3]
if last_nz_vwap == 0: if last_nz_vwap == 0:
# use close if vwap is zero # use close if vwap is zero
last_nz_vwap = first[-4] last_nz_vwap = first[-4]
# convert all fields to native types # convert all fields to native types
for i, bar in enumerate(bars): for i, bar in enumerate(bars):
# normalize weird zero-ed vwap values..cmon kraken.. # normalize weird zero-ed vwap values..cmon kraken..
# indicates vwap didn't change since last bar # indicates vwap didn't change since last bar
vwap = float(bar.pop(-3)) vwap = float(bar.pop(-3))
@ -290,170 +373,220 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]:
} }
# @tractor.msg.pub async def fill_bars(
async def stream_quotes( first_bars,
# get_topics: Callable, shm,
shm_token: Tuple[str, str, List[tuple]], client,
symbols: List[str] = ['XBTUSD', 'XMRUSD'], symbol: str,
# These are the symbols not expected by the ws api count: int = 75
# they are looked up inside this routine.
sub_type: str = 'ohlc',
loglevel: str = None,
# compat with eventual ``tractor.msg.pub``
topics: Optional[List[str]] = None,
) -> None: ) -> None:
"""Subscribe for ohlc stream of quotes for ``pairs``.
``pairs`` must be formatted <crypto_symbol>/<fiat_symbol>. # async with get_client() as client:
"""
next_dt = first_bars[0][1]
i = 0
while i < count:
# timestamp in seconds?
since = arrow.get(next_dt).floor(
'minute').shift(minutes=-(2*720)).timestamp
try:
bars_array = await client.bars(symbol=symbol, since=since)
# push to shared mem
shm.push(bars_array, prepend=True)
i += 1
next_dt = bars_array[0][1]
except BaseException as e:
log.exception(e)
await tractor.breakpoint()
_local_buffer_writers = {}
@asynccontextmanager
async def activate_writer(key: str) -> (bool, trio.Nursery):
try:
writer_already_exists = _local_buffer_writers.get(key, False)
if not writer_already_exists:
_local_buffer_writers[key] = True
async with trio.open_nursery() as n:
yield writer_already_exists, n
else:
yield writer_already_exists, None
finally:
_local_buffer_writers.pop(key, None)
@tractor.stream
async def stream_quotes(
ctx: tractor.Context,
symbols: List[str],
shm_token: Tuple[str, str, List[tuple]],
loglevel: str = None,
# compat for @tractor.msg.pub
topics: Any = None
) -> AsyncIterator[Dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging # XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel) get_console_log(loglevel or tractor.current_actor().loglevel)
ws_pairs = {} ws_pairs = {}
async with get_client() as client: async with activate_writer(
shm_token['shm_name']
) as (writer_already_exists, ln):
async with get_client() as client:
# keep client cached for real-time section
for sym in symbols:
ws_pairs[sym] = (await client.symbol_info(sym))['wsname']
# keep client cached for real-time section symbol = symbols[0]
for sym in symbols:
ws_pairs[sym] = (await client.symbol_info(sym))['wsname']
# maybe load historical ohlcv in to shared mem if not writer_already_exists:
# check if shm has already been created by previous shm = attach_shm_array(
# feed initialization token=shm_token,
writer_exists = get_shm_token(shm_token['shm_name']) # we are writer
readonly=False,
)
trades = await client.trades(symbol=symbol)
await tractor.breakpoint()
symbol = symbols[0] shm.push(bars)
shm_token = shm.token
if not writer_exists: ln.start_soon(fill_bars, bars, shm, client, symbol)
shm = attach_shm_array(
token=shm_token,
# we are writer
readonly=False,
)
bars = await client.bars(symbol=symbol)
shm.push(bars) #
shm_token = shm.token times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
subscribe_ohlc_for_increment(shm, delay_s)
times = shm.array['time'] # pass back token, and bool, signalling if we're the writer
delay_s = times[-1] - times[times != times[-1]][-1] await ctx.send_yield((shm_token, not writer_already_exists))
subscribe_ohlc_for_increment(shm, delay_s)
yield shm_token, not writer_exists while True:
try:
async with trio_websocket.open_websocket_url(
'wss://ws.kraken.com',
) as ws:
while True: # XXX: setup subs
try: # https://docs.kraken.com/websockets/#message-subscribe
async with trio_websocket.open_websocket_url( # specific logic for this in kraken's shitty sync client:
'wss://ws.kraken.com/', # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
) as ws: ohlc_sub = make_sub(
list(ws_pairs.values()),
{'name': 'ohlc', 'interval': 1}
)
# XXX: setup subs # TODO: we want to eventually allow unsubs which should
# https://docs.kraken.com/websockets/#message-subscribe # be completely fine to request from a separate task
# specific logic for this in kraken's shitty sync client: # since internally the ws methods appear to be FIFO
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 # locked.
ohlc_sub = make_sub( await ws.send_message(json.dumps(ohlc_sub))
list(ws_pairs.values()),
{'name': 'ohlc', 'interval': 1}
)
# TODO: we want to eventually allow unsubs which should # trade data (aka L1)
# be completely fine to request from a separate task l1_sub = make_sub(
# since internally the ws methods appear to be FIFO list(ws_pairs.values()),
# locked. {'name': 'spread'} # 'depth': 10}
await ws.send_message(json.dumps(ohlc_sub))
# trade data (aka L1) )
l1_sub = make_sub( await ws.send_message(json.dumps(l1_sub))
list(ws_pairs.values()),
{'name': 'spread'} # 'depth': 10}
) async def recv():
await ws.send_message(json.dumps(l1_sub)) return json.loads(await ws.get_message())
async def recv(): # pull a first quote and deliver
return json.loads(await ws.get_message()) msg_gen = recv_msg(recv)
typ, ohlc_last = await msg_gen.__anext__()
# pull a first quote and deliver topic, quote = normalize(ohlc_last)
msg_gen = recv_msg(recv)
typ, ohlc_last = await msg_gen.__anext__()
topic, quote = normalize(ohlc_last) # packetize as {topic: quote}
await ctx.send_yield({topic: quote})
# packetize as {topic: quote} # keep start of last interval for volume tracking
yield {topic: quote} last_interval_start = ohlc_last.etime
# keep start of last interval for volume tracking # start streaming
last_interval_start = ohlc_last.etime async for typ, ohlc in msg_gen:
# start streaming if typ == 'ohlc':
async for typ, ohlc in msg_gen:
if typ == 'ohlc': # TODO: can get rid of all this by using
# ``trades`` subscription...
# TODO: can get rid of all this by using # generate tick values to match time & sales pane:
# ``trades`` subscription... # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
volume = ohlc.volume
# generate tick values to match time & sales pane: # new interval
# https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m if ohlc.etime > last_interval_start:
volume = ohlc.volume last_interval_start = ohlc.etime
tick_volume = volume
else:
# this is the tick volume *within the interval*
tick_volume = volume - ohlc_last.volume
# new interval last = ohlc.close
if ohlc.etime > last_interval_start: if tick_volume:
last_interval_start = ohlc.etime ohlc.ticks.append({
tick_volume = volume 'type': 'trade',
else: 'price': last,
# this is the tick volume *within the interval* 'size': tick_volume,
tick_volume = volume - ohlc_last.volume })
last = ohlc.close topic, quote = normalize(ohlc)
if tick_volume:
ohlc.ticks.append({
'type': 'trade',
'price': last,
'size': tick_volume,
})
topic, quote = normalize(ohlc) # if we are the lone tick writer start writing
# the buffer with appropriate trade data
if not writer_already_exists:
# update last entry
# benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][
['open', 'high', 'low', 'volume']
]
new_v = tick_volume
# if we are the lone tick writer start writing if v == 0 and new_v:
# the buffer with appropriate trade data # no trades for this bar yet so the open
if not writer_exists: # is also the close/last trade price
# update last entry o = last
# benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][
['open', 'high', 'low', 'volume']
]
new_v = tick_volume
if v == 0 and new_v: # write shm
# no trades for this bar yet so the open shm.array[
# is also the close/last trade price ['open',
o = last 'high',
'low',
'close',
'bar_wap', # in this case vwap of bar
'volume']
][-1] = (
o,
max(high, last),
min(low, last),
last,
ohlc.vwap,
volume,
)
ohlc_last = ohlc
# write shm elif typ == 'l1':
shm.array[ quote = ohlc
['open', topic = quote['symbol']
'high',
'low',
'close',
'bar_wap', # in this case vwap of bar
'volume']
][-1] = (
o,
max(high, last),
min(low, last),
last,
ohlc.vwap,
volume,
)
ohlc_last = ohlc
elif typ == 'l1': # XXX: format required by ``tractor.msg.pub``
quote = ohlc # requires a ``Dict[topic: str, quote: dict]``
topic = quote['symbol'] await ctx.send_yield({topic: quote})
# XXX: format required by ``tractor.msg.pub`` except (ConnectionClosed, DisconnectionTimeout):
# requires a ``Dict[topic: str, quote: dict]`` log.exception("Good job kraken...reconnecting")
yield {topic: quote}
except (ConnectionClosed, DisconnectionTimeout):
log.exception("Good job kraken...reconnecting")

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) # Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by

View File

@ -20,7 +20,7 @@ Momentum bby.
from typing import AsyncIterator, Optional from typing import AsyncIterator, Optional
import numpy as np import numpy as np
from numba import jit, float64, optional, int64 from numba import njit, float64, optional, int64
from ..data._normalize import iterticks from ..data._normalize import iterticks
@ -29,13 +29,12 @@ from ..data._normalize import iterticks
# - how to handle non-plottable values # - how to handle non-plottable values
# - composition of fsps / implicit chaining # - composition of fsps / implicit chaining
@jit( @njit(
float64[:]( float64[:](
float64[:], float64[:],
optional(float64), optional(float64),
optional(float64) optional(float64)
), ),
nopython=True,
nogil=True nogil=True
) )
def ema( def ema(
@ -94,7 +93,7 @@ def ema(
return s return s
# @jit( # @njit(
# float64[:]( # float64[:](
# float64[:], # float64[:],
# int64, # int64,

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) # Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by

View File

@ -21,7 +21,7 @@ from typing import List, Optional, Tuple
import numpy as np import numpy as np
import pyqtgraph as pg import pyqtgraph as pg
from numba import jit, float64, int64 # , optional from numba import njit, float64, int64 # , optional
from PyQt5 import QtCore, QtGui, QtWidgets from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtCore import QLineF, QPointF from PyQt5.QtCore import QLineF, QPointF
# from numba import types as ntypes # from numba import types as ntypes
@ -70,7 +70,7 @@ def lines_from_ohlc(row: np.ndarray, w: float) -> Tuple[QLineF]:
return [hl, o, c] return [hl, o, c]
@jit( @njit(
# TODO: for now need to construct this manually for readonly arrays, see # TODO: for now need to construct this manually for readonly arrays, see
# https://github.com/numba/numba/issues/4511 # https://github.com/numba/numba/issues/4511
# ntypes.Tuple((float64[:], float64[:], float64[:]))( # ntypes.Tuple((float64[:], float64[:], float64[:]))(
@ -78,7 +78,6 @@ def lines_from_ohlc(row: np.ndarray, w: float) -> Tuple[QLineF]:
# int64, # int64,
# optional(float64), # optional(float64),
# ), # ),
nopython=True,
nogil=True nogil=True
) )
def path_arrays_from_ohlc( def path_arrays_from_ohlc(