diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index ea3850fd..8029af04 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -120,6 +120,9 @@ async def get_client() -> Client: @dataclass class OHLC: """Description of the flattened OHLC quote format. + + For schema details see: + https://docs.kraken.com/websockets/#message-ohlc """ chan_id: int # internal kraken id chan_name: str # eg. ohlc-1 (name-interval) @@ -144,6 +147,56 @@ class OHLC: setattr(self, f, val.type(getattr(self, f))) +async def recv_ohlc(recv): + too_slow_count = last_hb = 0 + while True: + with trio.move_on_after(1.5) as cs: + msg = await recv() + + # trigger reconnection logic if too slow + if cs.cancelled_caught: + too_slow_count += 1 + if too_slow_count > 2: + log.warning( + "Heartbeat is to slow, " + "resetting ws connection") + raise trio_websocket._impl.ConnectionClosed( + "Reset Connection") + + if isinstance(msg, dict): + if msg.get('event') == 'heartbeat': + now = time.time() + delay = now - last_hb + last_hb = now + log.trace(f"Heartbeat after {delay}") + # TODO: hmm i guess we should use this + # for determining when to do connection + # resets eh? + continue + err = msg.get('errorMessage') + if err: + raise BrokerError(err) + else: + chan_id, ohlc_array, chan_name, pair = msg + yield OHLC(chan_id, chan_name, pair, *ohlc_array) + + +def normalize( + ohlc: OHLC, +) -> dict: + quote = asdict(ohlc) + quote['broker_ts'] = quote['time'] + quote['brokerd_ts'] = time.time() + quote['pair'] = quote['pair'].replace('/', '') + + # seriously eh? what's with this non-symmetry everywhere + # in subscription systems... + topic = quote['pair'].replace('/', '') + + print(quote) + return topic, quote + + @tractor.msg.pub async def stream_quotes( get_topics: Callable, @@ -192,47 +245,11 @@ async def stream_quotes( async def recv(): return json.loads(await ws.get_message()) - async def recv_ohlc(): - too_slow_count = last_hb = 0 - while True: - with trio.move_on_after(1.5) as cs: - msg = await recv() - - # trigger reconnection logic if too slow - if cs.cancelled_caught: - too_slow_count += 1 - if too_slow_count > 2: - log.warning( - "Heartbeat is to slow, " - "resetting ws connection") - raise trio_websocket._impl.ConnectionClosed( - "Reset Connection") - - if isinstance(msg, dict): - if msg.get('event') == 'heartbeat': - now = time.time() - delay = now - last_hb - last_hb = now - log.trace(f"Heartbeat after {delay}") - # TODO: hmm i guess we should use this - # for determining when to do connection - # resets eh? - continue - err = msg.get('errorMessage') - if err: - raise BrokerError(err) - else: - chan_id, ohlc_array, chan_name, pair = msg - yield OHLC(chan_id, chan_name, pair, *ohlc_array) - # pull a first quote and deliver - ohlc_gen = recv_ohlc() + ohlc_gen = recv_ohlc(recv) ohlc_last = await ohlc_gen.__anext__() - # seriously eh? what's with this non-symmetry everywhere - # in subscription systems... - quote = asdict(ohlc_last) - topic = quote['pair'].replace('/', '') + topic, quote = normalize(ohlc_last) # packetize as {topic: quote} yield {topic: quote} @@ -260,11 +277,10 @@ async def stream_quotes( 'size': tick_volume, }) + topic, quote = normalize(ohlc) + # XXX: format required by ``tractor.msg.pub`` # requires a ``Dict[topic: str, quote: dict]`` - quote = asdict(ohlc) - print(quote) - topic = quote['pair'].replace('/', '') yield {topic: quote} ohlc_last = ohlc