Deribit's feed fix
- `FeedInit` for init_msgs in `stream_quotes`. - new cache is `client_pairs` so is replacing the old `client.cache_symbols`. - `get_mkt_info` added - `get_ohlc` fixed to comply the new ways of the feed.deribit_fix
							parent
							
								
									1061103f76
								
							
						
					
					
						commit
						f1436c93db
					
				|  | @ -51,6 +51,7 @@ __brokers__: list[str] = [ | |||
|     'ib', | ||||
|     'kraken', | ||||
|     'kucoin', | ||||
|     'deribit', | ||||
| 
 | ||||
|     # broken but used to work | ||||
|     # 'questrade', | ||||
|  | @ -61,7 +62,6 @@ __brokers__: list[str] = [ | |||
|     # wstrade | ||||
|     # iex | ||||
| 
 | ||||
|     # deribit | ||||
|     # bitso | ||||
| ] | ||||
| 
 | ||||
|  |  | |||
|  | @ -25,6 +25,7 @@ from .api import ( | |||
|     get_client, | ||||
| ) | ||||
| from .feed import ( | ||||
|     get_mkt_info, | ||||
|     open_history_client, | ||||
|     open_symbol_search, | ||||
|     stream_quotes, | ||||
|  | @ -43,6 +44,7 @@ log = get_logger(__name__) | |||
| __all__ = [ | ||||
|     'get_client', | ||||
| #    'trades_dialogue', | ||||
|     'get_mkt_info', | ||||
|     'open_history_client', | ||||
|     'open_symbol_search', | ||||
|     'stream_quotes', | ||||
|  |  | |||
|  | @ -34,9 +34,20 @@ from rapidfuzz import process as fuzzy | |||
| import numpy as np | ||||
| import tractor | ||||
| 
 | ||||
| from piker.brokers import open_cached_client | ||||
| from piker.accounting import ( | ||||
|     MktPair, | ||||
|     unpack_fqme, | ||||
| ) | ||||
| from piker.brokers import ( | ||||
|     open_cached_client, | ||||
|     NoData, | ||||
| ) | ||||
| from piker._cacheables import ( | ||||
|     async_lifo_cache, | ||||
| ) | ||||
| from piker.log import get_logger, get_console_log | ||||
| from piker.data import ShmArray | ||||
| from piker.data.validate import FeedInit | ||||
| from piker.brokers._util import ( | ||||
|     BrokerError, | ||||
|     DataUnavailable, | ||||
|  | @ -51,7 +62,7 @@ from cryptofeed.symbols import Symbol | |||
| from .api import ( | ||||
|     Client, Trade, | ||||
|     get_config, | ||||
|     str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst, | ||||
|     piker_sym_to_cb_sym, cb_sym_to_deribit_inst, | ||||
|     maybe_open_price_feed | ||||
| ) | ||||
| from .venues import ( | ||||
|  | @ -72,36 +83,107 @@ async def open_history_client( | |||
|     mkt: MktPair, | ||||
| ) -> tuple[Callable, int]: | ||||
| 
 | ||||
|     fnstrument: str = mkt.bs_fqme | ||||
|     # TODO implement history getter for the new storage layer. | ||||
|     async with open_cached_client('deribit') as client: | ||||
| 
 | ||||
|         async def get_ohlc( | ||||
|             end_dt: Optional[datetime] = None, | ||||
|             start_dt: Optional[datetime] = None, | ||||
|             timeframe: float, | ||||
|             end_dt: datetime | None = None, | ||||
|             start_dt: datetime | None = None, | ||||
| 
 | ||||
|         ) -> tuple[ | ||||
|             np.ndarray, | ||||
|             datetime,  # start | ||||
|             datetime,  # end | ||||
|         ]: | ||||
|             if timeframe != 60: | ||||
|                 raise DataUnavailable('Only 1m bars are supported') | ||||
| 
 | ||||
|             array = await client.bars( | ||||
|                 instrument, | ||||
|             array: np.ndarray = await client.bars( | ||||
|                 mkt, | ||||
|                 start_dt=start_dt, | ||||
|                 end_dt=end_dt, | ||||
|             ) | ||||
|             if len(array) == 0: | ||||
|                 raise DataUnavailable | ||||
|                 raise NoData( | ||||
|                     f'No frame for {start_dt} -> {end_dt}\n' | ||||
|                 ) | ||||
| 
 | ||||
|             start_dt = pendulum.from_timestamp(array[0]['time']) | ||||
|             end_dt = pendulum.from_timestamp(array[-1]['time']) | ||||
|             start_dt = from_timestamp(array[0]['time']) | ||||
|             end_dt = from_timestamp(array[-1]['time']) | ||||
| 
 | ||||
|             times = array['time'] | ||||
|             if not times.any(): | ||||
|                 raise ValueError( | ||||
|                     'Bad frame with null-times?\n\n' | ||||
|                     f'{times}' | ||||
|                 ) | ||||
| 
 | ||||
|             if end_dt is None: | ||||
|                 inow: int = round(time.time()) | ||||
|                 if (inow - times[-1]) > 60: | ||||
|                     await tractor.pause() | ||||
| 
 | ||||
|             return array, start_dt, end_dt | ||||
| 
 | ||||
|         yield get_ohlc, {'erlangs': 3, 'rate': 3} | ||||
| 
 | ||||
| 
 | ||||
| @async_lifo_cache() | ||||
| async def get_mkt_info( | ||||
|     fqme: str, | ||||
| 
 | ||||
| ) -> tuple[MktPair, Pair] | None: | ||||
| 
 | ||||
|     # uppercase since kraken bs_mktid is always upper | ||||
|     if 'deribit' not in fqme.lower(): | ||||
|         fqme += '.deribit' | ||||
| 
 | ||||
|     mkt_mode: str = '' | ||||
|     broker, mkt_ep, venue, expiry = unpack_fqme(fqme) | ||||
| 
 | ||||
|     # NOTE: we always upper case all tokens to be consistent with | ||||
|     # binance's symbology style for pairs, like `BTCUSDT`, but in | ||||
|     # theory we could also just keep things lower case; as long as | ||||
|     # we're consistent and the symcache matches whatever this func | ||||
|     # returns, always! | ||||
|     expiry: str = expiry.upper() | ||||
|     venue: str = venue.upper() | ||||
|     venue_lower: str = venue.lower() | ||||
| 
 | ||||
|     mkt_mode: str = 'option' | ||||
| 
 | ||||
|     async with open_cached_client( | ||||
|         'deribit', | ||||
|     ) as client: | ||||
| 
 | ||||
|         assets: dict[str, Asset] = await client.get_assets() | ||||
|         pair_str: str = mkt_ep.lower() | ||||
| 
 | ||||
|         pair: Pair = await client.exch_info( | ||||
|             sym=pair_str, | ||||
|         ) | ||||
|         mkt_mode = pair.venue | ||||
|         client.mkt_mode = mkt_mode | ||||
| 
 | ||||
|         dst: Asset | None = assets.get(pair.bs_dst_asset) | ||||
|         src: Asset | None = assets.get(pair.bs_src_asset) | ||||
| 
 | ||||
|         mkt = MktPair( | ||||
|             dst=dst, | ||||
|             src=src, | ||||
|             price_tick=pair.price_tick, | ||||
|             size_tick=pair.size_tick, | ||||
|             bs_mktid=pair.symbol, | ||||
|             expiry=pair.expiry, | ||||
|             venue=mkt_mode, | ||||
|             broker='deribit', | ||||
|             _atype=mkt_mode, | ||||
|             _fqme_without_src=True, | ||||
|         ) | ||||
|         return mkt, pair | ||||
| 
 | ||||
| 
 | ||||
| async def stream_quotes( | ||||
| 
 | ||||
|     send_chan: trio.abc.SendChannel, | ||||
|  | @ -116,31 +198,26 @@ async def stream_quotes( | |||
|     # XXX: required to propagate ``tractor`` loglevel to piker logging | ||||
|     get_console_log(loglevel or tractor.current_actor().loglevel) | ||||
| 
 | ||||
|     sym = symbols[0] | ||||
|     sym = symbols[0].split('.')[0] | ||||
| 
 | ||||
|     init_msgs: list[FeedInit] = [] | ||||
| 
 | ||||
|     async with ( | ||||
|         open_cached_client('deribit') as client, | ||||
|         send_chan as send_chan | ||||
|     ): | ||||
| 
 | ||||
|         init_msgs = { | ||||
|             # pass back token, and bool, signalling if we're the writer | ||||
|             # and that history has been written | ||||
|             sym: { | ||||
|                 'symbol_info': { | ||||
|                     'asset_type': 'option', | ||||
|                     'price_tick_size': 0.0005 | ||||
|                 }, | ||||
|                 'shm_write_opts': {'sum_tick_vml': False}, | ||||
|                 'fqsn': sym, | ||||
|             }, | ||||
|         } | ||||
|         mkt, pair = await get_mkt_info(sym) | ||||
| 
 | ||||
|         # build out init msgs according to latest spec | ||||
|         init_msgs.append( | ||||
|             FeedInit(mkt_info=mkt) | ||||
|         ) | ||||
|         nsym = piker_sym_to_cb_sym(sym) | ||||
| 
 | ||||
|         async with maybe_open_price_feed(sym) as stream: | ||||
| 
 | ||||
|             cache = await client.cache_symbols() | ||||
|             cache = client._pairs | ||||
| 
 | ||||
|             last_trades = (await client.last_trades( | ||||
|                 cb_sym_to_deribit_inst(nsym), count=1)).trades | ||||
|  | @ -182,12 +259,21 @@ async def open_symbol_search( | |||
|     async with open_cached_client('deribit') as client: | ||||
| 
 | ||||
|         # load all symbols locally for fast search | ||||
|         cache = await client.cache_symbols() | ||||
|         cache = client._pairs | ||||
|         await ctx.started() | ||||
| 
 | ||||
|         async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|             pattern: str | ||||
|             async for pattern in stream: | ||||
|                 # repack in dict form | ||||
|                 await stream.send( | ||||
|                     await client.search_symbols(pattern)) | ||||
|                 # NOTE: pattern fuzzy-matching is done within | ||||
|                 # the methd impl. | ||||
|                 pairs: dict[str, Pair] = await client.search_symbols( | ||||
|                     pattern, | ||||
|                 ) | ||||
|                 # repack in fqme-keyed table | ||||
|                 byfqme: dict[str, Pair] = {} | ||||
|                 for pair in pairs.values(): | ||||
|                     byfqme[pair.bs_fqme] = pair | ||||
| 
 | ||||
|                 await stream.send(byfqme) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue