# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

'''
Deribit backend.

'''
import asyncio
from collections import ChainMap
from contextlib import (
    asynccontextmanager as acm,
)
from datetime import datetime
from decimal import (
    Decimal,
)
from functools import partial
from pathlib import Path
import time
from typing import (
    Any,
    Optional,
    Callable,
)

from pendulum import now
import trio
from trio_typing import TaskStatus
from rapidfuzz import process as fuzzy
import numpy as np
from tractor.trionics import (
    broadcast_receiver,
    maybe_open_context
)
from tractor import to_asyncio
# XXX WOOPS XD
# yeah you'll need to install it since it was removed in #489 by
# accident; well i thought we had removed all usage..
from cryptofeed import FeedHandler
from cryptofeed.defines import (
    DERIBIT,
    L1_BOOK, TRADES,
    FILLS, ORDER_INFO,
    OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol

# types for managing the cb callbacks.
# from cryptofeed.types import L1Book
from .venues import (
    _ws_url,
    MarketType,
    PAIRTYPES,
    Pair,
    OptionPair,
    JSONRPCResult,
    JSONRPCChannel,
    KLinesResult,
    Trade,
    LastTradesResult,
)
from piker.accounting import (
    Asset,
    digits_to_dec,
    MktPair,
)
from piker.brokers import SymbolNotFound
from piker.data import (
    def_iohlcv_fields,
    match_from_pairs,
    Struct,
)
from piker.data._web_bs import (
    open_jsonrpc_session
)

from piker import config
from piker.log import get_logger

log = get_logger(__name__)


_spawn_kwargs = {
    'infect_asyncio': True,
}


def deribit_timestamp(when: datetime) -> int:
    '''
    Convert a datetime object to a unix timestamp in (integer)
    milliseconds.

    '''
    # `.timestamp()` already carries the sub-second part as a float
    # fraction; adding `when.microsecond` on top would count it twice.
    return int(when.timestamp() * 1000)


def str_to_cb_sym(name: str) -> Symbol:
    '''
    Build a `cryptofeed` option `Symbol` from a dash separated string
    laid out as `<base>-<strike>-<expiry>-<put|call>`.

    Raises `ValueError` when the string doesn't have exactly four
    fields or the option type is neither `put` nor `call`.

    '''
    base, strike_price, expiry_date, option_type = name.split('-')

    # options are quoted in their own base currency
    quote = base

    if option_type == 'put':
        option_type = PUT
    elif option_type == 'call':
        option_type = CALL
    else:
        raise ValueError("Couldn\'t parse option type")

    new_expiry_date = get_values_from_cb_normalized_date(expiry_date)

    return Symbol(
        base=base,
        quote=quote,
        type=OPTION,
        strike_price=strike_price,
        option_type=option_type,
        expiry_date=new_expiry_date)


def piker_sym_to_cb_sym(name: str) -> Symbol:
    '''
    Build a `cryptofeed` option `Symbol` from a (case insensitive)
    instrument name laid out as `<base>-<expiry>-<strike>-<P|C>`.

    Raises `ValueError` when the string doesn't have exactly four
    fields or the option type is neither `P` nor `C`.

    '''
    base, expiry_date, strike_price, option_type = tuple(
        name.upper().split('-'))

    quote = base

    if option_type == 'P':
        option_type = PUT
    elif option_type == 'C':
        option_type = CALL
    else:
        raise ValueError("Couldn\'t parse option type")

    return Symbol(
        base=base,
        quote=quote,
        type=OPTION,
        strike_price=strike_price,
        option_type=option_type,
        expiry_date=expiry_date)


def cb_sym_to_deribit_inst(sym: Symbol) -> str:
    '''
    Render a `cryptofeed` option `Symbol` as an instrument name of the
    form `<base>-<DDMONYY>-<strike>-<C|P>`.

    '''
    new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date)
    otype = 'C' if sym.option_type == CALL else 'P'

    return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}'


def get_values_from_cb_normalized_date(expiry_date: str) -> str:
    '''
    Convert a `YYMDD` expiry string, where `M` is a single letter month
    code, into `DDMONYY` (e.g. `22F14` -> `14JAN22`).

    Raises `ValueError` if the month letter is not a known code.

    '''
    # deribit specific
    cb_norm = [
        'F', 'G', 'H', 'J',
        'K', 'M', 'N', 'Q',
        'U', 'V', 'X', 'Z'
    ]
    months = [
        'JAN', 'FEB', 'MAR', 'APR',
        'MAY', 'JUN', 'JUL', 'AUG',
        'SEP', 'OCT', 'NOV', 'DEC'
    ]
    # YYMDD
    # 01234
    day, month, year = (
        expiry_date[3:],
        months[cb_norm.index(expiry_date[2:3])],
        expiry_date[:2]
    )
    return f'{day}{month}{year}'


def get_config() -> dict[str, Any]:
    '''
    Load the `deribit` section of the `brokers` config and reshape it
    into a dict holding the API credentials under a `deribit` key and
    the feed handler logging settings under a `log` key.

    Returns an empty dict when no `deribit` section exists.

    '''
    conf: dict
    path: Path
    conf, path = config.load(
        conf_name='brokers',
        touch_if_dne=True,
    )

    section: dict | None = conf.get('deribit')
    if section is None:
        log.warning(f'No config section found for deribit in {path}')
        return {}

    conf_option: dict = section.get('option', {})

    # build a fresh dict rather than recycling the loaded section so
    # that none of the user's other config keys leak into the result
    # and the loaded config itself is left untouched.
    return {
        'deribit': {
            'key_id': conf_option.get('api_key'),
            'key_secret': conf_option.get('api_secret'),
        },
        'log': {
            'filename': 'feedhandler.log',
            'level': 'DEBUG',
        },
    }


class Client:
    '''
    Thin wrapper around a JSON-RPC request callable which caches the
    instrument (pair) table and authenticates requests using the
    credentials found by `get_config()`.

    '''
    def __init__(
        self,
        json_rpc: Callable
    ) -> None:
        # pair table filled in by `.cache_symbols()`
        self._pairs: ChainMap[str, Pair] = ChainMap()

        config = get_config().get('deribit', {})
        self._key_id = config.get('key_id')
        self._key_secret = config.get('key_secret')

        self.json_rpc = json_rpc

        # absolute (epoch seconds) expiry time of the current access
        # token, `None` until the first successful auth.
        self._auth_ts: float | None = None
        self._auth_renew_ts: float = 5  # seconds to renew auth

        # currency names iterated by `.get_balances()`; lazily loaded
        # from the API on first use.
        self.currencies: list[str] | None = None

    async def _json_rpc_auth_wrapper(
        self,
        *args,
        **kwargs,
    ) -> JSONRPCResult:
        '''
        Send a JSON-RPC request, first acquiring (or renewing) the
        access token whenever none is held or the current one is about
        to expire.

        https://docs.deribit.com/?python#authentication-2

        '''
        access_scope = 'trade:read_write'
        current_ts = time.time()

        # `._auth_ts` is the token's expiry time, so what matters is how
        # much time is *left* until then.
        if (
            not self._auth_ts
            or self._auth_ts - current_ts < self._auth_renew_ts
        ):
            # if we are close to token expiry time
            params = {
                'grant_type': 'client_credentials',
                'client_id': self._key_id,
                'client_secret': self._key_secret,
                'scope': access_scope
            }

            resp = await self.json_rpc('public/auth', params)
            result = resp.result

            self._auth_ts = time.time() + result['expires_in']

        return await self.json_rpc(*args, **kwargs)

    async def get_balances(
        self,
        kind: str = 'option'
    ) -> dict[str, float]:
        '''
        Return the result of `private/get_positions` for each known
        currency, keyed by currency name.

        '''
        if self.currencies is None:
            # NOTE(review): nothing else assigns `.currencies`; the list
            # is taken from the exchange and lower-cased on the
            # assumption that callers key by lower-case names - confirm.
            resp = await self._json_rpc_auth_wrapper(
                'public/get_currencies',
                params={}
            )
            self.currencies = [
                entry['currency'].lower()
                for entry in resp.result
            ]

        balances = {}

        for currency in self.currencies:
            resp = await self._json_rpc_auth_wrapper(
                'private/get_positions', params={
                    'currency': currency.upper(),
                    'kind': kind})

            balances[currency] = resp.result

        return balances

    async def get_assets(
        self,
        venue: str | None = None,
    ) -> dict[str, Asset]:
        '''
        Return an `Asset` for every currency reported by
        `public/get_currencies` plus one for every instrument returned
        by `.symbol_info()`, keyed by name.

        '''
        assets = {}
        resp = await self._json_rpc_auth_wrapper(
            'public/get_currencies',
            params={}
        )
        currencies = resp.result
        for currency in currencies:
            name = currency['currency']
            tx_tick = digits_to_dec(currency['fee_precision'])
            atype = 'crypto_currency'
            assets[name] = Asset(
                name=name,
                atype=atype,
                tx_tick=tx_tick)

            instruments = await self.symbol_info(currency=name)
            for pair in instruments.values():
                assets[pair.symbol] = Asset(
                    name=pair.symbol,
                    atype=pair.venue,
                    tx_tick=pair.size_tick)

        return assets

    async def get_mkt_pairs(self) -> dict[str, Pair]:
        '''
        Return the cached pairs flattened into a plain dict keyed by
        each pair's `.bs_fqme`.

        '''
        flat: dict[str, Pair] = {}
        for key in self._pairs:
            item = self._pairs.get(key)
            flat[item.bs_fqme] = item

        return flat

    async def submit_limit(
        self,
        symbol: str,
        price: float,
        action: str,
        size: float
    ) -> dict:
        '''
        Place a limit order through the `private/<action>` endpoint and
        return the response's result.

        '''
        params = {
            'instrument_name': symbol.upper(),
            'amount': size,
            'type': 'limit',
            'price': price,
        }
        resp = await self._json_rpc_auth_wrapper(
            f'private/{action}', params)

        return resp.result

    async def submit_cancel(self, oid: str):
        '''
        Send a cancel request for order id `oid` and return the
        response's result.

        '''
        resp = await self._json_rpc_auth_wrapper(
            'private/cancel', {'order_id': oid})
        return resp.result

    async def exch_info(
        self,
        sym: str | None = None,

        venue: MarketType = 'option',
        expiry: str | None = None,

    ) -> dict[str, Pair] | Pair:
        '''
        Return the cached pair for `sym` when given, otherwise the full
        cached pair table.

        Raises `KeyError` if `sym` is not in the cache.

        '''
        pair_table: dict[str, Pair] = self._pairs

        if sym:
            return pair_table[sym]

        return pair_table

    async def symbol_info(
        self,
        instrument: Optional[str] = None,
        currency: str = 'btc',  # BTC, ETH, SOL, USDC
        kind: str = 'option',
        expired: bool = False

    ) -> dict[str, Pair] | Pair:
        '''
        Get symbol infos.

        Requests `public/get_instruments` for `currency` and `kind` and
        returns the instruments as a table keyed by lower-case name, or
        only the entry for `instrument` when one is given.

        '''
        # NOTE(review): once the cache is populated it is returned
        # as-is, whatever `instrument`, `currency` or `kind` were
        # passed - confirm callers expect that.
        if self._pairs:
            return self._pairs

        # will retrieve all symbols by default
        params: dict[str, str] = {
            'currency': currency.upper(),
            'kind': kind,
            'expired': str(expired).lower()
        }

        resp: JSONRPCResult = await self._json_rpc_auth_wrapper(
            'public/get_instruments',
            params,
        )
        # convert to symbol-keyed table
        pair_type: type[Pair] = PAIRTYPES[kind]

        # tolerate an empty/missing result instead of failing on `None`
        results: list[dict] = resp.result or []

        instruments: dict[str, Pair] = {}
        for item in results:
            symbol = item['instrument_name'].lower()
            try:
                pair: Pair = pair_type(
                    symbol=symbol,
                    **item
                )
            except Exception as e:
                e.add_note(
                    "\nDon't panic, prolly stupid deribit changed their symbology schema again..\n"
                    'Check out their API docs here:\n\n'
                    'https://docs.deribit.com/?python#deribit-api-v2-1-1'
                )
                raise

            instruments[symbol] = pair

        if instrument is not None:
            return instruments[instrument.lower()]

        return instruments

    async def cache_symbols(
        self,
        venue: MarketType = 'option',

    ) -> ChainMap[str, Pair]:
        '''
        Fetch all market pairs and load them into the internal pair
        table, both keyed by upper-case symbol and (in an extra
        sub-table) by `.bs_fqme`.

        Raises `SymbolNotFound` when no pairs are returned.

        '''
        # lookup internal mkt-specific pair table to update
        pair_table: dict[str, Pair] = self._pairs

        # make API request(s)
        mkt_pairs = await self.symbol_info()

        if not mkt_pairs:
            raise SymbolNotFound('No market pairs found!?')

        pair_type: type[Pair] = PAIRTYPES[venue]
        pairs_view_subtable: dict[str, Pair] = {}

        for instrument in mkt_pairs:
            pair: Pair = pair_type(**mkt_pairs[instrument].to_dict())

            pair_table[pair.symbol.upper()] = pair

            # update an additional top-level-cross-venue-table
            # `._pairs: ChainMap` for search B0
            pairs_view_subtable[pair.bs_fqme] = pair

        self._pairs.maps.append(pairs_view_subtable)

        return self._pairs

    async def search_symbols(
        self,
        pattern: str,
        limit: int = 30,

    ) -> dict[str, Pair]:
        '''
        Fuzzy search symbology set for pairs matching `pattern`.

        '''
        pairs: dict[str, Pair] = await self.exch_info()

        return match_from_pairs(
            pairs=pairs,
            query=pattern.upper(),
            score_cutoff=35,
            limit=limit
        )

    async def bars(
        self,
        mkt: MktPair,

        start_dt: Optional[datetime] = None,
        end_dt: Optional[datetime] = None,

        limit: int = 1000,
        as_np: bool = True,

    ) -> list[tuple] | np.ndarray:
        '''
        Request 1 minute chart data for `mkt` between `start_dt` and
        `end_dt`.

        `end_dt` defaults to now (UTC) and `start_dt` to `limit`
        minutes before the start of `end_dt`'s minute. Rows are laid
        out as `(index, time, open, high, low, close, volume)` and are
        returned as a structured array unless `as_np` is false.

        '''
        instrument: str = mkt.bs_fqme.split('.')[0]

        if end_dt is None:
            end_dt = now('UTC')

        if start_dt is None:
            start_dt = end_dt.start_of(
                'minute').subtract(minutes=limit)

        start_time = deribit_timestamp(start_dt)
        end_time = deribit_timestamp(end_dt)

        # https://docs.deribit.com/#public-get_tradingview_chart_data
        resp = await self._json_rpc_auth_wrapper(
            'public/get_tradingview_chart_data',
            params={
                'instrument_name': instrument.upper(),
                'start_timestamp': start_time,
                'end_timestamp': end_time,
                'resolution': '1'
            })

        result = KLinesResult(**resp.result)
        new_bars: list[tuple] = []

        for i, (o, h, l, c, v) in enumerate(zip(
            result.open,
            result.high,
            result.low,
            result.close,
            result.volume,
        )):
            new_bars.append((
                i,
                # bar times are derived from the requested start (ms)
                # stepping one minute per row, converted to seconds.
                (start_time + (i * (60 * 1000))) / 1000.0,  # time
                o,
                h,
                l,
                c,
                v,
            ))

        if not as_np:
            # hand back the row tuples as per the declared return type
            return new_bars

        return np.array(
            new_bars,
            dtype=def_iohlcv_fields
        )

    async def last_trades(
        self,
        instrument: str,
        count: int = 10
    ):
        '''
        Return the last `count` trades for `instrument` wrapped in a
        `LastTradesResult`.

        '''
        resp = await self._json_rpc_auth_wrapper(
            'public/get_last_trades_by_instrument',
            params={
                'instrument_name': instrument,
                'count': count
            })

        return LastTradesResult(**resp.result)


@acm
async def get_client(
    is_brokercheck: bool = False,
    venue: MarketType = 'option',
) -> Client:
    '''
    Open a JSON-RPC session, build a `Client` on top of it, pre-load
    the symbol cache and deliver the client.

    '''
    async with (
        trio.open_nursery() as n,
        open_jsonrpc_session(
            _ws_url,
            response_type=JSONRPCResult
        ) as json_rpc
    ):
        client = Client(json_rpc)
        await client.cache_symbols()
        yield client
        n.cancel_scope.cancel()


@acm
async def open_feed_handler():
    '''
    Deliver a `FeedHandler` built from `get_config()` and stop it
    (via the asyncio side) on exit.

    '''
    fh = FeedHandler(config=get_config())
    try:
        yield fh
    finally:
        # also stop the handler when the body errors out
        await to_asyncio.run_task(fh.stop_async)


@acm
async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
    '''
    Deliver the `FeedHandler` from `open_feed_handler()` through
    `maybe_open_context()` under the `'feedhandler'` key.

    '''
    async with maybe_open_context(
        acm_func=open_feed_handler,
        key='feedhandler',
    ) as (cache_hit, fh):
        yield fh


async def aio_price_feed_relay(
    fh: FeedHandler,
    instrument: str,
    from_trio: asyncio.Queue,
    to_trio: trio.abc.SendChannel,
) -> None:
    '''
    Asyncio-side task: subscribe `instrument` to the trades and L1
    book channels and relay each update to trio as a
    `('trade' | 'l1', dict)` tuple.

    '''
    # NOTE(review): `data` is accessed by attribute below, so it is
    # presumably a cryptofeed type rather than a `dict` - confirm.
    async def _trade(data: dict, receipt_timestamp):
        to_trio.send_nowait(('trade', {
            'symbol': cb_sym_to_deribit_inst(
                str_to_cb_sym(data.symbol)).lower(),
            # NOTE(review): the whole trade object is sent as 'last',
            # not a price - confirm against the consumer.
            'last': data,
            'broker_ts': time.time(),
            'data': data.to_dict(),
            'receipt': receipt_timestamp
        }))

    async def _l1(data: dict, receipt_timestamp):
        to_trio.send_nowait(('l1', {
            'symbol': cb_sym_to_deribit_inst(
                str_to_cb_sym(data.symbol)).lower(),
            'ticks': [
                {
                    'type': 'bid',
                    'price': float(data.bid_price),
                    'size': float(data.bid_size)
                },
                {
                    'type': 'bsize',
                    'price': float(data.bid_price),
                    'size': float(data.bid_size)
                },
                {
                    'type': 'ask',
                    'price': float(data.ask_price),
                    'size': float(data.ask_size)
                },
                {
                    'type': 'asize',
                    'price': float(data.ask_price),
                    'size': float(data.ask_size)
                }
            ]
        }))

    sym: Symbol = piker_sym_to_cb_sym(instrument)
    fh.add_feed(
        DERIBIT,
        channels=[TRADES, L1_BOOK],
        symbols=[sym],
        callbacks={
            TRADES: _trade,
            L1_BOOK: _l1
        })

    if not fh.running:
        fh.run(
            start_loop=False,
            install_signal_handlers=False)

    # sync with trio
    to_trio.send_nowait(None)

    # park this task, the callbacks above do the relaying
    await asyncio.sleep(float('inf'))


@acm
async def open_price_feed(
    instrument: str
) -> trio.abc.ReceiveStream:
    '''
    Start `aio_price_feed_relay()` for `instrument` and deliver the
    channel it relays on.

    '''
    async with maybe_open_feed_handler() as fh:
        async with to_asyncio.open_channel_from(
            partial(
                aio_price_feed_relay,
                fh,
                instrument
            )
        ) as (first, chan):
            yield chan


@acm
async def maybe_open_price_feed(
    instrument: str
) -> trio.abc.ReceiveStream:
    '''
    Deliver the price feed for `instrument` (anything after the first
    `.` is dropped) through `maybe_open_context()`, wrapped in a
    broadcast receiver on a cache hit.

    '''
    # hoisted since re-using the same quote char inside an f-string
    # replacement field only parses on python 3.12+
    inst_name: str = instrument.split('.')[0]

    # TODO: add a predicate to maybe_open_context
    async with maybe_open_context(
        acm_func=open_price_feed,
        kwargs={
            'instrument': inst_name
        },
        key=f'{inst_name}-price',
    ) as (cache_hit, feed):
        if cache_hit:
            yield broadcast_receiver(feed, 10)
        else:
            yield feed


async def aio_order_feed_relay(
    fh: FeedHandler,
    instrument: str,
    from_trio: asyncio.Queue,
    to_trio: trio.abc.SendChannel,
) -> None:
    '''
    Asyncio-side task: subscribe `instrument` to the fills and order
    info channels.

    The callbacks are stubs which only log what they receive; nothing
    is relayed to trio yet beyond the initial sync message.

    '''
    # TODO: relay fill and order msgs to trio; these used to drop into
    # the debugger which would stall the event loop.
    async def _fill(data: dict, receipt_timestamp):
        log.warning(f'Unhandled deribit fill msg:\n{data}')

    async def _order_info(data: dict, receipt_timestamp):
        log.warning(f'Unhandled deribit order info msg:\n{data}')

    fh.add_feed(
        DERIBIT,
        channels=[FILLS, ORDER_INFO],
        symbols=[instrument.upper()],
        callbacks={
            FILLS: _fill,
            ORDER_INFO: _order_info,
        })

    if not fh.running:
        fh.run(
            start_loop=False,
            install_signal_handlers=False)

    # sync with trio
    to_trio.send_nowait(None)

    await asyncio.sleep(float('inf'))


@acm
async def open_order_feed(
    instrument: str
) -> trio.abc.ReceiveStream:
    '''
    Start `aio_order_feed_relay()` for `instrument` and deliver the
    channel it relays on.

    '''
    async with maybe_open_feed_handler() as fh:
        async with to_asyncio.open_channel_from(
            partial(
                aio_order_feed_relay,
                fh,
                instrument
            )
        ) as (first, chan):
            yield chan


@acm
async def maybe_open_order_feed(
    instrument: str
) -> trio.abc.ReceiveStream:
    '''
    Deliver the order feed for `instrument` (anything after the first
    `.` is dropped) through `maybe_open_context()`, wrapped in a
    broadcast receiver on a cache hit.

    '''
    inst_name: str = instrument.split('.')[0]

    # TODO: add a predicate to maybe_open_context
    async with maybe_open_context(
        acm_func=open_order_feed,
        # `open_order_feed()` opens its own feed handler and only
        # accepts the instrument.
        kwargs={
            'instrument': inst_name,
        },
        key=f'{inst_name}-order',
    ) as (cache_hit, feed):
        if cache_hit:
            yield broadcast_receiver(feed, 10)
        else:
            yield feed