diff --git a/piker/brokers/__init__.py b/piker/brokers/__init__.py index 0c328d9f..e096af16 100644 --- a/piker/brokers/__init__.py +++ b/piker/brokers/__init__.py @@ -51,6 +51,7 @@ __brokers__: list[str] = [ 'ib', 'kraken', 'kucoin', + 'deribit', # broken but used to work # 'questrade', @@ -61,7 +62,6 @@ __brokers__: list[str] = [ # wstrade # iex - # deribit # bitso ] diff --git a/piker/brokers/deribit/__init__.py b/piker/brokers/deribit/__init__.py index 4c0c1850..5e87a708 100644 --- a/piker/brokers/deribit/__init__.py +++ b/piker/brokers/deribit/__init__.py @@ -25,6 +25,7 @@ from .api import ( get_client, ) from .feed import ( + get_mkt_info, open_history_client, open_symbol_search, stream_quotes, @@ -34,15 +35,20 @@ from .feed import ( # open_trade_dialog, # norm_trade_records, # ) +from .venues import ( + OptionPair, +) log = get_logger(__name__) __all__ = [ 'get_client', # 'trades_dialogue', + 'get_mkt_info', 'open_history_client', 'open_symbol_search', 'stream_quotes', + 'OptionPair', # 'norm_trade_records', ] diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index 03cc301e..ee9c7033 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -19,11 +19,17 @@ Deribit backend. ''' import asyncio +from collections import ChainMap from contextlib import ( asynccontextmanager as acm, ) from datetime import datetime +from decimal import ( + Decimal, +) from functools import partial +from pathlib import Path +from pprint import pformat import time from typing import ( Any, @@ -31,10 +37,8 @@ from typing import ( Callable, ) -import pendulum +from pendulum import now import trio -from trio_typing import TaskStatus -from rapidfuzz import process as fuzzy import numpy as np from tractor.trionics import ( broadcast_receiver, @@ -48,14 +52,35 @@ from cryptofeed import FeedHandler from cryptofeed.defines import ( DERIBIT, L1_BOOK, TRADES, - OPTION, CALL, PUT + OPTION, CALL, PUT, + OPEN_INTEREST, ) from cryptofeed.symbols import Symbol - +from cryptofeed.types import ( + L1Book, + Trade, + OpenInterest, +) +from piker.brokers import SymbolNotFound +from .venues import ( + _ws_url, + MarketType, + PAIRTYPES, + Pair, + OptionPair, + JSONRPCResult, + KLinesResult, + LastTradesResult, +) +from piker.accounting import ( + Asset, + digits_to_dec, + MktPair, +) from piker.data import ( def_iohlcv_fields, match_from_pairs, - Struct, + # Struct, ) from piker.data._web_bs import ( open_jsonrpc_session @@ -74,60 +99,21 @@ _spawn_kwargs = { } -_url = 'https://www.deribit.com' -_ws_url = 'wss://www.deribit.com/ws/api/v2' -_testnet_ws_url = 'wss://test.deribit.com/ws/api/v2' +def deribit_timestamp(when: datetime) -> int: + ''' + Convert conventional epoch timestamp, in secs, to unixtime in + milliseconds. + + ''' + return int( + (when.timestamp() * 1000) + + + (when.microsecond / 1000) + ) -class JSONRPCResult(Struct): - jsonrpc: str = '2.0' - id: int - result: Optional[list[dict]] = None - error: Optional[dict] = None - usIn: int - usOut: int - usDiff: int - testnet: bool - -class JSONRPCChannel(Struct): - jsonrpc: str = '2.0' - method: str - params: dict - - -class KLinesResult(Struct): - close: list[float] - cost: list[float] - high: list[float] - low: list[float] - open: list[float] - status: str - ticks: list[int] - volume: list[float] - -class Trade(Struct): - trade_seq: int - trade_id: str - timestamp: int - tick_direction: int - price: float - mark_price: float - iv: float - instrument_name: str - index_price: float - direction: str - combo_trade_id: Optional[int] = 0, - combo_id: Optional[str] = '', - amount: float - -class LastTradesResult(Struct): - trades: list[Trade] - has_more: bool - - -# convert datetime obj timestamp to unixtime in milliseconds -def deribit_timestamp(when): - return int((when.timestamp() * 1000) + (when.microsecond / 1000)) +def get_timestamp_int(expiry_date: str) -> int: + return int(time.mktime(time.strptime(expiry_date, '%d%b%y'))) def str_to_cb_sym(name: str) -> Symbol: @@ -136,106 +122,197 @@ def str_to_cb_sym(name: str) -> Symbol: quote = base if option_type == 'put': - option_type = PUT - elif option_type == 'call': + option_type = PUT + elif option_type == 'call': option_type = CALL else: raise Exception("Couldn\'t parse option type") + new_expiry_date: int = get_timestamp_int( + get_values_from_cb_normalized_date(expiry_date) + ) return Symbol( - base, quote, + base=base, + quote=quote, type=OPTION, strike_price=strike_price, option_type=option_type, - expiry_date=expiry_date, - expiry_normalize=False) + expiry_date=new_expiry_date + ) def piker_sym_to_cb_sym(name: str) -> Symbol: - base, expiry_date, strike_price, option_type = tuple( + ( + base, + expiry_date, + strike_price, + option_type, + )= tuple( name.upper().split('-')) - quote = base + new_expiry_date = get_timestamp_int(expiry_date) + quote: str = base - if option_type == 'P': - option_type = PUT - elif option_type == 'C': + if option_type == 'P' or option_type == 'PUT': + option_type = PUT + elif option_type == 'C' or option_type == 'CALL': option_type = CALL else: raise Exception("Couldn\'t parse option type") return Symbol( - base, quote, + base=base, + quote=quote, type=OPTION, strike_price=strike_price, option_type=option_type, - expiry_date=expiry_date.upper()) + expiry_date=new_expiry_date + ) -def cb_sym_to_deribit_inst(sym: Symbol): - # cryptofeed normalized - cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z'] +# TODO, instead can't we just lookup the `MktPair` directly +# and pass it upward to `stream_quotes()`?? +def cb_sym_to_deribit_inst(sym: Symbol) -> str: + ''' + Generate our own internal `str`-repr for a `cryptofeed.Symbol` + uniquely from its fields. - # deribit specific - months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC'] + This is the equiv of generating a `Pair.fmqe` from `cryptofeed` + for now i suppose..? - exp = sym.expiry_date + ''' + new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date) + otype = ( + 'C' if sym.option_type == CALL + else 'P' + ) + return ( + f'{sym.base}-' + f'{new_expiry_date}-' + f'{sym.strike_price}-' + f'{otype}' + ) + +def get_values_from_cb_normalized_date(expiry_date: str) -> str: + # deribit specific + cb_norm = [ + 'F', 'G', 'H', 'J', + 'K', 'M', 'N', 'Q', + 'U', 'V', 'X', 'Z' + ] + months = [ + 'JAN', 'FEB', 'MAR', 'APR', + 'MAY', 'JUN', 'JUL', 'AUG', + 'SEP', 'OCT', 'NOV', 'DEC' + ] # YYMDD # 01234 - year, month, day = ( - exp[:2], months[cb_norm.index(exp[2:3])], exp[3:]) - - otype = 'C' if sym.option_type == CALL else 'P' - - return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}' + day, month, year = ( + expiry_date[3:], + months[cb_norm.index(expiry_date[2:3])], + expiry_date[:2] + ) + return f'{day}{month}{year}' def get_config() -> dict[str, Any]: - conf, path = config.load() - - section = conf.get('deribit') - - # TODO: document why we send this, basically because logging params for cryptofeed - conf['log'] = {} - conf['log']['disabled'] = True - + conf: dict + path: Path + conf, path = config.load( + conf_name='brokers', + touch_if_dne=True, + ) + section: dict|None = conf.get('deribit') if section is None: - log.warning(f'No config section found for deribit in {path}') + raise ValueError( + f'No `[deribit]` section found in\n' + f'{path!r}\n\n' + f'See the template config from the core repo for samples..\n' + # f'' + ) - return conf + conf_option = section.get('option', {}) + conf_log = conf_option.get('log', {}) + return { + 'deribit': { + 'key_id': conf_option['key_id'], + 'key_secret': conf_option['key_secret'], + }, + 'log': { + 'filename': conf_log['filename'], + 'level': conf_log['level'], + 'disabled': conf_log['disabled'], + } + } class Client: + ''' + Hi-level interface for the jsron-RPC over websocket API. - def __init__(self, json_rpc: Callable) -> None: - self._pairs: dict[str, Any] = None + ''' + def __init__( + self, + + json_rpc: Callable + + ) -> None: + self._pairs: ChainMap[str, Pair] = ChainMap() config = get_config().get('deribit', {}) - if ('key_id' in config) and ('key_secret' in config): - self._key_id = config['key_id'] - self._key_secret = config['key_secret'] - - else: - self._key_id = None - self._key_secret = None + self._key_id = config.get('key_id') + self._key_secret = config.get('key_secret') self.json_rpc = json_rpc - @property - def currencies(self): - return ['btc', 'eth', 'sol', 'usd'] + self._auth_ts = None + self._auth_renew_ts = 5 # seconds to renew auth - async def get_balances(self, kind: str = 'option') -> dict[str, float]: + async def _json_rpc_auth_wrapper( + self, + *args, + **kwargs, + ) -> JSONRPCResult: + + """Background task that adquires a first access token and then will + refresh the access token. + + https://docs.deribit.com/?python#authentication-2 + """ + access_scope = 'trade:read_write' + current_ts = time.time() + + if not self._auth_ts or current_ts - self._auth_ts < self._auth_renew_ts: + # if we are close to token expiry time + + params = { + 'grant_type': 'client_credentials', + 'client_id': self._key_id, + 'client_secret': self._key_secret, + 'scope': access_scope + } + + resp = await self.json_rpc('public/auth', params) + result = resp.result + + self._auth_ts = time.time() + result['expires_in'] + + return await self.json_rpc(*args, **kwargs) + + async def get_balances( + self, + kind: str = 'option' + ) -> dict[str, float]: """Return the set of positions for this account by symbol. """ balances = {} for currency in self.currencies: - resp = await self.json_rpc( + resp = await self._json_rpc_auth_wrapper( 'private/get_positions', params={ 'currency': currency.upper(), 'kind': kind}) @@ -244,20 +321,63 @@ class Client: return balances - async def get_assets(self) -> dict[str, float]: - """Return the set of asset balances for this account - by symbol. - """ - balances = {} + async def get_currencies( + self, - for currency in self.currencies: - resp = await self.json_rpc( - 'private/get_account_summary', params={ - 'currency': currency.upper()}) + ) -> list[dict]: + ''' + Return the set of currencies for deribit. + ''' + assets = {} + resp = await self._json_rpc_auth_wrapper( + 'public/get_currencies', + params={} + ) + return resp.result - balances[currency] = resp.result['balance'] + async def get_assets( + self, + venue: str | None = None, - return balances + ) -> dict[str, Asset]: + ''' + Return the set of asset balances for this account + by (deribit's) symbol. + + + ''' + assets = {} + currencies = await self.get_currencies() + for currency in currencies: + name: str = currency['currency'] + tx_tick: Decimal = digits_to_dec(currency['fee_precision']) + + # TODO, handling of options, futures, perps etc. more + # specifically with diff `.atype`s? + assets[name] = Asset( + name=name, + atype='crypto_currency', + tx_tick=tx_tick, + ) + + instruments = await self.symbol_info(currency=name) + for instrument in instruments: + pair = instruments[instrument] + assets[pair.symbol] = Asset( + name=pair.symbol, + atype=pair.venue, + tx_tick=pair.size_tick, + ) + + return assets + + async def get_mkt_pairs(self) -> dict[str, Pair]: + flat: dict[str, Pair] = {} + for key in self._pairs: + item = self._pairs.get(key) + flat[item.bs_fqme] = item + + return flat async def submit_limit( self, @@ -274,7 +394,7 @@ class Client: 'type': 'limit', 'price': price, } - resp = await self.json_rpc( + resp = await self._json_rpc_auth_wrapper( f'private/{action}', params) return resp.result @@ -282,10 +402,45 @@ class Client: async def submit_cancel(self, oid: str): """Send cancel request for order id """ - resp = await self.json_rpc( + resp = await self._json_rpc_auth_wrapper( 'private/cancel', {'order_id': oid}) return resp.result + async def exch_info( + self, + sym: str | None = None, + + venue: MarketType = 'option', + expiry: str | None = None, + + ) -> dict[str, Pair] | Pair: + + pair_table: dict[str, Pair] = self._pairs + + if ( + sym + and (cached_pair := pair_table.get(sym)) + ): + return cached_pair + + if sym: + opt: OptionPair|None = pair_table.get(sym) + if not opt: + closest_matches: dict[str, Pair] = match_from_pairs( + pairs=pair_table, + query=sym, + score_cutoff=40, + ) + closest_syms: list[str] = list(closest_matches.keys()) + raise ValueError( + f'No contract found for {sym!r}\n\n' + f'Closest {len(closest_syms)} available contracts:\n\n' + f'{pformat(closest_syms)}\n' + ) + return pair_table[sym] + else: + return self._pairs + async def symbol_info( self, instrument: Optional[str] = None, @@ -293,7 +448,7 @@ class Client: kind: str = 'option', expired: bool = False - ) -> dict[str, dict]: + ) -> dict[str, Pair] | Pair: ''' Get symbol infos. @@ -305,31 +460,71 @@ class Client: params: dict[str, str] = { 'currency': currency.upper(), 'kind': kind, - 'expired': str(expired).lower() + 'expired': expired, } - resp: JSONRPCResult = await self.json_rpc( + resp: JSONRPCResult = await self._json_rpc_auth_wrapper( 'public/get_instruments', params, ) # convert to symbol-keyed table + pair_type: Pair = PAIRTYPES[kind] results: list[dict] | None = resp.result - instruments: dict[str, dict] = { - item['instrument_name'].lower(): item - for item in results - } + + instruments: dict[str, Pair] = {} + for item in results: + symbol=item['instrument_name'].lower() + try: + pair: Pair = pair_type( + symbol=symbol, + **item + ) + except Exception as e: + e.add_note( + "\nDon't panic, prolly stupid deribit changed their symbology schema again..\n" + 'Check out their API docs here:\n\n' + 'https://docs.deribit.com/?python#deribit-api-v2-1-1' + ) + raise + + instruments[symbol] = pair if instrument is not None: - return instruments[instrument] + return instruments[instrument.lower()] else: return instruments async def cache_symbols( self, - ) -> dict: + venue: MarketType = 'option', - if not self._pairs: - self._pairs = await self.symbol_info() + ) -> None: + # lookup internal mkt-specific pair table to update + pair_table: dict[str, Pair] = self._pairs + + # make API request(s) + mkt_pairs = await self.symbol_info() + + if not mkt_pairs: + raise SymbolNotFound( + f'No market pairs found!?:\n' + f'{mkt_pairs}' + ) + + pairs_view_subtable: dict[str, Pair] = {} + + for instrument in mkt_pairs: + pair_type: Pair|OptionPair = PAIRTYPES[venue] + + pair: Pair = pair_type(**mkt_pairs[instrument].to_dict()) + + pair_table[pair.symbol.upper()] = pair + + # update an additional top-level-cross-venue-table + # `._pairs: ChainMap` for search B0 + pairs_view_subtable[pair.bs_fqme] = pair + + self._pairs.maps.append(pairs_view_subtable) return self._pairs @@ -337,47 +532,47 @@ class Client: self, pattern: str, limit: int = 30, - ) -> dict[str, Any]: + ) -> dict[str, Pair]: ''' Fuzzy search symbology set for pairs matching `pattern`. ''' - pairs: dict[str, Any] = await self.symbol_info() - matches: dict[str, Pair] = match_from_pairs( + pairs: dict[str, Pair] = await self.exch_info() + + return match_from_pairs( pairs=pairs, query=pattern.upper(), score_cutoff=35, limit=limit ) - # repack in name-keyed table - return { - pair['instrument_name'].lower(): pair - for pair in matches.values() - } - async def bars( self, - symbol: str, + mkt: MktPair, + start_dt: Optional[datetime] = None, end_dt: Optional[datetime] = None, + limit: int = 1000, as_np: bool = True, - ) -> dict: - instrument = symbol + + ) -> list[tuple] | np.ndarray: + instrument: str = mkt.bs_fqme.split('.')[0] if end_dt is None: - end_dt = pendulum.now('UTC') + end_dt = now('UTC') + _orig_start_dt = start_dt if start_dt is None: start_dt = end_dt.start_of( - 'minute').subtract(minutes=limit) + 'minute' + ).subtract(minutes=limit) - start_time = deribit_timestamp(start_dt) - end_time = deribit_timestamp(end_dt) + start_time: int = deribit_timestamp(start_dt) + end_time: int = deribit_timestamp(end_dt) # https://docs.deribit.com/#public-get_tradingview_chart_data - resp = await self.json_rpc( + resp = await self._json_rpc_auth_wrapper( 'public/get_tradingview_chart_data', params={ 'instrument_name': instrument.upper(), @@ -387,36 +582,38 @@ class Client: }) result = KLinesResult(**resp.result) - new_bars = [] + new_bars: list[tuple] = [] + # if _orig_start_dt is None: + # if not new_bars: + # import tractor + # await tractor.pause() + for i in range(len(result.close)): - - _open = result.open[i] - high = result.high[i] - low = result.low[i] - close = result.close[i] - volume = result.volume[i] - row = [ (start_time + (i * (60 * 1000))) / 1000.0, # time result.open[i], result.high[i], result.low[i], result.close[i], - result.volume[i], - 0 + result.volume[i] ] new_bars.append((i,) + tuple(row)) - array = np.array(new_bars, dtype=def_iohlcv_fields) if as_np else klines - return array + if not as_np: + return result + + return np.array( + new_bars, + dtype=def_iohlcv_fields + ) async def last_trades( self, instrument: str, count: int = 10 ): - resp = await self.json_rpc( + resp = await self._json_rpc_auth_wrapper( 'public/get_last_trades_by_instrument', params={ 'instrument_name': instrument, @@ -428,85 +625,24 @@ class Client: @acm async def get_client( - is_brokercheck: bool = False + is_brokercheck: bool = False, + venue: MarketType = 'option', ) -> Client: async with ( trio.open_nursery() as n, open_jsonrpc_session( - _testnet_ws_url, dtype=JSONRPCResult) as json_rpc + _ws_url, response_type=JSONRPCResult + ) as json_rpc ): client = Client(json_rpc) - - _refresh_token: Optional[str] = None - _access_token: Optional[str] = None - - async def _auth_loop( - task_status: TaskStatus = trio.TASK_STATUS_IGNORED - ): - """Background task that adquires a first access token and then will - refresh the access token while the nursery isn't cancelled. - - https://docs.deribit.com/?python#authentication-2 - """ - renew_time = 10 - access_scope = 'trade:read_write' - _expiry_time = time.time() - got_access = False - nonlocal _refresh_token - nonlocal _access_token - - while True: - if time.time() - _expiry_time < renew_time: - # if we are close to token expiry time - - if _refresh_token != None: - # if we have a refresh token already dont need to send - # secret - params = { - 'grant_type': 'refresh_token', - 'refresh_token': _refresh_token, - 'scope': access_scope - } - - else: - # we don't have refresh token, send secret to initialize - params = { - 'grant_type': 'client_credentials', - 'client_id': client._key_id, - 'client_secret': client._key_secret, - 'scope': access_scope - } - - resp = await json_rpc('public/auth', params) - result = resp.result - - _expiry_time = time.time() + result['expires_in'] - _refresh_token = result['refresh_token'] - - if 'access_token' in result: - _access_token = result['access_token'] - - if not got_access: - # first time this loop runs we must indicate task is - # started, we have auth - got_access = True - task_status.started() - - else: - await trio.sleep(renew_time / 2) - - # if we have client creds launch auth loop - if client._key_id is not None: - await n.start(_auth_loop) - await client.cache_symbols() yield client n.cancel_scope.cancel() @acm -async def open_feed_handler(): +async def open_feed_handler() -> FeedHandler: fh = FeedHandler(config=get_config()) yield fh await to_asyncio.run_task(fh.stop_async) @@ -523,40 +659,46 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream: async def aio_price_feed_relay( fh: FeedHandler, - instrument: Symbol, + instrument: str, from_trio: asyncio.Queue, to_trio: trio.abc.SendChannel, ) -> None: - async def _trade(data: dict, receipt_timestamp): - to_trio.send_nowait(('trade', { - 'symbol': cb_sym_to_deribit_inst( - str_to_cb_sym(data.symbol)).lower(), - 'last': data, - 'broker_ts': time.time(), - 'data': data.to_dict(), - 'receipt': receipt_timestamp - })) + ''' + Relay price feed quotes from the `cryptofeed.FeedHandler` to + the `piker`-side `trio.task` consumers for delivery to consumer + sub-actors for various subsystems. - async def _l1(data: dict, receipt_timestamp): - to_trio.send_nowait(('l1', { - 'symbol': cb_sym_to_deribit_inst( - str_to_cb_sym(data.symbol)).lower(), - 'ticks': [ - {'type': 'bid', - 'price': float(data.bid_price), 'size': float(data.bid_size)}, - {'type': 'bsize', - 'price': float(data.bid_price), 'size': float(data.bid_size)}, - {'type': 'ask', - 'price': float(data.ask_price), 'size': float(data.ask_size)}, - {'type': 'asize', - 'price': float(data.ask_price), 'size': float(data.ask_size)} - ] - })) + ''' + async def _trade( + trade: Trade, # cryptofeed, NOT ours from `.venues`! + receipt_timestamp: int, + ) -> None: + ''' + Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side. + ''' + to_trio.send_nowait(('trade', trade)) + + async def _l1( + book: L1Book, + receipt_timestamp: int, + ) -> None: + ''' + Relay-thru "l1 book" updates. + + ''' + + to_trio.send_nowait(('l1', book)) + + # TODO, make this work! + # -[ ] why isn't this working in `tractor.pause_from_sync()`?? + # breakpoint() + + sym: Symbol = piker_sym_to_cb_sym(instrument) fh.add_feed( DERIBIT, channels=[TRADES, L1_BOOK], - symbols=[piker_sym_to_cb_sym(instrument)], + symbols=[sym], callbacks={ TRADES: _trade, L1_BOOK: _l1 @@ -565,27 +707,35 @@ async def aio_price_feed_relay( if not fh.running: fh.run( start_loop=False, - install_signal_handlers=False) + install_signal_handlers=False + ) # sync with trio to_trio.send_nowait(None) + # run until cancelled await asyncio.sleep(float('inf')) @acm async def open_price_feed( instrument: str -) -> trio.abc.ReceiveStream: - async with maybe_open_feed_handler() as fh: - async with to_asyncio.open_channel_from( +) -> to_asyncio.LinkedTaskChannel: + + fh: FeedHandler + first: None + chan: to_asyncio.LinkedTaskChannel + async with ( + maybe_open_feed_handler() as fh, + to_asyncio.open_channel_from( partial( aio_price_feed_relay, fh, instrument ) - ) as (first, chan): - yield chan + ) as (first, chan) + ): + yield chan @acm @@ -594,12 +744,13 @@ async def maybe_open_price_feed( ) -> trio.abc.ReceiveStream: # TODO: add a predicate to maybe_open_context + feed: to_asyncio.LinkedTaskChannel async with maybe_open_context( acm_func=open_price_feed, kwargs={ - 'instrument': instrument + 'instrument': instrument.split('.')[0] }, - key=f'{instrument}-price', + key=f'{instrument.split('.')[0]}-price', ) as (cache_hit, feed): if cache_hit: yield broadcast_receiver(feed, 10) @@ -608,68 +759,69 @@ async def maybe_open_price_feed( -async def aio_order_feed_relay( - fh: FeedHandler, - instrument: Symbol, - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, -) -> None: - async def _fill(data: dict, receipt_timestamp): - breakpoint() +# TODO, move all to `.broker` submod! +# async def aio_order_feed_relay( +# fh: FeedHandler, +# instrument: Symbol, +# from_trio: asyncio.Queue, +# to_trio: trio.abc.SendChannel, +# ) -> None: +# async def _fill(data: dict, receipt_timestamp): +# breakpoint() - async def _order_info(data: dict, receipt_timestamp): - breakpoint() +# async def _order_info(data: dict, receipt_timestamp): +# breakpoint() - fh.add_feed( - DERIBIT, - channels=[FILLS, ORDER_INFO], - symbols=[instrument.upper()], - callbacks={ - FILLS: _fill, - ORDER_INFO: _order_info, - }) +# fh.add_feed( +# DERIBIT, +# channels=[FILLS, ORDER_INFO], +# symbols=[instrument.upper()], +# callbacks={ +# FILLS: _fill, +# ORDER_INFO: _order_info, +# }) - if not fh.running: - fh.run( - start_loop=False, - install_signal_handlers=False) +# if not fh.running: +# fh.run( +# start_loop=False, +# install_signal_handlers=False) - # sync with trio - to_trio.send_nowait(None) +# # sync with trio +# to_trio.send_nowait(None) - await asyncio.sleep(float('inf')) +# await asyncio.sleep(float('inf')) -@acm -async def open_order_feed( - instrument: list[str] -) -> trio.abc.ReceiveStream: - async with maybe_open_feed_handler() as fh: - async with to_asyncio.open_channel_from( - partial( - aio_order_feed_relay, - fh, - instrument - ) - ) as (first, chan): - yield chan +# @acm +# async def open_order_feed( +# instrument: list[str] +# ) -> trio.abc.ReceiveStream: +# async with maybe_open_feed_handler() as fh: +# async with to_asyncio.open_channel_from( +# partial( +# aio_order_feed_relay, +# fh, +# instrument +# ) +# ) as (first, chan): +# yield chan -@acm -async def maybe_open_order_feed( - instrument: str -) -> trio.abc.ReceiveStream: +# @acm +# async def maybe_open_order_feed( +# instrument: str +# ) -> trio.abc.ReceiveStream: - # TODO: add a predicate to maybe_open_context - async with maybe_open_context( - acm_func=open_order_feed, - kwargs={ - 'instrument': instrument, - 'fh': fh - }, - key=f'{instrument}-order', - ) as (cache_hit, feed): - if cache_hit: - yield broadcast_receiver(feed, 10) - else: - yield feed +# # TODO: add a predicate to maybe_open_context +# async with maybe_open_context( +# acm_func=open_order_feed, +# kwargs={ +# 'instrument': instrument.split('.')[0], +# 'fh': fh +# }, +# key=f'{instrument.split('.')[0]}-order', +# ) as (cache_hit, feed): +# if cache_hit: +# yield broadcast_receiver(feed, 10) +# else: +# yield feed diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index 821aab87..efd43ea5 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -18,38 +18,59 @@ Deribit backend. ''' +from __future__ import annotations from contextlib import asynccontextmanager as acm from datetime import datetime -from typing import Any, Optional, Callable +from typing import ( + # Any, + # Optional, + Callable, +) +# from pprint import pformat import time +import cryptofeed import trio from trio_typing import TaskStatus -import pendulum -from rapidfuzz import process as fuzzy +from pendulum import ( + from_timestamp, +) import numpy as np import tractor -from piker.brokers import open_cached_client -from piker.log import get_logger, get_console_log -from piker.data import ShmArray -from piker.brokers._util import ( - BrokerError, +from piker.accounting import ( + Asset, + MktPair, + unpack_fqme, +) +from piker.brokers import ( + open_cached_client, + NoData, DataUnavailable, ) - -from cryptofeed import FeedHandler -from cryptofeed.defines import ( - DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT +from piker._cacheables import ( + async_lifo_cache, ) -from cryptofeed.symbols import Symbol +from piker.log import ( + get_logger, + mk_repr, +) +from piker.data.validate import FeedInit + from .api import ( - Client, Trade, - get_config, - str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst, + Client, + # get_config, + piker_sym_to_cb_sym, + cb_sym_to_deribit_inst, + str_to_cb_sym, maybe_open_price_feed ) +from .venues import ( + Pair, + OptionPair, + Trade, +) _spawn_kwargs = { 'infect_asyncio': True, @@ -64,90 +85,215 @@ async def open_history_client( mkt: MktPair, ) -> tuple[Callable, int]: - fnstrument: str = mkt.bs_fqme # TODO implement history getter for the new storage layer. async with open_cached_client('deribit') as client: + pair: OptionPair = client._pairs[mkt.dst.name] + # XXX NOTE, the cuckers use ms !!! + creation_time_s: int = pair.creation_timestamp/1000 + async def get_ohlc( - end_dt: Optional[datetime] = None, - start_dt: Optional[datetime] = None, + timeframe: float, + end_dt: datetime | None = None, + start_dt: datetime | None = None, ) -> tuple[ np.ndarray, datetime, # start datetime, # end ]: + if timeframe != 60: + raise DataUnavailable('Only 1m bars are supported') - array = await client.bars( - instrument, + array: np.ndarray = await client.bars( + mkt, start_dt=start_dt, end_dt=end_dt, ) if len(array) == 0: - raise DataUnavailable + if ( + end_dt is None + ): + raise DataUnavailable( + 'No history seems to exist yet?\n\n' + f'{mkt}' + ) + elif ( + end_dt + and + end_dt.timestamp() < creation_time_s + ): + # the contract can't have history + # before it was created. + pair_type_str: str = type(pair).__name__ + create_dt: datetime = from_timestamp(creation_time_s) + raise DataUnavailable( + f'No history prior to\n' + f'`{pair_type_str}.creation_timestamp: int = ' + f'{pair.creation_timestamp}\n\n' + f'------ deribit sux ------\n' + f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n' + f'creation_time_s: {creation_time_s}\n' + f'create_dt: {create_dt}\n' + ) + raise NoData( + f'No frame for {start_dt} -> {end_dt}\n' + ) - start_dt = pendulum.from_timestamp(array[0]['time']) - end_dt = pendulum.from_timestamp(array[-1]['time']) + start_dt = from_timestamp(array[0]['time']) + end_dt = from_timestamp(array[-1]['time']) + + times = array['time'] + if not times.any(): + raise ValueError( + 'Bad frame with null-times?\n\n' + f'{times}' + ) + + if end_dt is None: + inow: int = round(time.time()) + if (inow - times[-1]) > 60: + await tractor.pause() return array, start_dt, end_dt - yield get_ohlc, {'erlangs': 3, 'rate': 3} + yield ( + get_ohlc, + { # backfill config + 'erlangs': 3, + 'rate': 3, + } + ) + + +@async_lifo_cache() +async def get_mkt_info( + fqme: str, + +) -> tuple[MktPair, Pair|OptionPair] | None: + + # uppercase since kraken bs_mktid is always upper + if 'deribit' not in fqme.lower(): + fqme += '.deribit' + + mkt_mode: str = '' + broker, mkt_ep, venue, expiry = unpack_fqme(fqme) + + # NOTE: we always upper case all tokens to be consistent with + # binance's symbology style for pairs, like `BTCUSDT`, but in + # theory we could also just keep things lower case; as long as + # we're consistent and the symcache matches whatever this func + # returns, always! + expiry: str = expiry.upper() + venue: str = venue.upper() + # venue_lower: str = venue.lower() + + mkt_mode: str = 'option' + + async with open_cached_client( + 'deribit', + ) as client: + + assets: dict[str, Asset] = await client.get_assets() + pair_str: str = mkt_ep.lower() + + pair: Pair = await client.exch_info( + sym=pair_str, + ) + mkt_mode = pair.venue + client.mkt_mode = mkt_mode + + dst: Asset | None = assets.get(pair.bs_dst_asset) + src: Asset | None = assets.get(pair.bs_src_asset) + + mkt = MktPair( + dst=dst, + src=src, + price_tick=pair.price_tick, + size_tick=pair.size_tick, + bs_mktid=pair.symbol, + venue=mkt_mode, + broker='deribit', + _atype=mkt_mode, + _fqme_without_src=True, + + # expiry=pair.expiry, + # XXX TODO, currently we don't use it since it's + # already "described" in the `OptionPair.symbol: str` + # and if we slap in the ISO repr it's kinda hideous.. + # -[ ] figure out the best either std + ) + return mkt, pair async def stream_quotes( - send_chan: trio.abc.SendChannel, symbols: list[str], feed_is_live: trio.Event, - loglevel: str = None, # startup sync task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ) -> None: - # XXX: required to propagate ``tractor`` loglevel to piker logging - get_console_log(loglevel or tractor.current_actor().loglevel) + ''' + Open a live quote stream for the market set defined by `symbols`. - sym = symbols[0] + Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side + task and relays through L1 and `Trade` msgs here to our `trio.Task`. + + ''' + sym = symbols[0].split('.')[0] + init_msgs: list[FeedInit] = [] + + # multiline nested `dict` formatter (since rn quote-msgs are + # just that). + pfmt: Callable[[str], str] = mk_repr( + # so we can see `deribit`'s delightfully mega-long bs fields.. + maxstring=100, + ) async with ( open_cached_client('deribit') as client, send_chan as send_chan ): + mkt: MktPair + pair: Pair + mkt, pair = await get_mkt_info(sym) - init_msgs = { - # pass back token, and bool, signalling if we're the writer - # and that history has been written - sym: { - 'symbol_info': { - 'asset_type': 'option', - 'price_tick_size': 0.0005 - }, - 'shm_write_opts': {'sum_tick_vml': False}, - 'fqsn': sym, - }, - } + # build out init msgs according to latest spec + init_msgs.append( + FeedInit( + mkt_info=mkt, + ) + ) + # build `cryptofeed` feed-handle + cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym) - nsym = piker_sym_to_cb_sym(sym) + from_cf: tractor.to_asyncio.LinkedTaskChannel + async with maybe_open_price_feed(sym) as from_cf: - async with maybe_open_price_feed(sym) as stream: + # load the "last trades" summary + last_trades_res: cryptofeed.LastTradesResult = await client.last_trades( + cb_sym_to_deribit_inst(cf_sym), + count=1, + ) + last_trades: list[Trade] = last_trades_res.trades - cache = await client.cache_symbols() + # TODO, do we even need this or will the above always + # work? + # if not last_trades: + # await tractor.pause() + # async for typ, quote in from_cf: + # if typ == 'trade': + # last_trade = Trade(**(quote['data'])) + # break - last_trades = (await client.last_trades( - cb_sym_to_deribit_inst(nsym), count=1)).trades + # else: + last_trade = Trade( + **(last_trades[0]) + ) - if len(last_trades) == 0: - last_trade = None - async for typ, quote in stream: - if typ == 'trade': - last_trade = Trade(**(quote['data'])) - break - - else: - last_trade = Trade(**(last_trades[0])) - - first_quote = { + first_quote: dict = { 'symbol': sym, 'last': last_trade.price, 'brokerd_ts': last_trade.timestamp, @@ -158,13 +304,84 @@ async def stream_quotes( 'broker_ts': last_trade.timestamp }] } - task_status.started((init_msgs, first_quote)) + task_status.started(( + init_msgs, + first_quote, + )) feed_is_live.set() - async for typ, quote in stream: - topic = quote['symbol'] - await send_chan.send({topic: quote}) + # NOTE XXX, static for now! + # => since this only handles ONE mkt feed at a time we + # don't need a lookup table to map interleaved quotes + # from multiple possible mkt-pairs + topic: str = mkt.bs_fqme + + # deliver until cancelled + async for typ, ref in from_cf: + match typ: + case 'trade': + trade: cryptofeed.types.Trade = ref + + # TODO, re-impl this according to teh ideal + # fqme for opts that we choose!! + bs_fqme: str = cb_sym_to_deribit_inst( + str_to_cb_sym(trade.symbol) + ).lower() + + piker_quote: dict = { + 'symbol': bs_fqme, + 'last': trade.price, + 'broker_ts': time.time(), + # ^TODO, name this `brokerd/datad_ts` and + # use `time.time_ns()` ?? + 'ticks': [{ + 'type': 'trade', + 'price': float(trade.price), + 'size': float(trade.amount), + 'broker_ts': trade.timestamp, + }], + } + log.info( + f'deribit {typ!r} quote for {sym!r}\n\n' + f'{trade}\n\n' + f'{pfmt(piker_quote)}\n' + ) + + case 'l1': + book: cryptofeed.types.L1Book = ref + + # TODO, so this is where we can possibly change things + # and instead lever the `MktPair.bs_fqme: str` output? + bs_fqme: str = cb_sym_to_deribit_inst( + str_to_cb_sym(book.symbol) + ).lower() + + piker_quote: dict = { + 'symbol': bs_fqme, + 'ticks': [ + + {'type': 'bid', + 'price': float(book.bid_price), + 'size': float(book.bid_size)}, + + {'type': 'bsize', + 'price': float(book.bid_price), + 'size': float(book.bid_size),}, + + {'type': 'ask', + 'price': float(book.ask_price), + 'size': float(book.ask_size),}, + + {'type': 'asize', + 'price': float(book.ask_price), + 'size': float(book.ask_size),} + ] + } + + await send_chan.send({ + topic: piker_quote, + }) @tractor.context @@ -174,12 +391,21 @@ async def open_symbol_search( async with open_cached_client('deribit') as client: # load all symbols locally for fast search - cache = await client.cache_symbols() + # cache = client._pairs await ctx.started() async with ctx.open_stream() as stream: - + pattern: str async for pattern in stream: - # repack in dict form - await stream.send( - await client.search_symbols(pattern)) + + # NOTE: pattern fuzzy-matching is done within + # the methd impl. + pairs: dict[str, Pair] = await client.search_symbols( + pattern, + ) + # repack in fqme-keyed table + byfqme: dict[str, Pair] = {} + for pair in pairs.values(): + byfqme[pair.bs_fqme] = pair + + await stream.send(byfqme) diff --git a/piker/brokers/deribit/venues.py b/piker/brokers/deribit/venues.py new file mode 100644 index 00000000..0dda913e --- /dev/null +++ b/piker/brokers/deribit/venues.py @@ -0,0 +1,196 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Per market data-type definitions and schemas types. + +""" +from __future__ import annotations +import pendulum +from typing import ( + Literal, + Optional, +) +from decimal import Decimal + +from piker.types import Struct + + +# API endpoint paths by venue / sub-API +_domain: str = 'deribit.com' +_url = f'https://www.{_domain}' + +# WEBsocketz +_ws_url: str = f'wss://www.{_domain}/ws/api/v2' + +# test nets +_testnet_ws_url: str = f'wss://test.{_domain}/ws/api/v2' + +MarketType = Literal[ + 'option' +] + + +def get_api_eps(venue: MarketType) -> tuple[str, str]: + ''' + Return API ep root paths per venue. + + ''' + return { + 'option': ( + _ws_url, + ), + }[venue] + + +class Pair(Struct, frozen=True, kw_only=True): + + symbol: str + + # src + quote_currency: str # 'BTC' + + # dst + base_currency: str # "BTC", + + tick_size: float # 0.0001 # [{'above_price': 0.005, 'tick_size': 0.0005}] + tick_size_steps: list[dict[str, float]] + + @property + def price_tick(self) -> Decimal: + return Decimal(str(self.tick_size_steps[0]['above_price'])) + + @property + def size_tick(self) -> Decimal: + return Decimal(str(self.tick_size)) + + @property + def bs_fqme(self) -> str: + return f'{self.symbol}' + + @property + def bs_mktid(self) -> str: + return f'{self.symbol}.{self.venue}' + + +class OptionPair(Pair, frozen=True): + + taker_commission: float # 0.0003 + strike: float # 5000.0 + settlement_period: str # 'day' + settlement_currency: str # "BTC", + rfq: bool # false + price_index: str # 'btc_usd' + option_type: str # 'call' + min_trade_amount: float # 0.1 + maker_commission: float # 0.0003 + kind: str # 'option' + is_active: bool # true + instrument_type: str # 'reversed' + instrument_name: str # 'BTC-1SEP24-55000-C' + instrument_id: int # 364671 + expiration_timestamp: int # 1725177600000 + creation_timestamp: int # 1724918461000 + counter_currency: str # 'USD' + contract_size: float # '1.0' + block_trade_tick_size: float # '0.0001' + block_trade_min_trade_amount: int # '25' + block_trade_commission: float # '0.003' + + # NOTE: see `.data._symcache.SymbologyCache.load()` for why + ns_path: str = 'piker.brokers.deribit:OptionPair' + + # TODO, impl this without the MM:SS part of + # the `'THH:MM:SS..'` etc.. + @property + def expiry(self) -> str: + iso_date = pendulum.from_timestamp( + self.expiration_timestamp / 1000 + ).isoformat() + return iso_date + + @property + def venue(self) -> str: + return f'{self.instrument_type}_option' + + @property + def bs_fqme(self) -> str: + return f'{self.symbol}' + + @property + def bs_src_asset(self) -> str: + return f'{self.quote_currency}' + + @property + def bs_dst_asset(self) -> str: + return f'{self.symbol}' + + +PAIRTYPES: dict[MarketType, Pair] = { + 'option': OptionPair, +} + + +class JSONRPCResult(Struct): + id: int + usIn: int + usOut: int + usDiff: int + testnet: bool + jsonrpc: str = '2.0' + error: Optional[dict] = None + result: Optional[list[dict]] = None + + +class JSONRPCChannel(Struct): + method: str + params: dict + jsonrpc: str = '2.0' + + +class KLinesResult(Struct): + low: list[float] + cost: list[float] + high: list[float] + open: list[float] + close: list[float] + ticks: list[int] + status: str + volume: list[float] + + +class Trade(Struct): + iv: float + price: float + amount: float + trade_id: str + contracts: float + direction: str + trade_seq: int + timestamp: int + mark_price: float + index_price: float + tick_direction: int + instrument_name: str + combo_id: Optional[str] = '', + combo_trade_id: Optional[int] = 0, + block_trade_id: Optional[str] = '', + block_trade_leg_count: Optional[int] = 0, + + +class LastTradesResult(Struct): + trades: list[Trade] + has_more: bool diff --git a/piker/log.py b/piker/log.py index 56776e1e..7f554f16 100644 --- a/piker/log.py +++ b/piker/log.py @@ -18,7 +18,11 @@ Log like a forester! """ import logging +import reprlib import json +from typing import ( + Callable, +) import tractor from pygments import ( @@ -84,3 +88,27 @@ def colorize_json( # likeable styles: algol_nu, tango, monokai formatters.TerminalTrueColorFormatter(style=style) ) + + +def mk_repr( + **repr_kws, +) -> Callable[[str], str]: + ''' + Allocate and deliver a `repr.Repr` instance with provided input + settings using the std-lib's `reprlib` mod, + * https://docs.python.org/3/library/reprlib.html + + ------ Ex. ------ + An up to 6-layer-nested `dict` as multi-line: + - https://stackoverflow.com/a/79102479 + - https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel + + ''' + def_kws: dict[str, int] = dict( + indent=2, + maxlevel=6, # recursion levels + maxstring=66, # match editor line-len limit + ) + def_kws |= repr_kws + reprr = reprlib.Repr(**def_kws) + return reprr.repr