diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 605c5177..ea3850fd 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -4,7 +4,7 @@ Kraken backend. from contextlib import asynccontextmanager from dataclasses import dataclass, asdict, field from itertools import starmap -from typing import List, Dict, Any +from typing import List, Dict, Any, Callable import json import time @@ -144,7 +144,9 @@ class OHLC: setattr(self, f, val.type(getattr(self, f))) +@tractor.msg.pub async def stream_quotes( + get_topics: Callable, # These are the symbols not expected by the ws api # they are looked up inside this routine. symbols: List[str] = ['XBTUSD', 'XMRUSD'], @@ -181,6 +183,10 @@ async def stream_quotes( # 'depth': '25', }, } + # TODO: we want to eventually allow unsubs which should + # be completely fine to request from a separate task + # since internally the ws methods appear to be FIFO + # locked. await ws.send_message(json.dumps(subs)) async def recv(): @@ -222,7 +228,14 @@ async def stream_quotes( # pull a first quote and deliver ohlc_gen = recv_ohlc() ohlc_last = await ohlc_gen.__anext__() - yield asdict(ohlc_last) + + # seriously eh? what's with this non-symmetry everywhere + # in subscription systems... + quote = asdict(ohlc_last) + topic = quote['pair'].replace('/', '') + + # packetize as {topic: quote} + yield {topic: quote} # keep start of last interval for volume tracking last_interval_start = ohlc_last.etime @@ -246,8 +259,16 @@ async def stream_quotes( 'price': ohlc.close, 'size': tick_volume, }) - yield asdict(ohlc) + + # XXX: format required by ``tractor.msg.pub`` + # requires a ``Dict[topic: str, quote: dict]`` + quote = asdict(ohlc) + print(quote) + topic = quote['pair'].replace('/', '') + yield {topic: quote} + ohlc_last = ohlc + except (ConnectionClosed, DisconnectionTimeout): log.exception("Good job kraken...reconnecting")