From add63734f1e30c1fc283ab13d1553144be784a3a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 19 Feb 2021 18:42:50 -0500 Subject: [PATCH] Add an auto-reconnect websocket API --- piker/brokers/kraken.py | 323 ++++++++++++++++++++++++++-------------- 1 file changed, 208 insertions(+), 115 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 3bd6081c..425963e9 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -17,14 +17,22 @@ """ Kraken backend. """ -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager, AsyncExitStack from dataclasses import asdict, field +from types import ModuleType from typing import List, Dict, Any, Tuple, Optional import json import time import trio_websocket -from trio_websocket._impl import ConnectionClosed, DisconnectionTimeout +from trio_websocket._impl import ( + ConnectionClosed, + DisconnectionTimeout, + ConnectionRejected, + HandshakeError, + ConnectionTimeout, +) + import arrow import asks import numpy as np @@ -229,22 +237,27 @@ async def get_client() -> Client: yield Client() -async def recv_msg(recv): +async def stream_messages(ws): + too_slow_count = last_hb = 0 while True: - with trio.move_on_after(1.5) as cs: - msg = await recv() - # trigger reconnection logic if too slow + with trio.move_on_after(5) as cs: + msg = await ws.recv_msg() + + # trigger reconnection if heartbeat is laggy if cs.cancelled_caught: + too_slow_count += 1 - if too_slow_count > 2: + + if too_slow_count > 10: log.warning( - "Heartbeat is to slow, " - "resetting ws connection") - raise trio_websocket._impl.ConnectionClosed( - "Reset Connection") + "Heartbeat is too slow, resetting ws connection") + + await ws._connect() + too_slow_count = 0 + continue if isinstance(msg, dict): if msg.get('event') == 'heartbeat': @@ -252,11 +265,11 @@ async def recv_msg(recv): now = time.time() delay = now - last_hb last_hb = now - log.trace(f"Heartbeat after {delay}") - # TODO: hmm i guess we should use this - # for determining when to do connection - # resets eh? + # XXX: why tf is this not printing without --tl flag? + log.debug(f"Heartbeat after {delay}") + # print(f"Heartbeat after {delay}") + continue err = msg.get('errorMessage') @@ -326,6 +339,95 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]: } +class AutoReconWs: + """Make ``trio_websocketw` sockets stay up no matter the bs. + + """ + recon_errors = ( + ConnectionClosed, + DisconnectionTimeout, + ConnectionRejected, + HandshakeError, + ConnectionTimeout, + ) + + def __init__( + self, + url: str, + stack: AsyncExitStack, + serializer: ModuleType = json, + ): + self.url = url + self._stack = stack + self._ws: 'WebSocketConnection' = None # noqa + + async def _connect( + self, + tries: int = 10000, + ) -> None: + try: + await self._stack.aclose() + except (DisconnectionTimeout, RuntimeError): + await trio.sleep(1) + + last_err = None + for i in range(tries): + try: + self._ws = await self._stack.enter_async_context( + trio_websocket.open_websocket_url(self.url) + ) + log.info(f'Connection success: {self.url}') + return + except self.recon_errors as err: + last_err = err + log.error( + f'{self} connection bail with ' + f'{type(err)}...retry attempt {i}' + ) + await trio.sleep(1) + continue + else: + log.exception('ws connection fail...') + raise last_err + + async def send_msg( + self, + data: Any, + ) -> None: + while True: + try: + return await self._ws.send_message(json.dumps(data)) + except self.recon_errors: + await self._connect() + + async def recv_msg( + self, + ) -> Any: + while True: + try: + return json.loads(await self._ws.get_message()) + except self.recon_errors: + await self._connect() + + +@asynccontextmanager +async def open_autorecon_ws(url): + """Apparently we can QoS for all sorts of reasons..so catch em. + + """ + async with AsyncExitStack() as stack: + ws = AutoReconWs(url, stack) + # async with trio_websocket.open_websocket_url(url) as ws: + # await tractor.breakpoint() + + await ws._connect() + try: + yield ws + + finally: + await stack.aclose() + + # @tractor.msg.pub async def stream_quotes( # get_topics: Callable, @@ -353,8 +455,8 @@ async def stream_quotes( for sym in symbols: si = Pair(**await client.symbol_info(sym)) # validation syminfo = si.dict() - syminfo['price_tick_size'] = 1/10**si.pair_decimals - syminfo['lot_tick_size'] = 1/10**si.lot_decimals + syminfo['price_tick_size'] = 1 / 10**si.pair_decimals + syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals sym_infos[sym] = syminfo ws_pairs[sym] = si.wsname @@ -393,123 +495,114 @@ async def stream_quotes( } yield init_msgs - while True: - try: - async with trio_websocket.open_websocket_url( - 'wss://ws.kraken.com/', - ) as ws: + async with open_autorecon_ws('wss://ws.kraken.com/') as ws: - # XXX: setup subs - # https://docs.kraken.com/websockets/#message-subscribe - # specific logic for this in kraken's shitty sync client: - # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - ohlc_sub = make_sub( - list(ws_pairs.values()), - {'name': 'ohlc', 'interval': 1} - ) + # XXX: setup subs + # https://docs.kraken.com/websockets/#message-subscribe + # specific logic for this in kraken's shitty sync client: + # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 + ohlc_sub = make_sub( + list(ws_pairs.values()), + {'name': 'ohlc', 'interval': 1} + ) - # TODO: we want to eventually allow unsubs which should - # be completely fine to request from a separate task - # since internally the ws methods appear to be FIFO - # locked. - await ws.send_message(json.dumps(ohlc_sub)) + # TODO: we want to eventually allow unsubs which should + # be completely fine to request from a separate task + # since internally the ws methods appear to be FIFO + # locked. + await ws.send_msg(ohlc_sub) - # trade data (aka L1) - l1_sub = make_sub( - list(ws_pairs.values()), - {'name': 'spread'} # 'depth': 10} + # trade data (aka L1) + l1_sub = make_sub( + list(ws_pairs.values()), + {'name': 'spread'} # 'depth': 10} + ) - ) - await ws.send_message(json.dumps(l1_sub)) + await ws.send_msg(l1_sub) - async def recv(): - return json.loads(await ws.get_message()) + # pull a first quote and deliver + msg_gen = stream_messages(ws) - # pull a first quote and deliver - msg_gen = recv_msg(recv) - typ, ohlc_last = await msg_gen.__anext__() + typ, ohlc_last = await msg_gen.__anext__() - topic, quote = normalize(ohlc_last) + topic, quote = normalize(ohlc_last) - # packetize as {topic: quote} - yield {topic: quote} + # packetize as {topic: quote} + yield {topic: quote} - # tell incrementer task it can start - _buffer.shm_incrementing(shm_token['shm_name']).set() + # tell incrementer task it can start + _buffer.shm_incrementing(shm_token['shm_name']).set() - # keep start of last interval for volume tracking - last_interval_start = ohlc_last.etime + # keep start of last interval for volume tracking + last_interval_start = ohlc_last.etime - # start streaming - async for typ, ohlc in msg_gen: + # start streaming + async for typ, ohlc in msg_gen: - if typ == 'ohlc': + if typ == 'ohlc': - # TODO: can get rid of all this by using - # ``trades`` subscription... + # TODO: can get rid of all this by using + # ``trades`` subscription... - # generate tick values to match time & sales pane: - # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m - volume = ohlc.volume + # generate tick values to match time & sales pane: + # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m + volume = ohlc.volume - # new interval - if ohlc.etime > last_interval_start: - last_interval_start = ohlc.etime - tick_volume = volume - else: - # this is the tick volume *within the interval* - tick_volume = volume - ohlc_last.volume + # new interval + if ohlc.etime > last_interval_start: + last_interval_start = ohlc.etime + tick_volume = volume + else: + # this is the tick volume *within the interval* + tick_volume = volume - ohlc_last.volume - last = ohlc.close - if tick_volume: - ohlc.ticks.append({ - 'type': 'trade', - 'price': last, - 'size': tick_volume, - }) + last = ohlc.close + if tick_volume: + ohlc.ticks.append({ + 'type': 'trade', + 'price': last, + 'size': tick_volume, + }) - topic, quote = normalize(ohlc) + topic, quote = normalize(ohlc) - # if we are the lone tick writer start writing - # the buffer with appropriate trade data - if not writer_exists: - # update last entry - # benchmarked in the 4-5 us range - o, high, low, v = shm.array[-1][ - ['open', 'high', 'low', 'volume'] - ] - new_v = tick_volume + # if we are the lone tick writer start writing + # the buffer with appropriate trade data + if not writer_exists: + # update last entry + # benchmarked in the 4-5 us range + o, high, low, v = shm.array[-1][ + ['open', 'high', 'low', 'volume'] + ] + new_v = tick_volume - if v == 0 and new_v: - # no trades for this bar yet so the open - # is also the close/last trade price - o = last + if v == 0 and new_v: + # no trades for this bar yet so the open + # is also the close/last trade price + o = last - # write shm - shm.array[ - ['open', - 'high', - 'low', - 'close', - 'bar_wap', # in this case vwap of bar - 'volume'] - ][-1] = ( - o, - max(high, last), - min(low, last), - last, - ohlc.vwap, - volume, - ) - ohlc_last = ohlc + # write shm + shm.array[ + ['open', + 'high', + 'low', + 'close', + 'bar_wap', # in this case vwap of bar + 'volume'] + ][-1] = ( + o, + max(high, last), + min(low, last), + last, + ohlc.vwap, + volume, + ) + ohlc_last = ohlc - elif typ == 'l1': - quote = ohlc - topic = quote['symbol'] + elif typ == 'l1': + quote = ohlc + topic = quote['symbol'] - # XXX: format required by ``tractor.msg.pub`` - # requires a ``Dict[topic: str, quote: dict]`` - yield {topic: quote} - - except (ConnectionClosed, DisconnectionTimeout): - log.exception("Good job kraken...reconnecting") + # XXX: format required by ``tractor.msg.pub`` + # requires a ``Dict[topic: str, quote: dict]`` + yield {topic: quote}