Make ws loop restart on connection failures

unleash_the_kraken
Tyler Goodlet 2020-08-01 22:12:26 -04:00
parent d976f3d074
commit bf9a0136df
1 changed files with 84 additions and 81 deletions

View File

@ -7,7 +7,7 @@ from itertools import starmap
from typing import List, Dict, Any from typing import List, Dict, Any
import json import json
from trio_websocket import open_websocket_url import trio_websocket
import arrow import arrow
import asks import asks
import numpy as np import numpy as np
@ -114,6 +114,31 @@ async def get_client() -> Client:
yield Client() yield Client()
@dataclass
class OHLC:
chan_id: int # internal kraken id
chan_name: str # eg. ohlc-1 (name-interval)
pair: str # fx pair
time: float # Begin time of interval, in seconds since epoch
etime: float # End time of interval, in seconds since epoch
open: float # Open price of interval
high: float # High price within interval
low: float # Low price within interval
close: float # Close price of interval
vwap: float # Volume weighted average price within interval
volume: float # Accumulated volume **within interval**
count: int # Number of trades within interval
# (sampled) generated tick data
ticks: List[Any] = field(default_factory=list)
# XXX: ugh, super hideous.. needs built-in converters.
def __post_init__(self):
for f, val in self.__dataclass_fields__.items():
if f == 'ticks':
continue
setattr(self, f, val.type(getattr(self, f)))
async def stream_quotes( async def stream_quotes(
# These are the symbols not expected by the ws api # These are the symbols not expected by the ws api
# they are looked up inside this routine. # they are looked up inside this routine.
@ -133,7 +158,9 @@ async def stream_quotes(
for sym in symbols: for sym in symbols:
ws_pairs[sym] = (await client.symbol_info(sym))['wsname'] ws_pairs[sym] = (await client.symbol_info(sym))['wsname']
async with open_websocket_url( while True:
try:
async with trio_websocket.open_websocket_url(
'wss://ws.kraken.com', 'wss://ws.kraken.com',
) as ws: ) as ws:
# setup subs # setup subs
@ -167,30 +194,6 @@ async def stream_quotes(
chan_id, ohlc_array, chan_name, pair = msg chan_id, ohlc_array, chan_name, pair = msg
yield OHLC(chan_id, chan_name, pair, *ohlc_array) yield OHLC(chan_id, chan_name, pair, *ohlc_array)
@dataclass
class OHLC:
chan_id: int # internal kraken id
chan_name: str # eg. ohlc-1 (name-interval)
pair: str # fx pair
time: float # Begin time of interval, in seconds since epoch
etime: float # End time of interval, in seconds since epoch
open: float # Open price of interval
high: float # High price within interval
low: float # Low price within interval
close: float # Close price of interval
vwap: float # Volume weighted average price within interval
volume: float # Accumulated volume **within interval**
count: int # Number of trades within interval
# (sampled) generated tick data
ticks: List[Any] = field(default_factory=list)
# XXX: ugh, super hideous.. needs built-in converters.
def __post_init__(self):
for f, val in self.__dataclass_fields__.items():
if f == 'ticks':
continue
setattr(self, f, val.type(getattr(self, f)))
ohlc_gen = recv_ohlc() ohlc_gen = recv_ohlc()
ohlc_last = await ohlc_gen.__anext__() ohlc_last = await ohlc_gen.__anext__()
yield asdict(ohlc_last) yield asdict(ohlc_last)
@ -204,8 +207,6 @@ async def stream_quotes(
# https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
volume = ohlc.volume volume = ohlc.volume
if ohlc.etime > last_interval_start: # new interval if ohlc.etime > last_interval_start: # new interval
log.debug(
f"New interval last: {ohlc_last.time}, now: {ohlc.time}")
last_interval_start = ohlc.etime last_interval_start = ohlc.etime
tick_volume = volume tick_volume = volume
else: else:
@ -220,6 +221,8 @@ async def stream_quotes(
}) })
yield asdict(ohlc) yield asdict(ohlc)
ohlc_last = ohlc ohlc_last = ohlc
except trio_websocket._impl.ConnectionClosed:
log.error("Good job kraken...")
if __name__ == '__main__': if __name__ == '__main__':