Change kraken.stream_quotes to use tractor.stream api

kraken_history
Guillermo Rodriguez 2020-12-30 18:58:14 -03:00
parent 373327e3b7
commit b1fd986a3a
No known key found for this signature in database
GPG Key ID: 3F61096EC7DF75A8
1 changed files with 190 additions and 27 deletions

View File

@ -19,7 +19,7 @@ Kraken backend.
"""
from contextlib import asynccontextmanager
from dataclasses import dataclass, asdict, field
from typing import List, Dict, Any, Tuple, Optional
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator
import json
import time
@ -291,38 +291,199 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]:
# @tractor.msg.pub
async def stream_quotes(
# get_topics: Callable,
shm_token: Tuple[str, str, List[tuple]],
symbols: List[str] = ['XBTUSD', 'XMRUSD'],
# These are the symbols not expected by the ws api
# they are looked up inside this routine.
sub_type: str = 'ohlc',
loglevel: str = None,
# compat with eventual ``tractor.msg.pub``
topics: Optional[List[str]] = None,
) -> None:
"""Subscribe for ohlc stream of quotes for ``pairs``.
#async def stream_quotes(
# # get_topics: Callable,
# shm_token: Tuple[str, str, List[tuple]],
# symbols: List[str] = ['XBTUSD', 'XMRUSD'],
# # These are the symbols not expected by the ws api
# # they are looked up inside this routine.
# sub_type: str = 'ohlc',
# loglevel: str = None,
# # compat with eventual ``tractor.msg.pub``
# topics: Optional[List[str]] = None,
#) -> None:
# """Subscribe for ohlc stream of quotes for ``pairs``.
#
# ``pairs`` must be formatted <crypto_symbol>/<fiat_symbol>.
# """
# # XXX: required to propagate ``tractor`` loglevel to piker logging
# get_console_log(loglevel or tractor.current_actor().loglevel)
#
# ws_pairs = {}
# async with get_client() as client:
#
# # keep client cached for real-time section
# for sym in symbols:
# ws_pairs[sym] = (await client.symbol_info(sym))['wsname']
#
# # maybe load historical ohlcv in to shared mem
# # check if shm has already been created by previous
# # feed initialization
# writer_exists = get_shm_token(shm_token['shm_name'])
#
# symbol = symbols[0]
#
# if not writer_exists:
# shm = attach_shm_array(
# token=shm_token,
# # we are writer
# readonly=False,
# )
# bars = await client.bars(symbol=symbol)
#
# shm.push(bars)
# shm_token = shm.token
#
# times = shm.array['time']
# delay_s = times[-1] - times[times != times[-1]][-1]
# subscribe_ohlc_for_increment(shm, delay_s)
#
# yield shm_token, not writer_exists
#
# while True:
# try:
# async with trio_websocket.open_websocket_url(
# 'wss://ws.kraken.com/',
# ) as ws:
#
# # XXX: setup subs
# # https://docs.kraken.com/websockets/#message-subscribe
# # specific logic for this in kraken's shitty sync client:
# # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
# ohlc_sub = make_sub(
# list(ws_pairs.values()),
# {'name': 'ohlc', 'interval': 1}
# )
#
# # TODO: we want to eventually allow unsubs which should
# # be completely fine to request from a separate task
# # since internally the ws methods appear to be FIFO
# # locked.
# await ws.send_message(json.dumps(ohlc_sub))
#
# # trade data (aka L1)
# l1_sub = make_sub(
# list(ws_pairs.values()),
# {'name': 'spread'} # 'depth': 10}
#
# )
# await ws.send_message(json.dumps(l1_sub))
#
# async def recv():
# return json.loads(await ws.get_message())
#
# # pull a first quote and deliver
# msg_gen = recv_msg(recv)
# typ, ohlc_last = await msg_gen.__anext__()
#
# topic, quote = normalize(ohlc_last)
#
# # packetize as {topic: quote}
# yield {topic: quote}
#
# # keep start of last interval for volume tracking
# last_interval_start = ohlc_last.etime
#
# # start streaming
# async for typ, ohlc in msg_gen:
#
# if typ == 'ohlc':
#
# # TODO: can get rid of all this by using
# # ``trades`` subscription...
#
# # generate tick values to match time & sales pane:
# # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
# volume = ohlc.volume
#
# # new interval
# if ohlc.etime > last_interval_start:
# last_interval_start = ohlc.etime
# tick_volume = volume
# else:
# # this is the tick volume *within the interval*
# tick_volume = volume - ohlc_last.volume
#
# last = ohlc.close
# if tick_volume:
# ohlc.ticks.append({
# 'type': 'trade',
# 'price': last,
# 'size': tick_volume,
# })
#
# topic, quote = normalize(ohlc)
#
# # if we are the lone tick writer start writing
# # the buffer with appropriate trade data
# if not writer_exists:
# # update last entry
# # benchmarked in the 4-5 us range
# o, high, low, v = shm.array[-1][
# ['open', 'high', 'low', 'volume']
# ]
# new_v = tick_volume
#
# if v == 0 and new_v:
# # no trades for this bar yet so the open
# # is also the close/last trade price
# o = last
#
# # write shm
# shm.array[
# ['open',
# 'high',
# 'low',
# 'close',
# 'bar_wap', # in this case vwap of bar
# 'volume']
# ][-1] = (
# o,
# max(high, last),
# min(low, last),
# last,
# ohlc.vwap,
# volume,
# )
# ohlc_last = ohlc
#
# elif typ == 'l1':
# quote = ohlc
# topic = quote['symbol']
#
# # XXX: format required by ``tractor.msg.pub``
# # requires a ``Dict[topic: str, quote: dict]``
# yield {topic: quote}
#
# except (ConnectionClosed, DisconnectionTimeout):
# log.exception("Good job kraken...reconnecting")
``pairs`` must be formatted <crypto_symbol>/<fiat_symbol>.
"""
@tractor.stream
async def stream_quotes(
ctx: tractor.Context,
symbols: List[str],
shm_token: Tuple[str, str, List[tuple]],
loglevel: str = None,
# compat for @tractor.msg.pub
topics: Any = None
) -> AsyncIterator[Dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
ws_pairs = {}
async with get_client() as client:
# keep client cached for real-time section
for sym in symbols:
ws_pairs[sym] = (await client.symbol_info(sym))['wsname']
# maybe load historical ohlcv in to shared mem
# check if shm has already been created by previous
# feed initialization
writer_exists = get_shm_token(shm_token['shm_name'])
symbol = symbols[0]
await tractor.breakpoint()
if not writer_exists:
shm = attach_shm_array(
token=shm_token,
@ -337,15 +498,16 @@ async def stream_quotes(
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
subscribe_ohlc_for_increment(shm, delay_s)
yield shm_token, not writer_exists
# pass back token, and bool, signalling if we're the writer
await ctx.send_yield((shm_token, not writer_exists))
while True:
try:
async with trio_websocket.open_websocket_url(
'wss://ws.kraken.com/',
'wss://ws.kraken.com',
) as ws:
# XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe
# specific logic for this in kraken's shitty sync client:
@ -379,7 +541,7 @@ async def stream_quotes(
topic, quote = normalize(ohlc_last)
# packetize as {topic: quote}
yield {topic: quote}
await ctx.send_yield({topic: quote})
# keep start of last interval for volume tracking
last_interval_start = ohlc_last.etime
@ -453,7 +615,8 @@ async def stream_quotes(
# XXX: format required by ``tractor.msg.pub``
# requires a ``Dict[topic: str, quote: dict]``
yield {topic: quote}
await ctx.send_yield({topic: quote})
except (ConnectionClosed, DisconnectionTimeout):
log.exception("Good job kraken...reconnecting")