diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 21627c69..90bd6476 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -176,8 +176,9 @@ class OHLC: setattr(self, f, val.type(getattr(self, f))) -async def recv_ohlc(recv): +async def recv_msg(recv): too_slow_count = last_hb = 0 + while True: with trio.move_on_after(1.5) as cs: msg = await recv() @@ -194,20 +195,50 @@ async def recv_ohlc(recv): if isinstance(msg, dict): if msg.get('event') == 'heartbeat': + now = time.time() delay = now - last_hb last_hb = now log.trace(f"Heartbeat after {delay}") + # TODO: hmm i guess we should use this # for determining when to do connection # resets eh? continue + err = msg.get('errorMessage') if err: raise BrokerError(err) else: - chan_id, ohlc_array, chan_name, pair = msg - yield OHLC(chan_id, chan_name, pair, *ohlc_array) + chan_id, *payload_array, chan_name, pair = msg + + if 'ohlc' in chan_name: + + yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) + + elif 'spread' in chan_name: + + bid, ask, ts, bsize, asize = map(float, payload_array[0]) + + # TODO: really makes you think IB has a horrible API... + quote = { + 'symbol': pair.replace('/', ''), + 'ticks': [ + {'type': 'bid', 'price': bid, 'size': bsize}, + {'type': 'bsize', 'price': bid, 'size': bsize}, + + {'type': 'ask', 'price': ask, 'size': asize}, + {'type': 'asize', 'price': ask, 'size': asize}, + ], + } + yield 'l1', quote + + # elif 'book' in msg[-2]: + # chan_id, *payload_array, chan_name, pair = msg + # print(msg) + + else: + print(f'UNHANDLED MSG: {msg}') def normalize( @@ -226,6 +257,21 @@ def normalize( return topic, quote +def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]: + """Create a request subscription packet dict. + + https://docs.kraken.com/websockets/#message-subscribe + + """ + # eg. specific logic for this in kraken's sync client: + # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 + return { + 'pair': pairs, + 'event': 'subscribe', + 'subscription': data, + } + + # @tractor.msg.pub async def stream_quotes( # get_topics: Callable, @@ -247,6 +293,7 @@ async def stream_quotes( ws_pairs = {} async with get_client() as client: + # keep client cached for real-time section for sym in symbols: ws_pairs[sym] = (await client.symbol_info(sym))['wsname'] @@ -280,31 +327,36 @@ async def stream_quotes( async with trio_websocket.open_websocket_url( 'wss://ws.kraken.com', ) as ws: - # setup subs + + # XXX: setup subs # https://docs.kraken.com/websockets/#message-subscribe - subs = { - 'pair': list(ws_pairs.values()), - 'event': 'subscribe', - 'subscription': { - 'name': sub_type, - 'interval': 1, # 1 min - # 'name': 'ticker', - # 'name': 'openOrders', - # 'depth': '25', - }, - } + # specific logic for this in kraken's shitty sync client: + # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 + ohlc_sub = make_sub( + list(ws_pairs.values()), + {'name': 'ohlc', 'interval': 1} + ) + # TODO: we want to eventually allow unsubs which should # be completely fine to request from a separate task # since internally the ws methods appear to be FIFO # locked. - await ws.send_message(json.dumps(subs)) + await ws.send_message(json.dumps(ohlc_sub)) + + # trade data (aka L1) + l1_sub = make_sub( + list(ws_pairs.values()), + {'name': 'spread'} # 'depth': 10} + + ) + await ws.send_message(json.dumps(l1_sub)) async def recv(): return json.loads(await ws.get_message()) # pull a first quote and deliver - ohlc_gen = recv_ohlc(recv) - ohlc_last = await ohlc_gen.__anext__() + msg_gen = recv_msg(recv) + typ, ohlc_last = await msg_gen.__anext__() topic, quote = normalize(ohlc_last) @@ -315,65 +367,75 @@ async def stream_quotes( last_interval_start = ohlc_last.etime # start streaming - async for ohlc in ohlc_gen: + async for typ, ohlc in msg_gen: - # generate tick values to match time & sales pane: - # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m - volume = ohlc.volume - if ohlc.etime > last_interval_start: # new interval - last_interval_start = ohlc.etime - tick_volume = volume - else: - # this is the tick volume *within the interval* - tick_volume = volume - ohlc_last.volume + if typ == 'ohlc': - last = ohlc.close - if tick_volume: - ohlc.ticks.append({ - 'type': 'trade', - 'price': last, - 'size': tick_volume, - }) + # TODO: can get rid of all this by using + # ``trades`` subscription... - topic, quote = normalize(ohlc) + # generate tick values to match time & sales pane: + # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m + volume = ohlc.volume - # if we are the lone tick writer start writing - # the buffer with appropriate trade data - if not writer_exists: - # update last entry - # benchmarked in the 4-5 us range - o, high, low, v = shm.array[-1][ - ['open', 'high', 'low', 'volume'] - ] - new_v = tick_volume + # new interval + if ohlc.etime > last_interval_start: + last_interval_start = ohlc.etime + tick_volume = volume + else: + # this is the tick volume *within the interval* + tick_volume = volume - ohlc_last.volume - if v == 0 and new_v: - # no trades for this bar yet so the open - # is also the close/last trade price - o = last + last = ohlc.close + if tick_volume: + ohlc.ticks.append({ + 'type': 'trade', + 'price': last, + 'size': tick_volume, + }) - # write shm - shm.array[ - ['open', - 'high', - 'low', - 'close', - 'vwap', - 'volume'] - ][-1] = ( - o, - max(high, last), - min(low, last), - last, - ohlc.vwap, - volume, - ) + topic, quote = normalize(ohlc) + + # if we are the lone tick writer start writing + # the buffer with appropriate trade data + if not writer_exists: + # update last entry + # benchmarked in the 4-5 us range + o, high, low, v = shm.array[-1][ + ['open', 'high', 'low', 'volume'] + ] + new_v = tick_volume + + if v == 0 and new_v: + # no trades for this bar yet so the open + # is also the close/last trade price + o = last + + # write shm + shm.array[ + ['open', + 'high', + 'low', + 'close', + 'vwap', + 'volume'] + ][-1] = ( + o, + max(high, last), + min(low, last), + last, + ohlc.vwap, + volume, + ) + ohlc_last = ohlc + + elif typ == 'l1': + quote = ohlc + topic = quote['symbol'] # XXX: format required by ``tractor.msg.pub`` # requires a ``Dict[topic: str, quote: dict]`` yield {topic: quote} - ohlc_last = ohlc - except (ConnectionClosed, DisconnectionTimeout): log.exception("Good job kraken...reconnecting")