Add shm support to kraken backend

bar_select
Tyler Goodlet 2020-09-26 14:12:54 -04:00
parent d4eb5ccca4
commit bc65040601
1 changed files with 117 additions and 74 deletions

View File

@ -3,8 +3,7 @@ Kraken backend.
""" """
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass, asdict, field from dataclasses import dataclass, asdict, field
from itertools import starmap from typing import List, Dict, Any, Tuple, Optional
from typing import List, Dict, Any, Callable
import json import json
import time import time
@ -18,6 +17,12 @@ import tractor
from ._util import resproc, SymbolNotFound, BrokerError from ._util import resproc, SymbolNotFound, BrokerError
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import (
# iterticks,
attach_shm_array,
get_shm_token,
subscribe_ohlc_for_increment,
)
log = get_logger(__name__) log = get_logger(__name__)
@ -26,7 +31,7 @@ log = get_logger(__name__)
_url = 'https://api.kraken.com/0' _url = 'https://api.kraken.com/0'
# conversion to numpy worthy types # Broker specific ohlc schema which includes a vwap field
_ohlc_dtype = [ _ohlc_dtype = [
('index', int), ('index', int),
('time', int), ('time', int),
@ -34,9 +39,9 @@ _ohlc_dtype = [
('high', float), ('high', float),
('low', float), ('low', float),
('close', float), ('close', float),
('vwap', float),
('volume', float), ('volume', float),
('count', int) ('count', int),
('vwap', float),
] ]
# UI components allow this to be declared such that additional # UI components allow this to be declared such that additional
@ -114,18 +119,24 @@ class Client:
for i, bar in enumerate(bars): for i, bar in enumerate(bars):
# normalize weird zero-ed vwap values..cmon kraken.. # normalize weird zero-ed vwap values..cmon kraken..
# indicates vwap didn't change since last bar # indicates vwap didn't change since last bar
vwap = float(bar[-3]) vwap = float(bar.pop(-3))
if vwap != 0: if vwap != 0:
last_nz_vwap = vwap last_nz_vwap = vwap
if vwap == 0: if vwap == 0:
bar[-3] = last_nz_vwap vwap = last_nz_vwap
# re-insert vwap as the last of the fields
bar.append(vwap)
new_bars.append( new_bars.append(
(i,) + tuple( (i,) + tuple(
ftype(bar[j]) for j, (name, ftype) in enumerate(_ohlc_dtype[1:]) ftype(bar[j]) for j, (name, ftype) in enumerate(
_ohlc_dtype[1:]
)
) )
) )
return np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
return array
except KeyError: except KeyError:
raise SymbolNotFound(json['error'][0] + f': {symbol}') raise SymbolNotFound(json['error'][0] + f': {symbol}')
@ -215,15 +226,17 @@ def normalize(
return topic, quote return topic, quote
@tractor.msg.pub # @tractor.msg.pub
async def stream_quotes( async def stream_quotes(
get_topics: Callable, # get_topics: Callable,
shared_array_token: Tuple[str, str, str], shm_token: Tuple[str, str, List[tuple]],
symbols: List[str] = ['XBTUSD', 'XMRUSD'],
# These are the symbols not expected by the ws api # These are the symbols not expected by the ws api
# they are looked up inside this routine. # they are looked up inside this routine.
symbols: List[str] = ['XBTUSD', 'XMRUSD'],
sub_type: str = 'ohlc', sub_type: str = 'ohlc',
loglevel: str = None, loglevel: str = None,
# compat with eventual ``tractor.msg.pub``
topics: Optional[List[str]] = None,
) -> None: ) -> None:
"""Subscribe for ohlc stream of quotes for ``pairs``. """Subscribe for ohlc stream of quotes for ``pairs``.
@ -234,84 +247,114 @@ async def stream_quotes(
ws_pairs = {} ws_pairs = {}
async with get_client() as client: async with get_client() as client:
# keep client cached for real-time section
for sym in symbols: for sym in symbols:
ws_pairs[sym] = (await client.symbol_info(sym))['wsname'] ws_pairs[sym] = (await client.symbol_info(sym))['wsname']
while True: # maybe load historical ohlcv in to shared mem
try: # check if shm has already been created by previous
async with trio_websocket.open_websocket_url( # feed initialization
'wss://ws.kraken.com', writer_exists = get_shm_token(shm_token['shm_name'])
) as ws:
# setup subs
# see: https://docs.kraken.com/websockets/#message-subscribe
subs = {
'pair': list(ws_pairs.values()),
'event': 'subscribe',
'subscription': {
'name': sub_type,
'interval': 1, # 1 min
# 'name': 'ticker',
# 'name': 'openOrders',
# 'depth': '25',
},
}
# TODO: we want to eventually allow unsubs which should
# be completely fine to request from a separate task
# since internally the ws methods appear to be FIFO
# locked.
await ws.send_message(json.dumps(subs))
async def recv(): symbol = symbols[0]
return json.loads(await ws.get_message())
# pull a first quote and deliver if not writer_exists:
ohlc_gen = recv_ohlc(recv) shm = attach_shm_array(
ohlc_last = await ohlc_gen.__anext__() token=shm_token,
# we are writer
readonly=False,
)
bars = await client.bars(symbol=symbol)
topic, quote = normalize(ohlc_last) shm.push(bars)
shm_token = shm.token
# packetize as {topic: quote} times = shm.array['time']
yield {topic: quote} delay_s = times[-1] - times[times != times[-1]][-1]
subscribe_ohlc_for_increment(shm, delay_s)
# keep start of last interval for volume tracking yield shm_token, not writer_exists
last_interval_start = ohlc_last.etime
# start streaming while True:
async for ohlc in ohlc_gen: try:
async with trio_websocket.open_websocket_url(
'wss://ws.kraken.com',
) as ws:
# setup subs
# https://docs.kraken.com/websockets/#message-subscribe
subs = {
'pair': list(ws_pairs.values()),
'event': 'subscribe',
'subscription': {
'name': sub_type,
'interval': 1, # 1 min
# 'name': 'ticker',
# 'name': 'openOrders',
# 'depth': '25',
},
}
# TODO: we want to eventually allow unsubs which should
# be completely fine to request from a separate task
# since internally the ws methods appear to be FIFO
# locked.
await ws.send_message(json.dumps(subs))
# generate tick values to match time & sales pane: async def recv():
# https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m return json.loads(await ws.get_message())
volume = ohlc.volume
if ohlc.etime > last_interval_start: # new interval
last_interval_start = ohlc.etime
tick_volume = volume
else:
# this is the tick volume *within the interval*
tick_volume = volume - ohlc_last.volume
if tick_volume: # pull a first quote and deliver
ohlc.ticks.append({ ohlc_gen = recv_ohlc(recv)
'type': 'trade', ohlc_last = await ohlc_gen.__anext__()
'price': ohlc.close,
'size': tick_volume,
})
topic, quote = normalize(ohlc) topic, quote = normalize(ohlc_last)
# XXX: format required by ``tractor.msg.pub`` # packetize as {topic: quote}
# requires a ``Dict[topic: str, quote: dict]``
yield {topic: quote} yield {topic: quote}
ohlc_last = ohlc # keep start of last interval for volume tracking
last_interval_start = ohlc_last.etime
except (ConnectionClosed, DisconnectionTimeout): # start streaming
log.exception("Good job kraken...reconnecting") async for ohlc in ohlc_gen:
# generate tick values to match time & sales pane:
# https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
volume = ohlc.volume
if ohlc.etime > last_interval_start: # new interval
last_interval_start = ohlc.etime
tick_volume = volume
else:
# this is the tick volume *within the interval*
tick_volume = volume - ohlc_last.volume
if __name__ == '__main__': last = ohlc.close
if tick_volume:
ohlc.ticks.append({
'type': 'trade',
'price': last,
'size': tick_volume,
})
async def stream_ohlc(): topic, quote = normalize(ohlc)
async for msg in stream_quotes():
print(msg)
tractor.run(stream_ohlc) # if we are the lone tick writer start writing
# the buffer with appropriate trade data
if not writer_exists:
# update last entry
# benchmarked in the 4-5 us range
high, low = shm.array[-1][['high', 'low']]
shm.array[['high', 'low', 'close', 'vwap']][-1] = (
max(high, last),
min(low, last),
last,
ohlc.vwap,
)
# XXX: format required by ``tractor.msg.pub``
# requires a ``Dict[topic: str, quote: dict]``
yield {topic: quote}
ohlc_last = ohlc
except (ConnectionClosed, DisconnectionTimeout):
log.exception("Good job kraken...reconnecting")