diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index b9b98ad9..3b19ff59 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -113,13 +113,15 @@ def str_to_cb_sym(name: str) -> Symbol: else: raise Exception("Couldn\'t parse option type") + new_expiry_date = get_values_from_cb_normalized_date(expiry_date) + return Symbol( - base, quote, + base=base, + quote=quote, type=OPTION, strike_price=strike_price, option_type=option_type, - expiry_date=expiry_date, - expiry_normalize=False) + expiry_date=new_expiry_date) def piker_sym_to_cb_sym(name: str) -> Symbol: @@ -130,83 +132,138 @@ def piker_sym_to_cb_sym(name: str) -> Symbol: if option_type == 'P': option_type = PUT - elif option_type == 'C': + elif option_type == 'C': option_type = CALL else: raise Exception("Couldn\'t parse option type") return Symbol( - base, quote, + base=base, + quote=quote, type=OPTION, strike_price=strike_price, option_type=option_type, - expiry_date=expiry_date.upper()) + expiry_date=expiry_date) def cb_sym_to_deribit_inst(sym: Symbol): - # cryptofeed normalized - cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z'] - - # deribit specific - months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC'] - - exp = sym.expiry_date - - # YYMDD - # 01234 - year, month, day = ( - exp[:2], months[cb_norm.index(exp[2:3])], exp[3:]) - + new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date) otype = 'C' if sym.option_type == CALL else 'P' - return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}' + return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}' + + +def get_values_from_cb_normalized_date(expiry_date: str) -> str: + # deribit specific + cb_norm = [ + 'F', 'G', 'H', 'J', + 'K', 'M', 'N', 'Q', + 'U', 'V', 'X', 'Z' + ] + months = [ + 'JAN', 'FEB', 'MAR', 'APR', + 'MAY', 'JUN', 'JUL', 'AUG', + 'SEP', 'OCT', 'NOV', 'DEC' + ] + # YYMDD + # 01234 + day, month, year = ( + expiry_date[3:], + months[cb_norm.index(expiry_date[2:3])], + expiry_date[:2] + ) + return f'{day}{month}{year}' def get_config() -> dict[str, Any]: - conf, path = config.load() + conf: dict + path: Path + conf, path = config.load( + conf_name='brokers', + touch_if_dne=True, + ) + section: dict = {} section = conf.get('deribit') - - # TODO: document why we send this, basically because logging params for cryptofeed - conf['log'] = {} - conf['log']['disabled'] = True - if section is None: log.warning(f'No config section found for deribit in {path}') + return {} - return conf + conf_option = section.get('option', {}) + section.clear # clear the dict to reuse it + section['deribit'] = {} + section['deribit']['key_id'] = conf_option.get('api_key') + section['deribit']['key_secret'] = conf_option.get('api_secret') + + section['log'] = {} + section['log']['filename'] = 'feedhandler.log' + section['log']['level'] = 'DEBUG' + + return section class Client: - def __init__(self, json_rpc: Callable) -> None: - self._pairs: dict[str, Any] = None + def __init__( + self, + + json_rpc: Callable + + ) -> None: + self._pairs: ChainMap[str, Pair] = ChainMap() config = get_config().get('deribit', {}) - if ('key_id' in config) and ('key_secret' in config): - self._key_id = config['key_id'] - self._key_secret = config['key_secret'] - - else: - self._key_id = None - self._key_secret = None + self._key_id = config.get('key_id') + self._key_secret = config.get('key_secret') self.json_rpc = json_rpc - @property - def currencies(self): - return ['btc', 'eth', 'sol', 'usd'] + self._auth_ts = None + self._auth_renew_ts = 5 # seconds to renew auth - async def get_balances(self, kind: str = 'option') -> dict[str, float]: + async def _json_rpc_auth_wrapper(self, *args, **kwargs) -> JSONRPCResult: + + """Background task that adquires a first access token and then will + refresh the access token. + + https://docs.deribit.com/?python#authentication-2 + """ + access_scope = 'trade:read_write' + current_ts = time.time() + + if not self._auth_ts or current_ts - self._auth_ts < self._auth_renew_ts: + # if we are close to token expiry time + + params = { + 'grant_type': 'client_credentials', + 'client_id': self._key_id, + 'client_secret': self._key_secret, + 'scope': access_scope + } + + resp = await self.json_rpc('public/auth', params) + result = resp.result + + self._auth_ts = time.time() + result['expires_in'] + + return await self.json_rpc(*args, **kwargs) + + + + + async def get_balances( + self, + kind: str = 'option' + ) -> dict[str, float]: """Return the set of positions for this account by symbol. """ balances = {} for currency in self.currencies: - resp = await self.json_rpc( + resp = await self._json_rpc_auth_wrapper( 'private/get_positions', params={ 'currency': currency.upper(), 'kind': kind}) @@ -215,20 +272,46 @@ class Client: return balances - async def get_assets(self) -> dict[str, float]: + async def get_assets( + self, + venue: str | None = None, + + ) -> dict[str, Asset]: """Return the set of asset balances for this account by symbol. """ - balances = {} + assets = {} + resp = await self._json_rpc_auth_wrapper( + 'public/get_currencies', + params={} + ) + currencies = resp.result + for currency in currencies: + name = currency['currency'] + tx_tick = digits_to_dec(currency['fee_precision']) + atype='crypto_currency' + assets[name] = Asset( + name=name, + atype=atype, + tx_tick=tx_tick) - for currency in self.currencies: - resp = await self.json_rpc( - 'private/get_account_summary', params={ - 'currency': currency.upper()}) + instruments = await self.symbol_info(currency=name) + for instrument in instruments: + pair = instruments[instrument] + assets[pair.symbol] = Asset( + name=pair.symbol, + atype=pair.venue, + tx_tick=pair.size_tick) - balances[currency] = resp.result['balance'] + return assets - return balances + async def get_mkt_pairs(self) -> dict[str, Pair]: + flat: dict[str, Pair] = {} + for key in self._pairs: + item = self._pairs.get(key) + flat[item.bs_fqme] = item + + return flat async def submit_limit( self, @@ -245,7 +328,7 @@ class Client: 'type': 'limit', 'price': price, } - resp = await self.json_rpc( + resp = await self._json_rpc_auth_wrapper( f'private/{action}', params) return resp.result @@ -253,10 +336,32 @@ class Client: async def submit_cancel(self, oid: str): """Send cancel request for order id """ - resp = await self.json_rpc( + resp = await self._json_rpc_auth_wrapper( 'private/cancel', {'order_id': oid}) return resp.result + async def exch_info( + self, + sym: str | None = None, + + venue: MarketType = 'option', + expiry: str | None = None, + + ) -> dict[str, Pair] | Pair: + + pair_table: dict[str, Pair] = self._pairs + + if ( + sym + and (cached_pair := pair_table.get(sym)) + ): + return cached_pair + + if sym: + return pair_table[sym] + else: + return self._pairs + async def symbol_info( self, instrument: Optional[str] = None, @@ -264,7 +369,7 @@ class Client: kind: str = 'option', expired: bool = False - ) -> dict[str, dict]: + ) -> dict[str, Pair] | Pair: ''' Get symbol infos. @@ -279,28 +384,65 @@ class Client: 'expired': str(expired).lower() } - resp: JSONRPCResult = await self.json_rpc( + resp: JSONRPCResult = await self._json_rpc_auth_wrapper( 'public/get_instruments', params, ) # convert to symbol-keyed table + pair_type: Type = PAIRTYPES[kind] results: list[dict] | None = resp.result - instruments: dict[str, dict] = { - item['instrument_name'].lower(): item - for item in results - } + + instruments: dict[str, Pair] = {} + for item in results: + symbol=item['instrument_name'].lower() + try: + pair: Pair = pair_type( + symbol=symbol, + **item + ) + except Exception as e: + e.add_note( + "\nDon't panic, prolly stupid deribit changed their symbology schema again..\n" + 'Check out their API docs here:\n\n' + 'https://docs.deribit.com/?python#deribit-api-v2-1-1' + ) + raise + + instruments[symbol] = pair if instrument is not None: - return instruments[instrument] + return instruments[instrument.lower()] else: return instruments async def cache_symbols( self, - ) -> dict: + venue: MarketType = 'option', - if not self._pairs: - self._pairs = await self.symbol_info() + ) -> None: + # lookup internal mkt-specific pair table to update + pair_table: dict[str, Pair] = self._pairs + + # make API request(s) + mkt_pairs = await self.symbol_info() + + if not mkt_pairs: + raise SymbolNotFound(f'No market pairs found!?:\n{resp}') + + pairs_view_subtable: dict[str, Pair] = {} + + for instrument in mkt_pairs: + pair_type: Type = PAIRTYPES[venue] + + pair: Pair = pair_type(**mkt_pairs[instrument].to_dict()) + + pair_table[pair.symbol.upper()] = pair + + # update an additional top-level-cross-venue-table + # `._pairs: ChainMap` for search B0 + pairs_view_subtable[pair.bs_fqme] = pair + + self._pairs.maps.append(pairs_view_subtable) return self._pairs @@ -308,37 +450,35 @@ class Client: self, pattern: str, limit: int = 30, - ) -> dict[str, Any]: + ) -> dict[str, Pair]: ''' Fuzzy search symbology set for pairs matching `pattern`. ''' - pairs: dict[str, Any] = await self.symbol_info() - matches: dict[str, Pair] = match_from_pairs( + pairs: dict[str, Pair] = await self.exch_info() + + return match_from_pairs( pairs=pairs, query=pattern.upper(), score_cutoff=35, limit=limit ) - # repack in name-keyed table - return { - pair['instrument_name'].lower(): pair - for pair in matches.values() - } - async def bars( self, - symbol: str, + mkt: MktPair, + start_dt: Optional[datetime] = None, end_dt: Optional[datetime] = None, + limit: int = 1000, as_np: bool = True, - ) -> dict: - instrument = symbol + + ) -> list[tuple] | np.ndarray: + instrument: str = mkt.bs_fqme.split('.')[0] if end_dt is None: - end_dt = pendulum.now('UTC') + end_dt = now('UTC') if start_dt is None: start_dt = end_dt.start_of( @@ -348,7 +488,7 @@ class Client: end_time = deribit_timestamp(end_dt) # https://docs.deribit.com/#public-get_tradingview_chart_data - resp = await self.json_rpc( + resp = await self._json_rpc_auth_wrapper( 'public/get_tradingview_chart_data', params={ 'instrument_name': instrument.upper(), @@ -358,36 +498,34 @@ class Client: }) result = KLinesResult(**resp.result) - new_bars = [] + new_bars: list[tuple] = [] for i in range(len(result.close)): - _open = result.open[i] - high = result.high[i] - low = result.low[i] - close = result.close[i] - volume = result.volume[i] - - row = [ + row = [ (start_time + (i * (60 * 1000))) / 1000.0, # time result.open[i], result.high[i], result.low[i], result.close[i], - result.volume[i], - 0 + result.volume[i] ] new_bars.append((i,) + tuple(row)) - array = np.array(new_bars, dtype=def_iohlcv_fields) if as_np else klines - return array + if not as_np: + return result + + return np.array( + new_bars, + dtype=def_iohlcv_fields + ) async def last_trades( self, instrument: str, count: int = 10 ): - resp = await self.json_rpc( + resp = await self._json_rpc_auth_wrapper( 'public/get_last_trades_by_instrument', params={ 'instrument_name': instrument, @@ -399,78 +537,17 @@ class Client: @acm async def get_client( - is_brokercheck: bool = False + is_brokercheck: bool = False, + venue: MarketType = 'option', ) -> Client: async with ( trio.open_nursery() as n, open_jsonrpc_session( - _testnet_ws_url, dtype=JSONRPCResult) as json_rpc + _ws_url, response_type=JSONRPCResult + ) as json_rpc ): client = Client(json_rpc) - - _refresh_token: Optional[str] = None - _access_token: Optional[str] = None - - async def _auth_loop( - task_status: TaskStatus = trio.TASK_STATUS_IGNORED - ): - """Background task that adquires a first access token and then will - refresh the access token while the nursery isn't cancelled. - - https://docs.deribit.com/?python#authentication-2 - """ - renew_time = 10 - access_scope = 'trade:read_write' - _expiry_time = time.time() - got_access = False - nonlocal _refresh_token - nonlocal _access_token - - while True: - if time.time() - _expiry_time < renew_time: - # if we are close to token expiry time - - if _refresh_token != None: - # if we have a refresh token already dont need to send - # secret - params = { - 'grant_type': 'refresh_token', - 'refresh_token': _refresh_token, - 'scope': access_scope - } - - else: - # we don't have refresh token, send secret to initialize - params = { - 'grant_type': 'client_credentials', - 'client_id': client._key_id, - 'client_secret': client._key_secret, - 'scope': access_scope - } - - resp = await json_rpc('public/auth', params) - result = resp.result - - _expiry_time = time.time() + result['expires_in'] - _refresh_token = result['refresh_token'] - - if 'access_token' in result: - _access_token = result['access_token'] - - if not got_access: - # first time this loop runs we must indicate task is - # started, we have auth - got_access = True - task_status.started() - - else: - await trio.sleep(renew_time / 2) - - # if we have client creds launch auth loop - if client._key_id is not None: - await n.start(_auth_loop) - await client.cache_symbols() yield client n.cancel_scope.cancel() @@ -494,7 +571,7 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream: async def aio_price_feed_relay( fh: FeedHandler, - instrument: Symbol, + instrument: str, from_trio: asyncio.Queue, to_trio: trio.abc.SendChannel, ) -> None: @@ -513,21 +590,33 @@ async def aio_price_feed_relay( 'symbol': cb_sym_to_deribit_inst( str_to_cb_sym(data.symbol)).lower(), 'ticks': [ - {'type': 'bid', - 'price': float(data.bid_price), 'size': float(data.bid_size)}, - {'type': 'bsize', - 'price': float(data.bid_price), 'size': float(data.bid_size)}, - {'type': 'ask', - 'price': float(data.ask_price), 'size': float(data.ask_size)}, - {'type': 'asize', - 'price': float(data.ask_price), 'size': float(data.ask_size)} + { + 'type': 'bid', + 'price': float(data.bid_price), + 'size': float(data.bid_size) + }, + { + 'type': 'bsize', + 'price': float(data.bid_price), + 'size': float(data.bid_size) + }, + { + 'type': 'ask', + 'price': float(data.ask_price), + 'size': float(data.ask_size) + }, + { + 'type': 'asize', + 'price': float(data.ask_price), + 'size': float(data.ask_size) + } ] })) - + sym: Symbol = piker_sym_to_cb_sym(instrument) fh.add_feed( DERIBIT, channels=[TRADES, L1_BOOK], - symbols=[piker_sym_to_cb_sym(instrument)], + symbols=[sym], callbacks={ TRADES: _trade, L1_BOOK: _l1 @@ -568,9 +657,9 @@ async def maybe_open_price_feed( async with maybe_open_context( acm_func=open_price_feed, kwargs={ - 'instrument': instrument + 'instrument': instrument.split('.')[0] }, - key=f'{instrument}-price', + key=f'{instrument.split('.')[0]}-price', ) as (cache_hit, feed): if cache_hit: yield broadcast_receiver(feed, 10) @@ -635,10 +724,10 @@ async def maybe_open_order_feed( async with maybe_open_context( acm_func=open_order_feed, kwargs={ - 'instrument': instrument, + 'instrument': instrument.split('.')[0], 'fh': fh }, - key=f'{instrument}-order', + key=f'{instrument.split('.')[0]}-order', ) as (cache_hit, feed): if cache_hit: yield broadcast_receiver(feed, 10)