From bc650406010e112adca9dd058b32fb9a4ae2fc5f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 26 Sep 2020 14:12:54 -0400 Subject: [PATCH] Add shm support to kraken backend --- piker/brokers/kraken.py | 191 ++++++++++++++++++++++++---------------- 1 file changed, 117 insertions(+), 74 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index f2da2bb8..a79a6399 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -3,8 +3,7 @@ Kraken backend. """ from contextlib import asynccontextmanager from dataclasses import dataclass, asdict, field -from itertools import starmap -from typing import List, Dict, Any, Callable +from typing import List, Dict, Any, Tuple, Optional import json import time @@ -18,6 +17,12 @@ import tractor from ._util import resproc, SymbolNotFound, BrokerError from ..log import get_logger, get_console_log +from ..data import ( + # iterticks, + attach_shm_array, + get_shm_token, + subscribe_ohlc_for_increment, +) log = get_logger(__name__) @@ -26,7 +31,7 @@ log = get_logger(__name__) _url = 'https://api.kraken.com/0' -# conversion to numpy worthy types +# Broker specific ohlc schema which includes a vwap field _ohlc_dtype = [ ('index', int), ('time', int), @@ -34,9 +39,9 @@ _ohlc_dtype = [ ('high', float), ('low', float), ('close', float), - ('vwap', float), ('volume', float), - ('count', int) + ('count', int), + ('vwap', float), ] # UI components allow this to be declared such that additional @@ -114,18 +119,24 @@ class Client: for i, bar in enumerate(bars): # normalize weird zero-ed vwap values..cmon kraken.. # indicates vwap didn't change since last bar - vwap = float(bar[-3]) + vwap = float(bar.pop(-3)) if vwap != 0: last_nz_vwap = vwap if vwap == 0: - bar[-3] = last_nz_vwap + vwap = last_nz_vwap + + # re-insert vwap as the last of the fields + bar.append(vwap) new_bars.append( (i,) + tuple( - ftype(bar[j]) for j, (name, ftype) in enumerate(_ohlc_dtype[1:]) + ftype(bar[j]) for j, (name, ftype) in enumerate( + _ohlc_dtype[1:] + ) ) ) - return np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars + array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars + return array except KeyError: raise SymbolNotFound(json['error'][0] + f': {symbol}') @@ -215,15 +226,17 @@ def normalize( return topic, quote -@tractor.msg.pub +# @tractor.msg.pub async def stream_quotes( - get_topics: Callable, - shared_array_token: Tuple[str, str, str], + # get_topics: Callable, + shm_token: Tuple[str, str, List[tuple]], + symbols: List[str] = ['XBTUSD', 'XMRUSD'], # These are the symbols not expected by the ws api # they are looked up inside this routine. - symbols: List[str] = ['XBTUSD', 'XMRUSD'], sub_type: str = 'ohlc', loglevel: str = None, + # compat with eventual ``tractor.msg.pub`` + topics: Optional[List[str]] = None, ) -> None: """Subscribe for ohlc stream of quotes for ``pairs``. @@ -234,84 +247,114 @@ async def stream_quotes( ws_pairs = {} async with get_client() as client: + # keep client cached for real-time section for sym in symbols: ws_pairs[sym] = (await client.symbol_info(sym))['wsname'] - while True: - try: - async with trio_websocket.open_websocket_url( - 'wss://ws.kraken.com', - ) as ws: - # setup subs - # see: https://docs.kraken.com/websockets/#message-subscribe - subs = { - 'pair': list(ws_pairs.values()), - 'event': 'subscribe', - 'subscription': { - 'name': sub_type, - 'interval': 1, # 1 min - # 'name': 'ticker', - # 'name': 'openOrders', - # 'depth': '25', - }, - } - # TODO: we want to eventually allow unsubs which should - # be completely fine to request from a separate task - # since internally the ws methods appear to be FIFO - # locked. - await ws.send_message(json.dumps(subs)) + # maybe load historical ohlcv in to shared mem + # check if shm has already been created by previous + # feed initialization + writer_exists = get_shm_token(shm_token['shm_name']) - async def recv(): - return json.loads(await ws.get_message()) + symbol = symbols[0] - # pull a first quote and deliver - ohlc_gen = recv_ohlc(recv) - ohlc_last = await ohlc_gen.__anext__() + if not writer_exists: + shm = attach_shm_array( + token=shm_token, + # we are writer + readonly=False, + ) + bars = await client.bars(symbol=symbol) - topic, quote = normalize(ohlc_last) + shm.push(bars) + shm_token = shm.token - # packetize as {topic: quote} - yield {topic: quote} + times = shm.array['time'] + delay_s = times[-1] - times[times != times[-1]][-1] + subscribe_ohlc_for_increment(shm, delay_s) - # keep start of last interval for volume tracking - last_interval_start = ohlc_last.etime + yield shm_token, not writer_exists - # start streaming - async for ohlc in ohlc_gen: + while True: + try: + async with trio_websocket.open_websocket_url( + 'wss://ws.kraken.com', + ) as ws: + # setup subs + # https://docs.kraken.com/websockets/#message-subscribe + subs = { + 'pair': list(ws_pairs.values()), + 'event': 'subscribe', + 'subscription': { + 'name': sub_type, + 'interval': 1, # 1 min + # 'name': 'ticker', + # 'name': 'openOrders', + # 'depth': '25', + }, + } + # TODO: we want to eventually allow unsubs which should + # be completely fine to request from a separate task + # since internally the ws methods appear to be FIFO + # locked. + await ws.send_message(json.dumps(subs)) - # generate tick values to match time & sales pane: - # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m - volume = ohlc.volume - if ohlc.etime > last_interval_start: # new interval - last_interval_start = ohlc.etime - tick_volume = volume - else: - # this is the tick volume *within the interval* - tick_volume = volume - ohlc_last.volume + async def recv(): + return json.loads(await ws.get_message()) - if tick_volume: - ohlc.ticks.append({ - 'type': 'trade', - 'price': ohlc.close, - 'size': tick_volume, - }) + # pull a first quote and deliver + ohlc_gen = recv_ohlc(recv) + ohlc_last = await ohlc_gen.__anext__() - topic, quote = normalize(ohlc) + topic, quote = normalize(ohlc_last) - # XXX: format required by ``tractor.msg.pub`` - # requires a ``Dict[topic: str, quote: dict]`` + # packetize as {topic: quote} yield {topic: quote} - ohlc_last = ohlc + # keep start of last interval for volume tracking + last_interval_start = ohlc_last.etime - except (ConnectionClosed, DisconnectionTimeout): - log.exception("Good job kraken...reconnecting") + # start streaming + async for ohlc in ohlc_gen: + # generate tick values to match time & sales pane: + # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m + volume = ohlc.volume + if ohlc.etime > last_interval_start: # new interval + last_interval_start = ohlc.etime + tick_volume = volume + else: + # this is the tick volume *within the interval* + tick_volume = volume - ohlc_last.volume -if __name__ == '__main__': + last = ohlc.close + if tick_volume: + ohlc.ticks.append({ + 'type': 'trade', + 'price': last, + 'size': tick_volume, + }) - async def stream_ohlc(): - async for msg in stream_quotes(): - print(msg) + topic, quote = normalize(ohlc) - tractor.run(stream_ohlc) + # if we are the lone tick writer start writing + # the buffer with appropriate trade data + if not writer_exists: + # update last entry + # benchmarked in the 4-5 us range + high, low = shm.array[-1][['high', 'low']] + shm.array[['high', 'low', 'close', 'vwap']][-1] = ( + max(high, last), + min(low, last), + last, + ohlc.vwap, + ) + + # XXX: format required by ``tractor.msg.pub`` + # requires a ``Dict[topic: str, quote: dict]`` + yield {topic: quote} + + ohlc_last = ohlc + + except (ConnectionClosed, DisconnectionTimeout): + log.exception("Good job kraken...reconnecting")