diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 5d8763c3..ac80988b 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -16,15 +16,17 @@ """ Kraken backend. + """ from contextlib import asynccontextmanager, AsyncExitStack from dataclasses import asdict, field from types import ModuleType -from typing import List, Dict, Any, Tuple, Optional +from typing import List, Dict, Any, Tuple import json import time import trio_websocket +from trio_typing import TaskStatus from trio_websocket._impl import ( ConnectionClosed, DisconnectionTimeout, @@ -41,15 +43,11 @@ import tractor from pydantic.dataclasses import dataclass from pydantic import BaseModel + +from .api import open_cached_client from ._util import resproc, SymbolNotFound, BrokerError from ..log import get_logger, get_console_log -from ..data import ( - _buffer, - # iterticks, - attach_shm_array, - get_shm_token, - subscribe_ohlc_for_increment, -) +from ..data import ShmArray log = get_logger(__name__) @@ -315,6 +313,7 @@ def normalize( quote['brokerd_ts'] = time.time() quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '') quote['last'] = quote['close'] + quote['bar_wap'] = ohlc.vwap # seriously eh? what's with this non-symmetry everywhere # in subscription systems... @@ -426,17 +425,37 @@ async def open_autorecon_ws(url): await stack.aclose() -# @tractor.msg.pub +async def backfill_bars( + sym: str, + shm: ShmArray, # type: ignore # noqa + + count: int = 10, # NOTE: any more and we'll overrun the underlying buffer + + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, +) -> None: + """Fill historical bars into shared mem / storage afap. + """ + with trio.CancelScope() as cs: + async with open_cached_client('kraken') as client: + bars = await client.bars(symbol=sym) + shm.push(bars) + task_status.started(cs) + + async def stream_quotes( - # get_topics: Callable, - shm_token: Tuple[str, str, List[tuple]], - symbols: List[str] = ['XBTUSD', 'XMRUSD'], - # These are the symbols not expected by the ws api - # they are looked up inside this routine. - sub_type: str = 'ohlc', + + send_chan: trio.abc.SendChannel, + symbols: List[str], + shm: ShmArray, + feed_is_live: trio.Event, loglevel: str = None, - # compat with eventual ``tractor.msg.pub`` - topics: Optional[List[str]] = None, + + # backend specific + sub_type: str = 'ohlc', + + # startup sync + task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, + ) -> None: """Subscribe for ohlc stream of quotes for ``pairs``. @@ -447,7 +466,8 @@ async def stream_quotes( ws_pairs = {} sym_infos = {} - async with get_client() as client: + + async with open_cached_client('kraken') as client: # keep client cached for real-time section for sym in symbols: @@ -458,40 +478,16 @@ async def stream_quotes( sym_infos[sym] = syminfo ws_pairs[sym] = si.wsname - # maybe load historical ohlcv in to shared mem - # check if shm has already been created by previous - # feed initialization - writer_exists = get_shm_token(shm_token['shm_name']) - symbol = symbols[0] - if not writer_exists: - shm = attach_shm_array( - token=shm_token, - # we are writer - readonly=False, - ) - bars = await client.bars(symbol=symbol) - - shm.push(bars) - shm_token = shm.token - - times = shm.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] - subscribe_ohlc_for_increment(shm, delay_s) - - # yield shm_token, not writer_exists init_msgs = { # pass back token, and bool, signalling if we're the writer # and that history has been written symbol: { - 'is_shm_writer': not writer_exists, - 'shm_token': shm_token, 'symbol_info': sym_infos[sym], - } - # for sym in symbols + 'shm_write_opts': {'sum_tick_vml': False}, + }, } - yield init_msgs async with open_autorecon_ws('wss://ws.kraken.com/') as ws: @@ -521,15 +517,16 @@ async def stream_quotes( # pull a first quote and deliver msg_gen = stream_messages(ws) + # TODO: use ``anext()`` when it lands in 3.10! typ, ohlc_last = await msg_gen.__anext__() topic, quote = normalize(ohlc_last) - # packetize as {topic: quote} - yield {topic: quote} + first_quote = {topic: quote} + task_status.started((init_msgs, first_quote)) - # tell incrementer task it can start - _buffer.shm_incrementing(shm_token['shm_name']).set() + # lol, only "closes" when they're margin squeezing clients ;P + feed_is_live.set() # keep start of last interval for volume tracking last_interval_start = ohlc_last.etime @@ -546,15 +543,18 @@ async def stream_quotes( # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m volume = ohlc.volume - # new interval + # new OHLC sample interval if ohlc.etime > last_interval_start: last_interval_start = ohlc.etime tick_volume = volume + else: # this is the tick volume *within the interval* tick_volume = volume - ohlc_last.volume + ohlc_last = ohlc last = ohlc.close + if tick_volume: ohlc.ticks.append({ 'type': 'trade', @@ -564,43 +564,10 @@ async def stream_quotes( topic, quote = normalize(ohlc) - # if we are the lone tick writer start writing - # the buffer with appropriate trade data - if not writer_exists: - # update last entry - # benchmarked in the 4-5 us range - o, high, low, v = shm.array[-1][ - ['open', 'high', 'low', 'volume'] - ] - new_v = tick_volume - - if v == 0 and new_v: - # no trades for this bar yet so the open - # is also the close/last trade price - o = last - - # write shm - shm.array[ - ['open', - 'high', - 'low', - 'close', - 'bar_wap', # in this case vwap of bar - 'volume'] - ][-1] = ( - o, - max(high, last), - min(low, last), - last, - ohlc.vwap, - volume, - ) - ohlc_last = ohlc - elif typ == 'l1': quote = ohlc topic = quote['symbol'] # XXX: format required by ``tractor.msg.pub`` # requires a ``Dict[topic: str, quote: dict]`` - yield {topic: quote} + await send_chan.send({topic: quote})