Add an auto-reconnect websocket API

basic_orders
Tyler Goodlet 2021-02-19 18:42:50 -05:00
parent bbd54e8f95
commit add63734f1
1 changed files with 208 additions and 115 deletions

View File

@ -17,14 +17,22 @@
""" """
Kraken backend. Kraken backend.
""" """
from contextlib import asynccontextmanager from contextlib import asynccontextmanager, AsyncExitStack
from dataclasses import asdict, field from dataclasses import asdict, field
from types import ModuleType
from typing import List, Dict, Any, Tuple, Optional from typing import List, Dict, Any, Tuple, Optional
import json import json
import time import time
import trio_websocket import trio_websocket
from trio_websocket._impl import ConnectionClosed, DisconnectionTimeout from trio_websocket._impl import (
ConnectionClosed,
DisconnectionTimeout,
ConnectionRejected,
HandshakeError,
ConnectionTimeout,
)
import arrow import arrow
import asks import asks
import numpy as np import numpy as np
@ -229,22 +237,27 @@ async def get_client() -> Client:
yield Client() yield Client()
async def recv_msg(recv): async def stream_messages(ws):
too_slow_count = last_hb = 0 too_slow_count = last_hb = 0
while True: while True:
with trio.move_on_after(1.5) as cs:
msg = await recv()
# trigger reconnection logic if too slow with trio.move_on_after(5) as cs:
msg = await ws.recv_msg()
# trigger reconnection if heartbeat is laggy
if cs.cancelled_caught: if cs.cancelled_caught:
too_slow_count += 1 too_slow_count += 1
if too_slow_count > 2:
if too_slow_count > 10:
log.warning( log.warning(
"Heartbeat is to slow, " "Heartbeat is too slow, resetting ws connection")
"resetting ws connection")
raise trio_websocket._impl.ConnectionClosed( await ws._connect()
"Reset Connection") too_slow_count = 0
continue
if isinstance(msg, dict): if isinstance(msg, dict):
if msg.get('event') == 'heartbeat': if msg.get('event') == 'heartbeat':
@ -252,11 +265,11 @@ async def recv_msg(recv):
now = time.time() now = time.time()
delay = now - last_hb delay = now - last_hb
last_hb = now last_hb = now
log.trace(f"Heartbeat after {delay}")
# TODO: hmm i guess we should use this # XXX: why tf is this not printing without --tl flag?
# for determining when to do connection log.debug(f"Heartbeat after {delay}")
# resets eh? # print(f"Heartbeat after {delay}")
continue continue
err = msg.get('errorMessage') err = msg.get('errorMessage')
@ -326,6 +339,95 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]:
} }
class AutoReconWs:
"""Make ``trio_websocketw` sockets stay up no matter the bs.
"""
recon_errors = (
ConnectionClosed,
DisconnectionTimeout,
ConnectionRejected,
HandshakeError,
ConnectionTimeout,
)
def __init__(
self,
url: str,
stack: AsyncExitStack,
serializer: ModuleType = json,
):
self.url = url
self._stack = stack
self._ws: 'WebSocketConnection' = None # noqa
async def _connect(
self,
tries: int = 10000,
) -> None:
try:
await self._stack.aclose()
except (DisconnectionTimeout, RuntimeError):
await trio.sleep(1)
last_err = None
for i in range(tries):
try:
self._ws = await self._stack.enter_async_context(
trio_websocket.open_websocket_url(self.url)
)
log.info(f'Connection success: {self.url}')
return
except self.recon_errors as err:
last_err = err
log.error(
f'{self} connection bail with '
f'{type(err)}...retry attempt {i}'
)
await trio.sleep(1)
continue
else:
log.exception('ws connection fail...')
raise last_err
async def send_msg(
self,
data: Any,
) -> None:
while True:
try:
return await self._ws.send_message(json.dumps(data))
except self.recon_errors:
await self._connect()
async def recv_msg(
self,
) -> Any:
while True:
try:
return json.loads(await self._ws.get_message())
except self.recon_errors:
await self._connect()
@asynccontextmanager
async def open_autorecon_ws(url):
"""Apparently we can QoS for all sorts of reasons..so catch em.
"""
async with AsyncExitStack() as stack:
ws = AutoReconWs(url, stack)
# async with trio_websocket.open_websocket_url(url) as ws:
# await tractor.breakpoint()
await ws._connect()
try:
yield ws
finally:
await stack.aclose()
# @tractor.msg.pub # @tractor.msg.pub
async def stream_quotes( async def stream_quotes(
# get_topics: Callable, # get_topics: Callable,
@ -353,8 +455,8 @@ async def stream_quotes(
for sym in symbols: for sym in symbols:
si = Pair(**await client.symbol_info(sym)) # validation si = Pair(**await client.symbol_info(sym)) # validation
syminfo = si.dict() syminfo = si.dict()
syminfo['price_tick_size'] = 1/10**si.pair_decimals syminfo['price_tick_size'] = 1 / 10**si.pair_decimals
syminfo['lot_tick_size'] = 1/10**si.lot_decimals syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals
sym_infos[sym] = syminfo sym_infos[sym] = syminfo
ws_pairs[sym] = si.wsname ws_pairs[sym] = si.wsname
@ -393,123 +495,114 @@ async def stream_quotes(
} }
yield init_msgs yield init_msgs
while True: async with open_autorecon_ws('wss://ws.kraken.com/') as ws:
try:
async with trio_websocket.open_websocket_url(
'wss://ws.kraken.com/',
) as ws:
# XXX: setup subs # XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe # https://docs.kraken.com/websockets/#message-subscribe
# specific logic for this in kraken's shitty sync client: # specific logic for this in kraken's shitty sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
ohlc_sub = make_sub( ohlc_sub = make_sub(
list(ws_pairs.values()), list(ws_pairs.values()),
{'name': 'ohlc', 'interval': 1} {'name': 'ohlc', 'interval': 1}
) )
# TODO: we want to eventually allow unsubs which should # TODO: we want to eventually allow unsubs which should
# be completely fine to request from a separate task # be completely fine to request from a separate task
# since internally the ws methods appear to be FIFO # since internally the ws methods appear to be FIFO
# locked. # locked.
await ws.send_message(json.dumps(ohlc_sub)) await ws.send_msg(ohlc_sub)
# trade data (aka L1) # trade data (aka L1)
l1_sub = make_sub( l1_sub = make_sub(
list(ws_pairs.values()), list(ws_pairs.values()),
{'name': 'spread'} # 'depth': 10} {'name': 'spread'} # 'depth': 10}
)
) await ws.send_msg(l1_sub)
await ws.send_message(json.dumps(l1_sub))
async def recv(): # pull a first quote and deliver
return json.loads(await ws.get_message()) msg_gen = stream_messages(ws)
# pull a first quote and deliver typ, ohlc_last = await msg_gen.__anext__()
msg_gen = recv_msg(recv)
typ, ohlc_last = await msg_gen.__anext__()
topic, quote = normalize(ohlc_last) topic, quote = normalize(ohlc_last)
# packetize as {topic: quote} # packetize as {topic: quote}
yield {topic: quote} yield {topic: quote}
# tell incrementer task it can start # tell incrementer task it can start
_buffer.shm_incrementing(shm_token['shm_name']).set() _buffer.shm_incrementing(shm_token['shm_name']).set()
# keep start of last interval for volume tracking # keep start of last interval for volume tracking
last_interval_start = ohlc_last.etime last_interval_start = ohlc_last.etime
# start streaming # start streaming
async for typ, ohlc in msg_gen: async for typ, ohlc in msg_gen:
if typ == 'ohlc': if typ == 'ohlc':
# TODO: can get rid of all this by using # TODO: can get rid of all this by using
# ``trades`` subscription... # ``trades`` subscription...
# generate tick values to match time & sales pane: # generate tick values to match time & sales pane:
# https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
volume = ohlc.volume volume = ohlc.volume
# new interval # new interval
if ohlc.etime > last_interval_start: if ohlc.etime > last_interval_start:
last_interval_start = ohlc.etime last_interval_start = ohlc.etime
tick_volume = volume tick_volume = volume
else: else:
# this is the tick volume *within the interval* # this is the tick volume *within the interval*
tick_volume = volume - ohlc_last.volume tick_volume = volume - ohlc_last.volume
last = ohlc.close last = ohlc.close
if tick_volume: if tick_volume:
ohlc.ticks.append({ ohlc.ticks.append({
'type': 'trade', 'type': 'trade',
'price': last, 'price': last,
'size': tick_volume, 'size': tick_volume,
}) })
topic, quote = normalize(ohlc) topic, quote = normalize(ohlc)
# if we are the lone tick writer start writing # if we are the lone tick writer start writing
# the buffer with appropriate trade data # the buffer with appropriate trade data
if not writer_exists: if not writer_exists:
# update last entry # update last entry
# benchmarked in the 4-5 us range # benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][ o, high, low, v = shm.array[-1][
['open', 'high', 'low', 'volume'] ['open', 'high', 'low', 'volume']
] ]
new_v = tick_volume new_v = tick_volume
if v == 0 and new_v: if v == 0 and new_v:
# no trades for this bar yet so the open # no trades for this bar yet so the open
# is also the close/last trade price # is also the close/last trade price
o = last o = last
# write shm # write shm
shm.array[ shm.array[
['open', ['open',
'high', 'high',
'low', 'low',
'close', 'close',
'bar_wap', # in this case vwap of bar 'bar_wap', # in this case vwap of bar
'volume'] 'volume']
][-1] = ( ][-1] = (
o, o,
max(high, last), max(high, last),
min(low, last), min(low, last),
last, last,
ohlc.vwap, ohlc.vwap,
volume, volume,
) )
ohlc_last = ohlc ohlc_last = ohlc
elif typ == 'l1': elif typ == 'l1':
quote = ohlc quote = ohlc
topic = quote['symbol'] topic = quote['symbol']
# XXX: format required by ``tractor.msg.pub`` # XXX: format required by ``tractor.msg.pub``
# requires a ``Dict[topic: str, quote: dict]`` # requires a ``Dict[topic: str, quote: dict]``
yield {topic: quote} yield {topic: quote}
except (ConnectionClosed, DisconnectionTimeout):
log.exception("Good job kraken...reconnecting")