Port kraken backend to new data feed api

cached_feeds
Tyler Goodlet 2021-03-31 14:22:09 -04:00
parent 29b73b41fb
commit 5a970dad72
1 changed files with 50 additions and 83 deletions

View File

@ -16,15 +16,17 @@
""" """
Kraken backend. Kraken backend.
""" """
from contextlib import asynccontextmanager, AsyncExitStack from contextlib import asynccontextmanager, AsyncExitStack
from dataclasses import asdict, field from dataclasses import asdict, field
from types import ModuleType from types import ModuleType
from typing import List, Dict, Any, Tuple, Optional from typing import List, Dict, Any, Tuple
import json import json
import time import time
import trio_websocket import trio_websocket
from trio_typing import TaskStatus
from trio_websocket._impl import ( from trio_websocket._impl import (
ConnectionClosed, ConnectionClosed,
DisconnectionTimeout, DisconnectionTimeout,
@ -41,15 +43,11 @@ import tractor
from pydantic.dataclasses import dataclass from pydantic.dataclasses import dataclass
from pydantic import BaseModel from pydantic import BaseModel
from .api import open_cached_client
from ._util import resproc, SymbolNotFound, BrokerError from ._util import resproc, SymbolNotFound, BrokerError
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import ( from ..data import ShmArray
_buffer,
# iterticks,
attach_shm_array,
get_shm_token,
subscribe_ohlc_for_increment,
)
log = get_logger(__name__) log = get_logger(__name__)
@ -315,6 +313,7 @@ def normalize(
quote['brokerd_ts'] = time.time() quote['brokerd_ts'] = time.time()
quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '') quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '')
quote['last'] = quote['close'] quote['last'] = quote['close']
quote['bar_wap'] = ohlc.vwap
# seriously eh? what's with this non-symmetry everywhere # seriously eh? what's with this non-symmetry everywhere
# in subscription systems... # in subscription systems...
@ -426,17 +425,37 @@ async def open_autorecon_ws(url):
await stack.aclose() await stack.aclose()
# @tractor.msg.pub async def backfill_bars(
sym: str,
shm: ShmArray, # type: ignore # noqa
count: int = 10, # NOTE: any more and we'll overrun the underlying buffer
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Fill historical bars into shared mem / storage afap.
"""
with trio.CancelScope() as cs:
async with open_cached_client('kraken') as client:
bars = await client.bars(symbol=sym)
shm.push(bars)
task_status.started(cs)
async def stream_quotes( async def stream_quotes(
# get_topics: Callable,
shm_token: Tuple[str, str, List[tuple]], send_chan: trio.abc.SendChannel,
symbols: List[str] = ['XBTUSD', 'XMRUSD'], symbols: List[str],
# These are the symbols not expected by the ws api shm: ShmArray,
# they are looked up inside this routine. feed_is_live: trio.Event,
sub_type: str = 'ohlc',
loglevel: str = None, loglevel: str = None,
# compat with eventual ``tractor.msg.pub``
topics: Optional[List[str]] = None, # backend specific
sub_type: str = 'ohlc',
# startup sync
task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
"""Subscribe for ohlc stream of quotes for ``pairs``. """Subscribe for ohlc stream of quotes for ``pairs``.
@ -447,7 +466,8 @@ async def stream_quotes(
ws_pairs = {} ws_pairs = {}
sym_infos = {} sym_infos = {}
async with get_client() as client:
async with open_cached_client('kraken') as client:
# keep client cached for real-time section # keep client cached for real-time section
for sym in symbols: for sym in symbols:
@ -458,40 +478,16 @@ async def stream_quotes(
sym_infos[sym] = syminfo sym_infos[sym] = syminfo
ws_pairs[sym] = si.wsname ws_pairs[sym] = si.wsname
# maybe load historical ohlcv in to shared mem
# check if shm has already been created by previous
# feed initialization
writer_exists = get_shm_token(shm_token['shm_name'])
symbol = symbols[0] symbol = symbols[0]
if not writer_exists:
shm = attach_shm_array(
token=shm_token,
# we are writer
readonly=False,
)
bars = await client.bars(symbol=symbol)
shm.push(bars)
shm_token = shm.token
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
subscribe_ohlc_for_increment(shm, delay_s)
# yield shm_token, not writer_exists
init_msgs = { init_msgs = {
# pass back token, and bool, signalling if we're the writer # pass back token, and bool, signalling if we're the writer
# and that history has been written # and that history has been written
symbol: { symbol: {
'is_shm_writer': not writer_exists,
'shm_token': shm_token,
'symbol_info': sym_infos[sym], 'symbol_info': sym_infos[sym],
} 'shm_write_opts': {'sum_tick_vml': False},
# for sym in symbols },
} }
yield init_msgs
async with open_autorecon_ws('wss://ws.kraken.com/') as ws: async with open_autorecon_ws('wss://ws.kraken.com/') as ws:
@ -521,15 +517,16 @@ async def stream_quotes(
# pull a first quote and deliver # pull a first quote and deliver
msg_gen = stream_messages(ws) msg_gen = stream_messages(ws)
# TODO: use ``anext()`` when it lands in 3.10!
typ, ohlc_last = await msg_gen.__anext__() typ, ohlc_last = await msg_gen.__anext__()
topic, quote = normalize(ohlc_last) topic, quote = normalize(ohlc_last)
# packetize as {topic: quote} first_quote = {topic: quote}
yield {topic: quote} task_status.started((init_msgs, first_quote))
# tell incrementer task it can start # lol, only "closes" when they're margin squeezing clients ;P
_buffer.shm_incrementing(shm_token['shm_name']).set() feed_is_live.set()
# keep start of last interval for volume tracking # keep start of last interval for volume tracking
last_interval_start = ohlc_last.etime last_interval_start = ohlc_last.etime
@ -546,15 +543,18 @@ async def stream_quotes(
# https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
volume = ohlc.volume volume = ohlc.volume
# new interval # new OHLC sample interval
if ohlc.etime > last_interval_start: if ohlc.etime > last_interval_start:
last_interval_start = ohlc.etime last_interval_start = ohlc.etime
tick_volume = volume tick_volume = volume
else: else:
# this is the tick volume *within the interval* # this is the tick volume *within the interval*
tick_volume = volume - ohlc_last.volume tick_volume = volume - ohlc_last.volume
ohlc_last = ohlc
last = ohlc.close last = ohlc.close
if tick_volume: if tick_volume:
ohlc.ticks.append({ ohlc.ticks.append({
'type': 'trade', 'type': 'trade',
@ -564,43 +564,10 @@ async def stream_quotes(
topic, quote = normalize(ohlc) topic, quote = normalize(ohlc)
# if we are the lone tick writer start writing
# the buffer with appropriate trade data
if not writer_exists:
# update last entry
# benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][
['open', 'high', 'low', 'volume']
]
new_v = tick_volume
if v == 0 and new_v:
# no trades for this bar yet so the open
# is also the close/last trade price
o = last
# write shm
shm.array[
['open',
'high',
'low',
'close',
'bar_wap', # in this case vwap of bar
'volume']
][-1] = (
o,
max(high, last),
min(low, last),
last,
ohlc.vwap,
volume,
)
ohlc_last = ohlc
elif typ == 'l1': elif typ == 'l1':
quote = ohlc quote = ohlc
topic = quote['symbol'] topic = quote['symbol']
# XXX: format required by ``tractor.msg.pub`` # XXX: format required by ``tractor.msg.pub``
# requires a ``Dict[topic: str, quote: dict]`` # requires a ``Dict[topic: str, quote: dict]``
yield {topic: quote} await send_chan.send({topic: quote})