diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index e9a3f607..a82714cf 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -153,6 +153,10 @@ class Pair(Struct): def size_tick(self) -> Decimal: return digits_to_dec(self.lot_decimals) + @property + def bs_fqme(self) -> str: + return f'{self.symbol}.SPOT' + class Client: @@ -639,7 +643,7 @@ class Client: ''' try: - return cls._ntable[ticker].lower() + return cls._ntable[ticker] except KeyError as ke: raise SymbolNotFound(f'kraken has no {ke.args[0]}') diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 8fa321b0..814826be 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -1194,8 +1194,8 @@ async def norm_trade_records( }[record['type']] # we normalize to kraken's `altname` always.. - bs_mktid = Client.normalize_symbol(record['pair']) - fqme = f'{bs_mktid}.kraken' + bs_mktid: str = Client.normalize_symbol(record['pair']) + fqme = f'{bs_mktid.lower()}.kraken' mkt: MktPair = (await get_mkt_info(fqme))[0] records[tid] = Transaction( diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index dc70672f..4830914f 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -41,9 +41,11 @@ import trio from piker.accounting._mktinfo import ( Asset, MktPair, + unpack_fqme, ) from piker.brokers import ( open_cached_client, + SymbolNotFound, ) from piker._cacheables import ( async_lifo_cache, @@ -195,24 +197,18 @@ async def process_data_feed_msgs( # yield msg -def normalize( - ohlc: OHLC, +def normalize(ohlc: OHLC) -> dict: + ''' + Norm an `OHLC` msg to piker's minimal (live-)quote schema. -) -> dict: + ''' quote = ohlc.to_dict() quote['broker_ts'] = quote['time'] quote['brokerd_ts'] = time.time() quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '') quote['last'] = quote['close'] quote['bar_wap'] = ohlc.vwap - - # seriously eh? what's with this non-symmetry everywhere - # in subscription systems... - # XXX: piker style is always lowercases symbols. - topic = quote['pair'].replace('/', '').lower() - - # print(quote) - return topic, quote + return quote @acm @@ -221,7 +217,7 @@ async def open_history_client( ) -> AsyncGenerator[Callable, None]: - symbol: str = mkt.bs_fqme + symbol: str = mkt.bs_mktid # TODO implement history getter for the new storage layer. async with open_cached_client('kraken') as client: @@ -284,6 +280,18 @@ async def get_mkt_info( key-strs to `MktPair`s. ''' + venue: str = 'spot' + expiry: str = '' + if '.kraken' in fqme: + broker, pair, venue, expiry = unpack_fqme(fqme) + venue: str = venue or 'spot' + + if venue != 'spot': + raise SymbolNotFound( + 'kraken only supports spot markets right now!\n' + f'{fqme}\n' + ) + async with open_cached_client('kraken') as client: # uppercase since kraken bs_mktid is always upper @@ -304,6 +312,12 @@ async def get_mkt_info( size_tick=pair.size_tick, bs_mktid=bs_mktid, + expiry=expiry, + venue=venue or 'spot', + + # TODO: futes + # _atype=_atype, + broker='kraken', ) return mkt, pair @@ -410,7 +424,7 @@ async def stream_quotes( ): # pull a first quote and deliver typ, ohlc_last = await anext(msg_gen) - topic, quote = normalize(ohlc_last) + quote = normalize(ohlc_last) task_status.started((init_msgs, quote)) feed_is_live.set() @@ -419,41 +433,46 @@ async def stream_quotes( last_interval_start = ohlc_last.etime # start streaming - async for typ, ohlc in msg_gen: - - if typ == 'ohlc': + topic: str = mkt.bs_fqme + async for typ, quote in msg_gen: + match typ: # TODO: can get rid of all this by using - # ``trades`` subscription... + # ``trades`` subscription..? Not sure why this + # wasn't used originally? (music queues) zoltannn.. + # https://docs.kraken.com/websockets/#message-trade + case 'ohlc': + # generate tick values to match time & sales pane: + # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m + volume = quote.volume - # generate tick values to match time & sales pane: - # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m - volume = ohlc.volume + # new OHLC sample interval + if quote.etime > last_interval_start: + last_interval_start = quote.etime + tick_volume = volume - # new OHLC sample interval - if ohlc.etime > last_interval_start: - last_interval_start = ohlc.etime - tick_volume = volume + else: + # this is the tick volume *within the interval* + tick_volume = volume - ohlc_last.volume - else: - # this is the tick volume *within the interval* - tick_volume = volume - ohlc_last.volume + ohlc_last = quote + last = quote.close - ohlc_last = ohlc - last = ohlc.close + if tick_volume: + quote.ticks.append({ + 'type': 'trade', + 'price': last, + 'size': tick_volume, + }) - if tick_volume: - ohlc.ticks.append({ - 'type': 'trade', - 'price': last, - 'size': tick_volume, - }) + quote = normalize(quote) - topic, quote = normalize(ohlc) + case 'l1': + # passthrough quote msg + pass - elif typ == 'l1': - quote = ohlc - topic = quote['symbol'].lower() + case _: + log.warning(f'Unknown WSS message: {typ}, {quote}') await send_chan.send({topic: quote})