kraken: add `<pair>.spot.kraken` fqme interpolation
As just added for binance move to using an explicit `.<venue>.kraken` style for spot markets which makes the current spot symbology expand to `<PAIR>.SPOT` from the new `Pair.bs_fqme: str`. Reasons for why are laid out in the equivalent patch for binance. Obviously this also primes for supporting kraken's futures venue APIs as well 🏄 https://docs.futures.kraken.com/#introduction Detalles: - add `.spot.kraken` parsing to `get_mkt_info()` so that if the venue token is not passed by caller we implicitly expand it in. - change `normalize()` to only return the `quote: dict` not the topic key. - rewrite live feed msg loop to use `match:` syntax B)basic_buy_bot
parent
8e03212e40
commit
9ff03ba00c
|
@ -153,6 +153,10 @@ class Pair(Struct):
|
|||
def size_tick(self) -> Decimal:
|
||||
return digits_to_dec(self.lot_decimals)
|
||||
|
||||
@property
|
||||
def bs_fqme(self) -> str:
|
||||
return f'{self.symbol}.SPOT'
|
||||
|
||||
|
||||
class Client:
|
||||
|
||||
|
@ -639,7 +643,7 @@ class Client:
|
|||
|
||||
'''
|
||||
try:
|
||||
return cls._ntable[ticker].lower()
|
||||
return cls._ntable[ticker]
|
||||
except KeyError as ke:
|
||||
raise SymbolNotFound(f'kraken has no {ke.args[0]}')
|
||||
|
||||
|
|
|
@ -1194,8 +1194,8 @@ async def norm_trade_records(
|
|||
}[record['type']]
|
||||
|
||||
# we normalize to kraken's `altname` always..
|
||||
bs_mktid = Client.normalize_symbol(record['pair'])
|
||||
fqme = f'{bs_mktid}.kraken'
|
||||
bs_mktid: str = Client.normalize_symbol(record['pair'])
|
||||
fqme = f'{bs_mktid.lower()}.kraken'
|
||||
mkt: MktPair = (await get_mkt_info(fqme))[0]
|
||||
|
||||
records[tid] = Transaction(
|
||||
|
|
|
@ -41,9 +41,11 @@ import trio
|
|||
from piker.accounting._mktinfo import (
|
||||
Asset,
|
||||
MktPair,
|
||||
unpack_fqme,
|
||||
)
|
||||
from piker.brokers import (
|
||||
open_cached_client,
|
||||
SymbolNotFound,
|
||||
)
|
||||
from piker._cacheables import (
|
||||
async_lifo_cache,
|
||||
|
@ -195,24 +197,18 @@ async def process_data_feed_msgs(
|
|||
# yield msg
|
||||
|
||||
|
||||
def normalize(
|
||||
ohlc: OHLC,
|
||||
def normalize(ohlc: OHLC) -> dict:
|
||||
'''
|
||||
Norm an `OHLC` msg to piker's minimal (live-)quote schema.
|
||||
|
||||
) -> dict:
|
||||
'''
|
||||
quote = ohlc.to_dict()
|
||||
quote['broker_ts'] = quote['time']
|
||||
quote['brokerd_ts'] = time.time()
|
||||
quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '')
|
||||
quote['last'] = quote['close']
|
||||
quote['bar_wap'] = ohlc.vwap
|
||||
|
||||
# seriously eh? what's with this non-symmetry everywhere
|
||||
# in subscription systems...
|
||||
# XXX: piker style is always lowercases symbols.
|
||||
topic = quote['pair'].replace('/', '').lower()
|
||||
|
||||
# print(quote)
|
||||
return topic, quote
|
||||
return quote
|
||||
|
||||
|
||||
@acm
|
||||
|
@ -221,7 +217,7 @@ async def open_history_client(
|
|||
|
||||
) -> AsyncGenerator[Callable, None]:
|
||||
|
||||
symbol: str = mkt.bs_fqme
|
||||
symbol: str = mkt.bs_mktid
|
||||
|
||||
# TODO implement history getter for the new storage layer.
|
||||
async with open_cached_client('kraken') as client:
|
||||
|
@ -284,6 +280,18 @@ async def get_mkt_info(
|
|||
key-strs to `MktPair`s.
|
||||
|
||||
'''
|
||||
venue: str = 'spot'
|
||||
expiry: str = ''
|
||||
if '.kraken' in fqme:
|
||||
broker, pair, venue, expiry = unpack_fqme(fqme)
|
||||
venue: str = venue or 'spot'
|
||||
|
||||
if venue != 'spot':
|
||||
raise SymbolNotFound(
|
||||
'kraken only supports spot markets right now!\n'
|
||||
f'{fqme}\n'
|
||||
)
|
||||
|
||||
async with open_cached_client('kraken') as client:
|
||||
|
||||
# uppercase since kraken bs_mktid is always upper
|
||||
|
@ -304,6 +312,12 @@ async def get_mkt_info(
|
|||
size_tick=pair.size_tick,
|
||||
bs_mktid=bs_mktid,
|
||||
|
||||
expiry=expiry,
|
||||
venue=venue or 'spot',
|
||||
|
||||
# TODO: futes
|
||||
# _atype=_atype,
|
||||
|
||||
broker='kraken',
|
||||
)
|
||||
return mkt, pair
|
||||
|
@ -410,7 +424,7 @@ async def stream_quotes(
|
|||
):
|
||||
# pull a first quote and deliver
|
||||
typ, ohlc_last = await anext(msg_gen)
|
||||
topic, quote = normalize(ohlc_last)
|
||||
quote = normalize(ohlc_last)
|
||||
|
||||
task_status.started((init_msgs, quote))
|
||||
feed_is_live.set()
|
||||
|
@ -419,41 +433,46 @@ async def stream_quotes(
|
|||
last_interval_start = ohlc_last.etime
|
||||
|
||||
# start streaming
|
||||
async for typ, ohlc in msg_gen:
|
||||
|
||||
if typ == 'ohlc':
|
||||
topic: str = mkt.bs_fqme
|
||||
async for typ, quote in msg_gen:
|
||||
match typ:
|
||||
|
||||
# TODO: can get rid of all this by using
|
||||
# ``trades`` subscription...
|
||||
|
||||
# ``trades`` subscription..? Not sure why this
|
||||
# wasn't used originally? (music queues) zoltannn..
|
||||
# https://docs.kraken.com/websockets/#message-trade
|
||||
case 'ohlc':
|
||||
# generate tick values to match time & sales pane:
|
||||
# https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
|
||||
volume = ohlc.volume
|
||||
volume = quote.volume
|
||||
|
||||
# new OHLC sample interval
|
||||
if ohlc.etime > last_interval_start:
|
||||
last_interval_start = ohlc.etime
|
||||
if quote.etime > last_interval_start:
|
||||
last_interval_start = quote.etime
|
||||
tick_volume = volume
|
||||
|
||||
else:
|
||||
# this is the tick volume *within the interval*
|
||||
tick_volume = volume - ohlc_last.volume
|
||||
|
||||
ohlc_last = ohlc
|
||||
last = ohlc.close
|
||||
ohlc_last = quote
|
||||
last = quote.close
|
||||
|
||||
if tick_volume:
|
||||
ohlc.ticks.append({
|
||||
quote.ticks.append({
|
||||
'type': 'trade',
|
||||
'price': last,
|
||||
'size': tick_volume,
|
||||
})
|
||||
|
||||
topic, quote = normalize(ohlc)
|
||||
quote = normalize(quote)
|
||||
|
||||
elif typ == 'l1':
|
||||
quote = ohlc
|
||||
topic = quote['symbol'].lower()
|
||||
case 'l1':
|
||||
# passthrough quote msg
|
||||
pass
|
||||
|
||||
case _:
|
||||
log.warning(f'Unknown WSS message: {typ}, {quote}')
|
||||
|
||||
await send_chan.send({topic: quote})
|
||||
|
||||
|
|
Loading…
Reference in New Issue