diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 5fd9216b..dd048d45 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -6,11 +6,13 @@ from dataclasses import dataclass, asdict, field from itertools import starmap from typing import List, Dict, Any import json +import time import trio_websocket import arrow import asks import numpy as np +import trio import tractor from ._util import resproc, SymbolNotFound, BrokerError @@ -116,6 +118,8 @@ async def get_client() -> Client: @dataclass class OHLC: + """Description of the flattened OHLC quote format. + """ chan_id: int # internal kraken id chan_name: str # eg. ohlc-1 (name-interval) pair: str # fx pair @@ -181,17 +185,28 @@ async def stream_quotes( async def recv(): return json.loads(await ws.get_message()) - import time - async def recv_ohlc(): - last_hb = 0 + too_slow_count = last_hb = 0 while True: - msg = await recv() + with trio.move_on_after(1.5) as cs: + msg = await recv() + + # trigger reconnection logic if too slow + if cs.cancelled_caught: + too_slow_count += 1 + if too_slow_count > 2: + log.warning( + "Heartbeat is to slow, " + "resetting ws connection") + raise trio_websocket._impl.ConnectionClosed( + "Reset Connection") + if isinstance(msg, dict): if msg.get('event') == 'heartbeat': - log.trace( - f"Heartbeat after {time.time() - last_hb}") - last_hb = time.time() + now = time.time() + delay = now - last_hb + last_hb = now + log.trace(f"Heartbeat after {delay}") # TODO: hmm i guess we should use this # for determining when to do connection # resets eh? @@ -203,6 +218,7 @@ async def stream_quotes( chan_id, ohlc_array, chan_name, pair = msg yield OHLC(chan_id, chan_name, pair, *ohlc_array) + # pull a first quote and deliver ohlc_gen = recv_ohlc() ohlc_last = await ohlc_gen.__anext__() yield asdict(ohlc_last) @@ -210,6 +226,7 @@ async def stream_quotes( # keep start of last interval for volume tracking last_interval_start = ohlc_last.etime + # start streaming async for ohlc in ohlc_gen: # generate tick values to match time & sales pane: @@ -231,7 +248,7 @@ async def stream_quotes( yield asdict(ohlc) ohlc_last = ohlc except trio_websocket._impl.ConnectionClosed: - log.error("Good job kraken...") + log.exception("Good job kraken...reconnecting") if __name__ == '__main__':