Make ws loop restart on connection failures

its_happening
Tyler Goodlet 2020-08-01 22:12:26 -04:00
parent 06f03c690c
commit fad58d18c9
1 changed files with 84 additions and 81 deletions

View File

@ -7,7 +7,7 @@ from itertools import starmap
from typing import List, Dict, Any from typing import List, Dict, Any
import json import json
from trio_websocket import open_websocket_url import trio_websocket
import arrow import arrow
import asks import asks
import numpy as np import numpy as np
@ -114,6 +114,31 @@ async def get_client() -> Client:
yield Client() yield Client()
@dataclass
class OHLC:
chan_id: int # internal kraken id
chan_name: str # eg. ohlc-1 (name-interval)
pair: str # fx pair
time: float # Begin time of interval, in seconds since epoch
etime: float # End time of interval, in seconds since epoch
open: float # Open price of interval
high: float # High price within interval
low: float # Low price within interval
close: float # Close price of interval
vwap: float # Volume weighted average price within interval
volume: float # Accumulated volume **within interval**
count: int # Number of trades within interval
# (sampled) generated tick data
ticks: List[Any] = field(default_factory=list)
# XXX: ugh, super hideous.. needs built-in converters.
def __post_init__(self):
for f, val in self.__dataclass_fields__.items():
if f == 'ticks':
continue
setattr(self, f, val.type(getattr(self, f)))
async def stream_quotes( async def stream_quotes(
# These are the symbols not expected by the ws api # These are the symbols not expected by the ws api
# they are looked up inside this routine. # they are looked up inside this routine.
@ -133,93 +158,71 @@ async def stream_quotes(
for sym in symbols: for sym in symbols:
ws_pairs[sym] = (await client.symbol_info(sym))['wsname'] ws_pairs[sym] = (await client.symbol_info(sym))['wsname']
async with open_websocket_url( while True:
'wss://ws.kraken.com', try:
) as ws: async with trio_websocket.open_websocket_url(
# setup subs 'wss://ws.kraken.com',
# see: https://docs.kraken.com/websockets/#message-subscribe ) as ws:
subs = { # setup subs
'pair': list(ws_pairs.values()), # see: https://docs.kraken.com/websockets/#message-subscribe
'event': 'subscribe', subs = {
'subscription': { 'pair': list(ws_pairs.values()),
'name': sub_type, 'event': 'subscribe',
'interval': 1, # 1 min 'subscription': {
# 'name': 'ticker', 'name': sub_type,
# 'name': 'openOrders', 'interval': 1, # 1 min
# 'depth': '25', # 'name': 'ticker',
}, # 'name': 'openOrders',
} # 'depth': '25',
await ws.send_message(json.dumps(subs)) },
}
await ws.send_message(json.dumps(subs))
async def recv(): async def recv():
return json.loads(await ws.get_message()) return json.loads(await ws.get_message())
async def recv_ohlc(): async def recv_ohlc():
while True: while True:
msg = await recv() msg = await recv()
if isinstance(msg, dict): if isinstance(msg, dict):
if msg.get('event') == 'heartbeat': if msg.get('event') == 'heartbeat':
continue continue
err = msg.get('errorMessage') err = msg.get('errorMessage')
if err: if err:
raise BrokerError(err) raise BrokerError(err)
else: else:
chan_id, ohlc_array, chan_name, pair = msg chan_id, ohlc_array, chan_name, pair = msg
yield OHLC(chan_id, chan_name, pair, *ohlc_array) yield OHLC(chan_id, chan_name, pair, *ohlc_array)
@dataclass ohlc_gen = recv_ohlc()
class OHLC: ohlc_last = await ohlc_gen.__anext__()
chan_id: int # internal kraken id yield asdict(ohlc_last)
chan_name: str # eg. ohlc-1 (name-interval)
pair: str # fx pair
time: float # Begin time of interval, in seconds since epoch
etime: float # End time of interval, in seconds since epoch
open: float # Open price of interval
high: float # High price within interval
low: float # Low price within interval
close: float # Close price of interval
vwap: float # Volume weighted average price within interval
volume: float # Accumulated volume **within interval**
count: int # Number of trades within interval
# (sampled) generated tick data
ticks: List[Any] = field(default_factory=list)
# XXX: ugh, super hideous.. needs built-in converters. # keep start of last interval for volume tracking
def __post_init__(self): last_interval_start = ohlc_last.etime
for f, val in self.__dataclass_fields__.items():
if f == 'ticks':
continue
setattr(self, f, val.type(getattr(self, f)))
ohlc_gen = recv_ohlc() async for ohlc in ohlc_gen:
ohlc_last = await ohlc_gen.__anext__()
yield asdict(ohlc_last)
# keep start of last interval for volume tracking # generate tick values to match time & sales pane:
last_interval_start = ohlc_last.etime # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
volume = ohlc.volume
if ohlc.etime > last_interval_start: # new interval
last_interval_start = ohlc.etime
tick_volume = volume
else:
# this is the tick volume *within the interval*
tick_volume = volume - ohlc_last.volume
async for ohlc in ohlc_gen: if tick_volume:
ohlc.ticks.append({
# generate tick values to match time & sales pane: 'type': 'trade',
# https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m 'price': ohlc.close,
volume = ohlc.volume 'size': tick_volume,
if ohlc.etime > last_interval_start: # new interval })
log.debug( yield asdict(ohlc)
f"New interval last: {ohlc_last.time}, now: {ohlc.time}") ohlc_last = ohlc
last_interval_start = ohlc.etime except trio_websocket._impl.ConnectionClosed:
tick_volume = volume log.error("Good job kraken...")
else:
# this is the tick volume *within the interval*
tick_volume = volume - ohlc_last.volume
if tick_volume:
ohlc.ticks.append({
'type': 'trade',
'price': ohlc.close,
'size': tick_volume,
})
yield asdict(ohlc)
ohlc_last = ohlc
if __name__ == '__main__': if __name__ == '__main__':