diff --git a/piker/brokers/__init__.py b/piker/brokers/__init__.py
index 0c328d9f..e096af16 100644
--- a/piker/brokers/__init__.py
+++ b/piker/brokers/__init__.py
@@ -51,6 +51,7 @@ __brokers__: list[str] = [
     'ib',
     'kraken',
     'kucoin',
+    'deribit',

     # broken but used to work
     # 'questrade',
@@ -61,7 +62,6 @@ __brokers__: list[str] = [
     # wstrade
     # iex

-    # deribit
     # bitso
 ]

diff --git a/piker/brokers/deribit/__init__.py b/piker/brokers/deribit/__init__.py
index 7499cd35..5e87a708 100644
--- a/piker/brokers/deribit/__init__.py
+++ b/piker/brokers/deribit/__init__.py
@@ -25,6 +25,7 @@ from .api import (
     get_client,
 )
 from .feed import (
+    get_mkt_info,
     open_history_client,
     open_symbol_search,
     stream_quotes,
@@ -43,6 +44,7 @@ log = get_logger(__name__)
 __all__ = [
     'get_client',
     # 'trades_dialogue',
+    'get_mkt_info',
     'open_history_client',
     'open_symbol_search',
     'stream_quotes',
diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py
index bd2fbe06..e32e31d2 100644
--- a/piker/brokers/deribit/feed.py
+++ b/piker/brokers/deribit/feed.py
@@ -34,9 +34,20 @@ from rapidfuzz import process as fuzzy
 import numpy as np
 import tractor

-from piker.brokers import open_cached_client
+from piker.accounting import (
+    MktPair,
+    unpack_fqme,
+)
+from piker.brokers import (
+    open_cached_client,
+    NoData,
+)
+from piker._cacheables import (
+    async_lifo_cache,
+)
 from piker.log import get_logger, get_console_log
 from piker.data import ShmArray
+from piker.data.validate import FeedInit
 from piker.brokers._util import (
     BrokerError,
     DataUnavailable,
@@ -51,7 +62,7 @@ from cryptofeed.symbols import Symbol
 from .api import (
     Client, Trade,
     get_config,
-    str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
+    piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
     maybe_open_price_feed
 )
 from .venues import (
@@ -72,36 +83,107 @@ async def open_history_client(
     mkt: MktPair,

 ) -> tuple[Callable, int]:

-    fnstrument: str = mkt.bs_fqme
     # TODO implement history getter for the new storage layer.
     async with open_cached_client('deribit') as client:

         async def get_ohlc(
-            end_dt: Optional[datetime] = None,
-            start_dt: Optional[datetime] = None,
+            timeframe: float,
+            end_dt: datetime | None = None,
+            start_dt: datetime | None = None,

         ) -> tuple[
             np.ndarray,
             datetime,  # start
             datetime,  # end
         ]:
+            if timeframe != 60:
+                raise DataUnavailable('Only 1m bars are supported')

-            array = await client.bars(
-                instrument,
+            array: np.ndarray = await client.bars(
+                mkt,
                 start_dt=start_dt,
                 end_dt=end_dt,
             )
             if len(array) == 0:
-                raise DataUnavailable
+                raise NoData(
+                    f'No frame for {start_dt} -> {end_dt}\n'
+                )

-            start_dt = pendulum.from_timestamp(array[0]['time'])
-            end_dt = pendulum.from_timestamp(array[-1]['time'])
+            start_dt = from_timestamp(array[0]['time'])
+            end_dt = from_timestamp(array[-1]['time'])
+
+            times = array['time']
+            if not times.any():
+                raise ValueError(
+                    'Bad frame with null-times?\n\n'
+                    f'{times}'
+                )
+
+            if end_dt is None:
+                inow: int = round(time.time())
+                if (inow - times[-1]) > 60:
+                    await tractor.pause()

             return array, start_dt, end_dt

         yield get_ohlc, {'erlangs': 3, 'rate': 3}


+@async_lifo_cache()
+async def get_mkt_info(
+    fqme: str,
+
+) -> tuple[MktPair, Pair] | None:
+
+    # uppercase since kraken bs_mktid is always upper
+    if 'deribit' not in fqme.lower():
+        fqme += '.deribit'
+
+    mkt_mode: str = ''
+    broker, mkt_ep, venue, expiry = unpack_fqme(fqme)
+
+    # NOTE: we always upper case all tokens to be consistent with
+    # binance's symbology style for pairs, like `BTCUSDT`, but in
+    # theory we could also just keep things lower case; as long as
+    # we're consistent and the symcache matches whatever this func
+    # returns, always!
+    expiry: str = expiry.upper()
+    venue: str = venue.upper()
+    venue_lower: str = venue.lower()
+
+    mkt_mode: str = 'option'
+
+    async with open_cached_client(
+        'deribit',
+    ) as client:
+
+        assets: dict[str, Asset] = await client.get_assets()
+        pair_str: str = mkt_ep.lower()
+
+        pair: Pair = await client.exch_info(
+            sym=pair_str,
+        )
+        mkt_mode = pair.venue
+        client.mkt_mode = mkt_mode
+
+        dst: Asset | None = assets.get(pair.bs_dst_asset)
+        src: Asset | None = assets.get(pair.bs_src_asset)
+
+        mkt = MktPair(
+            dst=dst,
+            src=src,
+            price_tick=pair.price_tick,
+            size_tick=pair.size_tick,
+            bs_mktid=pair.symbol,
+            expiry=pair.expiry,
+            venue=mkt_mode,
+            broker='deribit',
+            _atype=mkt_mode,
+            _fqme_without_src=True,
+        )
+        return mkt, pair
+
+
 async def stream_quotes(

     send_chan: trio.abc.SendChannel,
@@ -116,31 +198,26 @@ async def stream_quotes(
     # XXX: required to propagate ``tractor`` loglevel to piker logging
     get_console_log(loglevel or tractor.current_actor().loglevel)

-    sym = symbols[0]
+    sym = symbols[0].split('.')[0]
+
+    init_msgs: list[FeedInit] = []

     async with (
         open_cached_client('deribit') as client,
         send_chan as send_chan
     ):

-        init_msgs = {
-            # pass back token, and bool, signalling if we're the writer
-            # and that history has been written
-            sym: {
-                'symbol_info': {
-                    'asset_type': 'option',
-                    'price_tick_size': 0.0005
-                },
-                'shm_write_opts': {'sum_tick_vml': False},
-                'fqsn': sym,
-            },
-        }
+        mkt, pair = await get_mkt_info(sym)
+        # build out init msgs according to latest spec
+        init_msgs.append(
+            FeedInit(mkt_info=mkt)
+        )

         nsym = piker_sym_to_cb_sym(sym)

         async with maybe_open_price_feed(sym) as stream:

-            cache = await client.cache_symbols()
+            cache = client._pairs

             last_trades = (await client.last_trades(
                 cb_sym_to_deribit_inst(nsym), count=1)).trades
@@ -182,12 +259,21 @@ async def open_symbol_search(
     async with open_cached_client('deribit') as client:

         # load all symbols locally for fast search
-        cache = await client.cache_symbols()
+        cache = client._pairs

         await ctx.started()

         async with ctx.open_stream() as stream:
+            pattern: str
             async for pattern in stream:
-                # repack in dict form
-                await stream.send(
-                    await client.search_symbols(pattern))
+                # NOTE: pattern fuzzy-matching is done within
+                # the method impl.
+                pairs: dict[str, Pair] = await client.search_symbols(
+                    pattern,
+                )
+                # repack in fqme-keyed table
+                byfqme: dict[str, Pair] = {}
+                for pair in pairs.values():
+                    byfqme[pair.bs_fqme] = pair
+
+                await stream.send(byfqme)
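
Reviewer note: below is a minimal consumer-side sketch (not part of the patch) of how the two new endpoints fit together. The surrounding runtime (an actor context where `open_cached_client()` is usable) and the printed fields are illustrative assumptions only; the caller supplies a valid deribit option fqme.

```python
# sketch only: exercising ``get_mkt_info()`` and the fqme-keyed search
# repacking added above; run inside a piker/tractor runtime where
# ``open_cached_client()`` can spin up a configured deribit client.
from piker.accounting import MktPair
from piker.brokers import open_cached_client
from piker.brokers.deribit import get_mkt_info


async def demo(fqme: str) -> None:
    # resolve the fqme to a ``MktPair`` + venue-native ``Pair``, the same
    # call ``stream_quotes()`` now uses to build its ``FeedInit`` msg.
    mkt, pair = await get_mkt_info(fqme)
    assert isinstance(mkt, MktPair)

    async with open_cached_client('deribit') as client:
        # fuzzy-match against the locally cached ``client._pairs`` table,
        # then re-key by ``.bs_fqme`` exactly as ``open_symbol_search()``
        # now does before sending results over the ctx stream.
        pairs = await client.search_symbols('btc')
        byfqme = {p.bs_fqme: p for p in pairs.values()}
        print(list(byfqme)[:5])
```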