From d981b87ea1c7374f28abcce0849e623022c18016 Mon Sep 17 00:00:00 2001 From: Nelson Torres Date: Tue, 26 Nov 2024 15:10:24 -0300 Subject: [PATCH] Here are the necessary fuctions to fetch the data from deribit, using cryptofeed library, this functions are located in the deribit's api module. Changes: 1. Add and fix auxiliar functions for handling cryptofeed data. 2. Add aio open interest functions for context management and cryptofeed conexions. 3. Some typos and format fixes too. --- examples/derivs/__init__.py | 0 examples/derivs/max_pain.py | 139 +++++++++++++++++++ piker/brokers/deribit/api.py | 250 ++++++++++++++++++++++++++++++++--- 3 files changed, 367 insertions(+), 22 deletions(-) create mode 100644 examples/derivs/__init__.py create mode 100644 examples/derivs/max_pain.py diff --git a/examples/derivs/__init__.py b/examples/derivs/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/derivs/max_pain.py b/examples/derivs/max_pain.py new file mode 100644 index 00000000..03232397 --- /dev/null +++ b/examples/derivs/max_pain.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python +from decimal import ( + Decimal, +) +import trio +import tractor +from datetime import datetime +from pprint import pformat +from piker.brokers.deribit.api import ( + get_client, + maybe_open_oi_feed, +) + +def check_if_complete( + oi: dict[str, dict[str, Decimal | None]] + ) -> bool: + return all( + oi[strike]['C'] is not None + and + oi[strike]['P'] is not None for strike in oi + ) + + +async def max_pain_daemon( +) -> None: + oi_by_strikes: dict[str, dict[str, Decimal | None]] + expiry_dates: list[str] + currency: str = 'btc' + kind: str = 'option' + + async with get_client( + ) as client: + expiry_dates: list[str] = await client.get_expiration_dates( + currency=currency, + kind=kind + ) + + print(f'Available expiration dates for {currency}-{kind}:') + print(f'{expiry_dates}') + expiry_date: str = input('Please enter a valid expiration date: ').upper() + print('Starting little daemon...') + instruments: list[Symbol] = [] + oi_by_strikes: dict[str, dict[str, Decimal]] + + def update_oi_by_strikes(msg: tuple): + nonlocal oi_by_strikes + if 'oi' == msg[0]: + strike_price = msg[1]['strike_price'] + option_type = msg[1]['option_type'] + open_interest = msg[1]['open_interest'] + oi_by_strikes.setdefault( + strike_price, {} + ).update( + {option_type: open_interest} + ) + + def get_max_pain( + oi_by_strikes: dict[str, dict[str, Decimal]] + ) -> dict[str, str | Decimal]: + ''' + This method requires only the strike_prices and oi for call + and puts, the closes list are the same as the strike_prices + the idea is to sum all the calls and puts cash for each strike + and the ITM strikes from that strike, the lowest value is what we + are looking for the intrinsic value. + + ''' + + nonlocal timestamp + # We meed to find the lowest value, so we start at + # infinity to ensure that, and the max_pain must be + # an amount greater than zero. + total_intrinsic_value: Decimal = Decimal('Infinity') + max_pain: Decimal = Decimal(0) + call_cash: Decimal = Decimal(0) + put_cash: Decimal = Decimal(0) + intrinsic_values: dict[str, dict[str, Decimal]] = {} + closes: list = sorted(Decimal(close) for close in oi_by_strikes) + + for strike, oi in oi_by_strikes.items(): + s = Decimal(strike) + call_cash = sum(max(0, (s - c) * oi_by_strikes[str(c)]['C']) for c in closes) + put_cash = sum(max(0, (c - s) * oi_by_strikes[str(c)]['P']) for c in closes) + + intrinsic_values[strike] = { + 'C': call_cash, + 'P': put_cash, + 'total': call_cash + put_cash, + } + + if intrinsic_values[strike]['total'] < total_intrinsic_value: + total_intrinsic_value = intrinsic_values[strike]['total'] + max_pain = s + + return { + 'timestamp': timestamp, + 'expiry_date': expiry_date, + 'total_intrinsic_value': total_intrinsic_value, + 'max_pain': max_pain, + } + + async with get_client( + ) as client: + instruments = await client.get_instruments( + expiry_date=expiry_date, + ) + oi_by_strikes = client.get_strikes_dict(instruments) + + async with maybe_open_oi_feed( + instruments, + ) as oi_feed: + async for msg in oi_feed: + + update_oi_by_strikes(msg) + if check_if_complete(oi_by_strikes): + if 'oi' == msg[0]: + timestamp = msg[1]['timestamp'] + max_pain = get_max_pain(oi_by_strikes) + print('-----------------------------------------------') + print(f'timestamp: {datetime.fromtimestamp(max_pain['timestamp'])}') + print(f'expiry_date: {max_pain['expiry_date']}') + print(f'max_pain: {max_pain['max_pain']}') + print(f'total intrinsic value: {max_pain['total_intrinsic_value']}') + print('-----------------------------------------------') + + +async def main(): + + async with tractor.open_nursery() as n: + + p: tractor.Portal = await n.start_actor( + 'max_pain_daemon', + enable_modules=[__name__], + infect_asyncio=True, + ) + await p.run(max_pain_daemon) + +if __name__ == '__main__': + trio.run(main) diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index f846a5c0..2f73856b 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -52,12 +52,14 @@ from cryptofeed import FeedHandler from cryptofeed.defines import ( DERIBIT, L1_BOOK, TRADES, - OPTION, CALL, PUT + OPTION, CALL, PUT, + OPEN_INTEREST, ) from cryptofeed.symbols import Symbol from cryptofeed.types import ( L1Book, Trade, + OpenInterest, ) from piker.brokers import SymbolNotFound from .venues import ( @@ -110,6 +112,10 @@ def deribit_timestamp(when: datetime) -> int: ) +def get_timestamp_int(expiry_date: str) -> int: + return int(time.mktime(time.strptime(expiry_date, '%d%b%y'))) + + def str_to_cb_sym(name: str) -> Symbol: base, strike_price, expiry_date, option_type = name.split('-') @@ -117,13 +123,14 @@ def str_to_cb_sym(name: str) -> Symbol: if option_type == 'put': option_type = PUT - elif option_type == 'call': + elif option_type == 'call': option_type = CALL else: raise Exception("Couldn\'t parse option type") - new_expiry_date = get_values_from_cb_normalized_date(expiry_date) - + new_expiry_date: int = get_timestamp_int( + get_values_from_cb_normalized_date(expiry_date) + ) return Symbol( base=base, quote=quote, @@ -143,11 +150,12 @@ def piker_sym_to_cb_sym(name: str) -> Symbol: )= tuple( name.upper().split('-')) + new_expiry_date = get_timestamp_int(expiry_date) quote: str = base - if option_type == 'P': + if option_type == 'P' or option_type == 'PUT': option_type = PUT - elif option_type == 'C': + elif option_type == 'C' or option_type == 'CALL': option_type = CALL else: raise Exception("Couldn\'t parse option type") @@ -158,7 +166,7 @@ def piker_sym_to_cb_sym(name: str) -> Symbol: type=OPTION, strike_price=strike_price, option_type=option_type, - expiry_date=expiry_date + expiry_date=new_expiry_date ) @@ -226,16 +234,18 @@ def get_config() -> dict[str, Any]: ) conf_option = section.get('option', {}) - section.clear # clear the dict to reuse it - section['deribit'] = {} - section['deribit']['key_id'] = conf_option.get('api_key') - section['deribit']['key_secret'] = conf_option.get('api_secret') - - section['log'] = {} - section['log']['filename'] = 'feedhandler.log' - section['log']['level'] = 'DEBUG' - - return section + conf_log = conf_option.get('log', {}) + return { + 'deribit': { + 'key_id': conf_option['key_id'], + 'key_secret': conf_option['key_secret'], + }, + 'log': { + 'filename': conf_log['filename'], + 'level': conf_log['level'], + 'disabled': conf_log['disabled'], + } + } class Client: @@ -311,6 +321,20 @@ class Client: return balances + async def get_currencies( + self, + + ) -> list[dict]: + ''' + Return the set of currencies for deribit. + ''' + assets = {} + resp = await self._json_rpc_auth_wrapper( + 'public/get_currencies', + params={} + ) + return resp.result + async def get_assets( self, venue: str | None = None, @@ -323,11 +347,7 @@ class Client: ''' assets = {} - resp = await self._json_rpc_auth_wrapper( - 'public/get_currencies', - params={} - ) - currencies: list[dict] = resp.result + currencies = await self.get_currencies() for currency in currencies: name: str = currency['currency'] tx_tick: Decimal = digits_to_dec(currency['fee_precision']) @@ -359,6 +379,82 @@ class Client: return flat + async def get_instruments( + self, + currency: str = 'btc', + kind: str = 'option', + expired: bool = False, + expiry_date: str = None, + + ) -> list[Symbol]: + """ + Get instruments for cryptoFeed.FeedHandler. + """ + params: dict[str, str] = { + 'currency': currency.upper(), + 'kind': kind, + 'expired': expired, + } + + r: JSONRPCResult = await self._json_rpc_auth_wrapper( + 'public/get_instruments', + params, + ) + resp = r.result + response_list = [] + + for i in range(len(resp)): + element = resp[i] + name = f'{element["instrument_name"].split("-")[1]}' + if not expiry_date or name == expiry_date.upper(): + response_list.append(piker_sym_to_cb_sym(element['instrument_name'])) + + return response_list + + async def get_expiration_dates( + self, + currency: str = 'btc', + kind: str = 'option', + + ) -> list[str]: + """ + Get a dict with all expiration dates listed as value and currency as key. + """ + + params: dict[str, str] = { + 'currency': currency.upper(), + 'kind': kind, + } + + r: JSONRPCResult = await self._json_rpc_auth_wrapper( + 'public/get_expirations', + params, + ) + resp = r.result + + return resp[currency][kind] + + def get_strikes_dict( + self, + instruments: list[Symbol], + + ) -> dict[str, dict[str, Decimal | None]]: + """ + Get a dict with strike prices as keys. + """ + + response: dict[str, dict[str, Decimal | None]] = {} + + for i in range(len(instruments)): + element = instruments[i] + strike = f'{str(element).split('-')[1]}' + response[f'{strike}'] = { + 'C': None, + 'P': None, + } + + return response + async def submit_limit( self, symbol: str, @@ -738,6 +834,116 @@ async def maybe_open_price_feed( yield feed +async def aio_open_interest_feed_relay( + fh: FeedHandler, + instruments: list[Symbol], + from_trio: asyncio.Queue, + to_trio: trio.abc.SendChannel, +) -> None: + async def _trade( + trade: Trade, # cryptofeed, NOT ours from `.venues`! + receipt_timestamp: int, + ) -> None: + ''' + Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side. + + ''' + to_trio.send_nowait(('trade', trade)) + + # trade and oi are user defined functions that + # will be called when trade and open interest updates are received + # data type is not dict, is an object: cryptofeed.types.OpenINterest + async def _oi( + oi: OpenInterest, + receipt_timestamp: int, + ) -> None: + ''' + Proxy-thru `cryptofeed.FeedHandler` "oi" to `piker`-side. + + ''' + symbol: Symbol = str_to_cb_sym(oi.symbol) + piker_sym: str = cb_sym_to_deribit_inst(symbol) + ( + base, + expiry_date, + strike_price, + option_type + ) = tuple( + piker_sym.split('-') + ) + msg = { + 'timestamp': oi.timestamp, + 'strike_price': strike_price, + 'option_type': option_type, + 'open_interest': Decimal(oi.open_interest), + } + to_trio.send_nowait(('oi', msg)) + + + channels = [TRADES, OPEN_INTEREST] + callbacks={TRADES: _trade, OPEN_INTEREST: _oi} + + fh.add_feed( + DERIBIT, + channels=channels, + symbols=instruments, + callbacks=callbacks + ) + + if not fh.running: + fh.run( + start_loop=False, + install_signal_handlers=False + ) + + # sync with trio + to_trio.send_nowait(None) + + # run until cancelled + await asyncio.sleep(float('inf')) + + +@acm +async def open_oi_feed( + instruments: list[Symbol], +) -> to_asyncio.LinkedTaskChannel: + + fh: FeedHandler + first: None + chan: to_asyncio.LinkedTaskChannel + async with ( + maybe_open_feed_handler() as fh, + to_asyncio.open_channel_from( + partial( + aio_open_interest_feed_relay, + fh, + instruments, + ) + ) as (first, chan) + ): + yield chan + + +@acm +async def maybe_open_oi_feed( + instruments: list[Symbol], +) -> trio.abc.ReceiveStream: + + # TODO: add a predicate to maybe_open_context + feed: to_asyncio.LinkedTaskChannel + async with maybe_open_context( + acm_func=open_oi_feed, + kwargs={ + 'instruments': instruments + }, + key=f'{instruments[0].base}', + + ) as (cache_hit, feed): + if cache_hit: + yield broadcast_receiver(feed, 10) + else: + yield feed + # TODO, move all to `.broker` submod! # async def aio_order_feed_relay(