Deribit's feed fix
- `FeedInit` for init_msgs in `stream_quotes`. - new cache is `client_pairs` so is replacing the old `client.cache_symbols`. - `get_mkt_info` added - `get_ohlc` fixed to comply the new ways of the feed.deribit_fix
parent
e825b00b85
commit
81cb75f4b3
|
@ -51,6 +51,7 @@ __brokers__: list[str] = [
|
||||||
'ib',
|
'ib',
|
||||||
'kraken',
|
'kraken',
|
||||||
'kucoin',
|
'kucoin',
|
||||||
|
'deribit',
|
||||||
|
|
||||||
# broken but used to work
|
# broken but used to work
|
||||||
# 'questrade',
|
# 'questrade',
|
||||||
|
@ -61,7 +62,6 @@ __brokers__: list[str] = [
|
||||||
# wstrade
|
# wstrade
|
||||||
# iex
|
# iex
|
||||||
|
|
||||||
# deribit
|
|
||||||
# bitso
|
# bitso
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,7 @@ from .api import (
|
||||||
get_client,
|
get_client,
|
||||||
)
|
)
|
||||||
from .feed import (
|
from .feed import (
|
||||||
|
get_mkt_info,
|
||||||
open_history_client,
|
open_history_client,
|
||||||
open_symbol_search,
|
open_symbol_search,
|
||||||
stream_quotes,
|
stream_quotes,
|
||||||
|
@ -43,6 +44,7 @@ log = get_logger(__name__)
|
||||||
__all__ = [
|
__all__ = [
|
||||||
'get_client',
|
'get_client',
|
||||||
# 'trades_dialogue',
|
# 'trades_dialogue',
|
||||||
|
'get_mkt_info',
|
||||||
'open_history_client',
|
'open_history_client',
|
||||||
'open_symbol_search',
|
'open_symbol_search',
|
||||||
'stream_quotes',
|
'stream_quotes',
|
||||||
|
|
|
@ -34,9 +34,20 @@ from rapidfuzz import process as fuzzy
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import tractor
|
import tractor
|
||||||
|
|
||||||
from piker.brokers import open_cached_client
|
from piker.accounting import (
|
||||||
|
MktPair,
|
||||||
|
unpack_fqme,
|
||||||
|
)
|
||||||
|
from piker.brokers import (
|
||||||
|
open_cached_client,
|
||||||
|
NoData,
|
||||||
|
)
|
||||||
|
from piker._cacheables import (
|
||||||
|
async_lifo_cache,
|
||||||
|
)
|
||||||
from piker.log import get_logger, get_console_log
|
from piker.log import get_logger, get_console_log
|
||||||
from piker.data import ShmArray
|
from piker.data import ShmArray
|
||||||
|
from piker.data.validate import FeedInit
|
||||||
from piker.brokers._util import (
|
from piker.brokers._util import (
|
||||||
BrokerError,
|
BrokerError,
|
||||||
DataUnavailable,
|
DataUnavailable,
|
||||||
|
@ -51,7 +62,7 @@ from cryptofeed.symbols import Symbol
|
||||||
from .api import (
|
from .api import (
|
||||||
Client, Trade,
|
Client, Trade,
|
||||||
get_config,
|
get_config,
|
||||||
str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
|
piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
|
||||||
maybe_open_price_feed
|
maybe_open_price_feed
|
||||||
)
|
)
|
||||||
from .venues import (
|
from .venues import (
|
||||||
|
@ -72,36 +83,107 @@ async def open_history_client(
|
||||||
mkt: MktPair,
|
mkt: MktPair,
|
||||||
) -> tuple[Callable, int]:
|
) -> tuple[Callable, int]:
|
||||||
|
|
||||||
fnstrument: str = mkt.bs_fqme
|
|
||||||
# TODO implement history getter for the new storage layer.
|
# TODO implement history getter for the new storage layer.
|
||||||
async with open_cached_client('deribit') as client:
|
async with open_cached_client('deribit') as client:
|
||||||
|
|
||||||
async def get_ohlc(
|
async def get_ohlc(
|
||||||
end_dt: Optional[datetime] = None,
|
timeframe: float,
|
||||||
start_dt: Optional[datetime] = None,
|
end_dt: datetime | None = None,
|
||||||
|
start_dt: datetime | None = None,
|
||||||
|
|
||||||
) -> tuple[
|
) -> tuple[
|
||||||
np.ndarray,
|
np.ndarray,
|
||||||
datetime, # start
|
datetime, # start
|
||||||
datetime, # end
|
datetime, # end
|
||||||
]:
|
]:
|
||||||
|
if timeframe != 60:
|
||||||
|
raise DataUnavailable('Only 1m bars are supported')
|
||||||
|
|
||||||
array = await client.bars(
|
array: np.ndarray = await client.bars(
|
||||||
instrument,
|
mkt,
|
||||||
start_dt=start_dt,
|
start_dt=start_dt,
|
||||||
end_dt=end_dt,
|
end_dt=end_dt,
|
||||||
)
|
)
|
||||||
if len(array) == 0:
|
if len(array) == 0:
|
||||||
raise DataUnavailable
|
raise NoData(
|
||||||
|
f'No frame for {start_dt} -> {end_dt}\n'
|
||||||
|
)
|
||||||
|
|
||||||
start_dt = pendulum.from_timestamp(array[0]['time'])
|
start_dt = from_timestamp(array[0]['time'])
|
||||||
end_dt = pendulum.from_timestamp(array[-1]['time'])
|
end_dt = from_timestamp(array[-1]['time'])
|
||||||
|
|
||||||
|
times = array['time']
|
||||||
|
if not times.any():
|
||||||
|
raise ValueError(
|
||||||
|
'Bad frame with null-times?\n\n'
|
||||||
|
f'{times}'
|
||||||
|
)
|
||||||
|
|
||||||
|
if end_dt is None:
|
||||||
|
inow: int = round(time.time())
|
||||||
|
if (inow - times[-1]) > 60:
|
||||||
|
await tractor.pause()
|
||||||
|
|
||||||
return array, start_dt, end_dt
|
return array, start_dt, end_dt
|
||||||
|
|
||||||
yield get_ohlc, {'erlangs': 3, 'rate': 3}
|
yield get_ohlc, {'erlangs': 3, 'rate': 3}
|
||||||
|
|
||||||
|
|
||||||
|
@async_lifo_cache()
|
||||||
|
async def get_mkt_info(
|
||||||
|
fqme: str,
|
||||||
|
|
||||||
|
) -> tuple[MktPair, Pair] | None:
|
||||||
|
|
||||||
|
# uppercase since kraken bs_mktid is always upper
|
||||||
|
if 'deribit' not in fqme.lower():
|
||||||
|
fqme += '.deribit'
|
||||||
|
|
||||||
|
mkt_mode: str = ''
|
||||||
|
broker, mkt_ep, venue, expiry = unpack_fqme(fqme)
|
||||||
|
|
||||||
|
# NOTE: we always upper case all tokens to be consistent with
|
||||||
|
# binance's symbology style for pairs, like `BTCUSDT`, but in
|
||||||
|
# theory we could also just keep things lower case; as long as
|
||||||
|
# we're consistent and the symcache matches whatever this func
|
||||||
|
# returns, always!
|
||||||
|
expiry: str = expiry.upper()
|
||||||
|
venue: str = venue.upper()
|
||||||
|
venue_lower: str = venue.lower()
|
||||||
|
|
||||||
|
mkt_mode: str = 'option'
|
||||||
|
|
||||||
|
async with open_cached_client(
|
||||||
|
'deribit',
|
||||||
|
) as client:
|
||||||
|
|
||||||
|
assets: dict[str, Asset] = await client.get_assets()
|
||||||
|
pair_str: str = mkt_ep.lower()
|
||||||
|
|
||||||
|
pair: Pair = await client.exch_info(
|
||||||
|
sym=pair_str,
|
||||||
|
)
|
||||||
|
mkt_mode = pair.venue
|
||||||
|
client.mkt_mode = mkt_mode
|
||||||
|
|
||||||
|
dst: Asset | None = assets.get(pair.bs_dst_asset)
|
||||||
|
src: Asset | None = assets.get(pair.bs_src_asset)
|
||||||
|
|
||||||
|
mkt = MktPair(
|
||||||
|
dst=dst,
|
||||||
|
src=src,
|
||||||
|
price_tick=pair.price_tick,
|
||||||
|
size_tick=pair.size_tick,
|
||||||
|
bs_mktid=pair.symbol,
|
||||||
|
expiry=pair.expiry,
|
||||||
|
venue=mkt_mode,
|
||||||
|
broker='deribit',
|
||||||
|
_atype=mkt_mode,
|
||||||
|
_fqme_without_src=True,
|
||||||
|
)
|
||||||
|
return mkt, pair
|
||||||
|
|
||||||
|
|
||||||
async def stream_quotes(
|
async def stream_quotes(
|
||||||
|
|
||||||
send_chan: trio.abc.SendChannel,
|
send_chan: trio.abc.SendChannel,
|
||||||
|
@ -116,31 +198,26 @@ async def stream_quotes(
|
||||||
# XXX: required to propagate ``tractor`` loglevel to piker logging
|
# XXX: required to propagate ``tractor`` loglevel to piker logging
|
||||||
get_console_log(loglevel or tractor.current_actor().loglevel)
|
get_console_log(loglevel or tractor.current_actor().loglevel)
|
||||||
|
|
||||||
sym = symbols[0]
|
sym = symbols[0].split('.')[0]
|
||||||
|
|
||||||
|
init_msgs: list[FeedInit] = []
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
open_cached_client('deribit') as client,
|
open_cached_client('deribit') as client,
|
||||||
send_chan as send_chan
|
send_chan as send_chan
|
||||||
):
|
):
|
||||||
|
|
||||||
init_msgs = {
|
mkt, pair = await get_mkt_info(sym)
|
||||||
# pass back token, and bool, signalling if we're the writer
|
|
||||||
# and that history has been written
|
|
||||||
sym: {
|
|
||||||
'symbol_info': {
|
|
||||||
'asset_type': 'option',
|
|
||||||
'price_tick_size': 0.0005
|
|
||||||
},
|
|
||||||
'shm_write_opts': {'sum_tick_vml': False},
|
|
||||||
'fqsn': sym,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
|
# build out init msgs according to latest spec
|
||||||
|
init_msgs.append(
|
||||||
|
FeedInit(mkt_info=mkt)
|
||||||
|
)
|
||||||
nsym = piker_sym_to_cb_sym(sym)
|
nsym = piker_sym_to_cb_sym(sym)
|
||||||
|
|
||||||
async with maybe_open_price_feed(sym) as stream:
|
async with maybe_open_price_feed(sym) as stream:
|
||||||
|
|
||||||
cache = await client.cache_symbols()
|
cache = client._pairs
|
||||||
|
|
||||||
last_trades = (await client.last_trades(
|
last_trades = (await client.last_trades(
|
||||||
cb_sym_to_deribit_inst(nsym), count=1)).trades
|
cb_sym_to_deribit_inst(nsym), count=1)).trades
|
||||||
|
@ -182,12 +259,21 @@ async def open_symbol_search(
|
||||||
async with open_cached_client('deribit') as client:
|
async with open_cached_client('deribit') as client:
|
||||||
|
|
||||||
# load all symbols locally for fast search
|
# load all symbols locally for fast search
|
||||||
cache = await client.cache_symbols()
|
cache = client._pairs
|
||||||
await ctx.started()
|
await ctx.started()
|
||||||
|
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
|
|
||||||
|
pattern: str
|
||||||
async for pattern in stream:
|
async for pattern in stream:
|
||||||
# repack in dict form
|
# NOTE: pattern fuzzy-matching is done within
|
||||||
await stream.send(
|
# the methd impl.
|
||||||
await client.search_symbols(pattern))
|
pairs: dict[str, Pair] = await client.search_symbols(
|
||||||
|
pattern,
|
||||||
|
)
|
||||||
|
# repack in fqme-keyed table
|
||||||
|
byfqme: dict[str, Pair] = {}
|
||||||
|
for pair in pairs.values():
|
||||||
|
byfqme[pair.bs_fqme] = pair
|
||||||
|
|
||||||
|
await stream.send(byfqme)
|
||||||
|
|
Loading…
Reference in New Issue