Here are the necessary fuctions to fetch the data

from deribit, using cryptofeed library, this functions are located
in the deribit's api module.

Changes:
1. Add and fix auxiliar functions for handling cryptofeed data.
2. Add aio open interest functions for context management and cryptofeed conexions.
3. Some typos and format fixes too.
Nelson Torres 2024-11-26 15:10:24 -03:00
parent dc2b255548
commit d981b87ea1
3 changed files with 367 additions and 22 deletions

View File

View File

@ -0,0 +1,139 @@
#!/usr/bin/env python
from decimal import (
Decimal,
)
import trio
import tractor
from datetime import datetime
from pprint import pformat
from piker.brokers.deribit.api import (
get_client,
maybe_open_oi_feed,
)
def check_if_complete(
oi: dict[str, dict[str, Decimal | None]]
) -> bool:
return all(
oi[strike]['C'] is not None
and
oi[strike]['P'] is not None for strike in oi
)
async def max_pain_daemon(
) -> None:
oi_by_strikes: dict[str, dict[str, Decimal | None]]
expiry_dates: list[str]
currency: str = 'btc'
kind: str = 'option'
async with get_client(
) as client:
expiry_dates: list[str] = await client.get_expiration_dates(
currency=currency,
kind=kind
)
print(f'Available expiration dates for {currency}-{kind}:')
print(f'{expiry_dates}')
expiry_date: str = input('Please enter a valid expiration date: ').upper()
print('Starting little daemon...')
instruments: list[Symbol] = []
oi_by_strikes: dict[str, dict[str, Decimal]]
def update_oi_by_strikes(msg: tuple):
nonlocal oi_by_strikes
if 'oi' == msg[0]:
strike_price = msg[1]['strike_price']
option_type = msg[1]['option_type']
open_interest = msg[1]['open_interest']
oi_by_strikes.setdefault(
strike_price, {}
).update(
{option_type: open_interest}
)
def get_max_pain(
oi_by_strikes: dict[str, dict[str, Decimal]]
) -> dict[str, str | Decimal]:
'''
This method requires only the strike_prices and oi for call
and puts, the closes list are the same as the strike_prices
the idea is to sum all the calls and puts cash for each strike
and the ITM strikes from that strike, the lowest value is what we
are looking for the intrinsic value.
'''
nonlocal timestamp
# We meed to find the lowest value, so we start at
# infinity to ensure that, and the max_pain must be
# an amount greater than zero.
total_intrinsic_value: Decimal = Decimal('Infinity')
max_pain: Decimal = Decimal(0)
call_cash: Decimal = Decimal(0)
put_cash: Decimal = Decimal(0)
intrinsic_values: dict[str, dict[str, Decimal]] = {}
closes: list = sorted(Decimal(close) for close in oi_by_strikes)
for strike, oi in oi_by_strikes.items():
s = Decimal(strike)
call_cash = sum(max(0, (s - c) * oi_by_strikes[str(c)]['C']) for c in closes)
put_cash = sum(max(0, (c - s) * oi_by_strikes[str(c)]['P']) for c in closes)
intrinsic_values[strike] = {
'C': call_cash,
'P': put_cash,
'total': call_cash + put_cash,
}
if intrinsic_values[strike]['total'] < total_intrinsic_value:
total_intrinsic_value = intrinsic_values[strike]['total']
max_pain = s
return {
'timestamp': timestamp,
'expiry_date': expiry_date,
'total_intrinsic_value': total_intrinsic_value,
'max_pain': max_pain,
}
async with get_client(
) as client:
instruments = await client.get_instruments(
expiry_date=expiry_date,
)
oi_by_strikes = client.get_strikes_dict(instruments)
async with maybe_open_oi_feed(
instruments,
) as oi_feed:
async for msg in oi_feed:
update_oi_by_strikes(msg)
if check_if_complete(oi_by_strikes):
if 'oi' == msg[0]:
timestamp = msg[1]['timestamp']
max_pain = get_max_pain(oi_by_strikes)
print('-----------------------------------------------')
print(f'timestamp: {datetime.fromtimestamp(max_pain['timestamp'])}')
print(f'expiry_date: {max_pain['expiry_date']}')
print(f'max_pain: {max_pain['max_pain']}')
print(f'total intrinsic value: {max_pain['total_intrinsic_value']}')
print('-----------------------------------------------')
async def main():
async with tractor.open_nursery() as n:
p: tractor.Portal = await n.start_actor(
'max_pain_daemon',
enable_modules=[__name__],
infect_asyncio=True,
)
await p.run(max_pain_daemon)
if __name__ == '__main__':
trio.run(main)

View File

@ -52,12 +52,14 @@ from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT,
L1_BOOK, TRADES,
OPTION, CALL, PUT
OPTION, CALL, PUT,
OPEN_INTEREST,
)
from cryptofeed.symbols import Symbol
from cryptofeed.types import (
L1Book,
Trade,
OpenInterest,
)
from piker.brokers import SymbolNotFound
from .venues import (
@ -110,6 +112,10 @@ def deribit_timestamp(when: datetime) -> int:
)
def get_timestamp_int(expiry_date: str) -> int:
return int(time.mktime(time.strptime(expiry_date, '%d%b%y')))
def str_to_cb_sym(name: str) -> Symbol:
base, strike_price, expiry_date, option_type = name.split('-')
@ -117,13 +123,14 @@ def str_to_cb_sym(name: str) -> Symbol:
if option_type == 'put':
option_type = PUT
elif option_type == 'call':
elif option_type == 'call':
option_type = CALL
else:
raise Exception("Couldn\'t parse option type")
new_expiry_date = get_values_from_cb_normalized_date(expiry_date)
new_expiry_date: int = get_timestamp_int(
get_values_from_cb_normalized_date(expiry_date)
)
return Symbol(
base=base,
quote=quote,
@ -143,11 +150,12 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
)= tuple(
name.upper().split('-'))
new_expiry_date = get_timestamp_int(expiry_date)
quote: str = base
if option_type == 'P':
if option_type == 'P' or option_type == 'PUT':
option_type = PUT
elif option_type == 'C':
elif option_type == 'C' or option_type == 'CALL':
option_type = CALL
else:
raise Exception("Couldn\'t parse option type")
@ -158,7 +166,7 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date
expiry_date=new_expiry_date
)
@ -226,16 +234,18 @@ def get_config() -> dict[str, Any]:
)
conf_option = section.get('option', {})
section.clear # clear the dict to reuse it
section['deribit'] = {}
section['deribit']['key_id'] = conf_option.get('api_key')
section['deribit']['key_secret'] = conf_option.get('api_secret')
section['log'] = {}
section['log']['filename'] = 'feedhandler.log'
section['log']['level'] = 'DEBUG'
return section
conf_log = conf_option.get('log', {})
return {
'deribit': {
'key_id': conf_option['key_id'],
'key_secret': conf_option['key_secret'],
},
'log': {
'filename': conf_log['filename'],
'level': conf_log['level'],
'disabled': conf_log['disabled'],
}
}
class Client:
@ -311,6 +321,20 @@ class Client:
return balances
async def get_currencies(
self,
) -> list[dict]:
'''
Return the set of currencies for deribit.
'''
assets = {}
resp = await self._json_rpc_auth_wrapper(
'public/get_currencies',
params={}
)
return resp.result
async def get_assets(
self,
venue: str | None = None,
@ -323,11 +347,7 @@ class Client:
'''
assets = {}
resp = await self._json_rpc_auth_wrapper(
'public/get_currencies',
params={}
)
currencies: list[dict] = resp.result
currencies = await self.get_currencies()
for currency in currencies:
name: str = currency['currency']
tx_tick: Decimal = digits_to_dec(currency['fee_precision'])
@ -359,6 +379,82 @@ class Client:
return flat
async def get_instruments(
self,
currency: str = 'btc',
kind: str = 'option',
expired: bool = False,
expiry_date: str = None,
) -> list[Symbol]:
"""
Get instruments for cryptoFeed.FeedHandler.
"""
params: dict[str, str] = {
'currency': currency.upper(),
'kind': kind,
'expired': expired,
}
r: JSONRPCResult = await self._json_rpc_auth_wrapper(
'public/get_instruments',
params,
)
resp = r.result
response_list = []
for i in range(len(resp)):
element = resp[i]
name = f'{element["instrument_name"].split("-")[1]}'
if not expiry_date or name == expiry_date.upper():
response_list.append(piker_sym_to_cb_sym(element['instrument_name']))
return response_list
async def get_expiration_dates(
self,
currency: str = 'btc',
kind: str = 'option',
) -> list[str]:
"""
Get a dict with all expiration dates listed as value and currency as key.
"""
params: dict[str, str] = {
'currency': currency.upper(),
'kind': kind,
}
r: JSONRPCResult = await self._json_rpc_auth_wrapper(
'public/get_expirations',
params,
)
resp = r.result
return resp[currency][kind]
def get_strikes_dict(
self,
instruments: list[Symbol],
) -> dict[str, dict[str, Decimal | None]]:
"""
Get a dict with strike prices as keys.
"""
response: dict[str, dict[str, Decimal | None]] = {}
for i in range(len(instruments)):
element = instruments[i]
strike = f'{str(element).split('-')[1]}'
response[f'{strike}'] = {
'C': None,
'P': None,
}
return response
async def submit_limit(
self,
symbol: str,
@ -738,6 +834,116 @@ async def maybe_open_price_feed(
yield feed
async def aio_open_interest_feed_relay(
fh: FeedHandler,
instruments: list[Symbol],
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _trade(
trade: Trade, # cryptofeed, NOT ours from `.venues`!
receipt_timestamp: int,
) -> None:
'''
Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side.
'''
to_trio.send_nowait(('trade', trade))
# trade and oi are user defined functions that
# will be called when trade and open interest updates are received
# data type is not dict, is an object: cryptofeed.types.OpenINterest
async def _oi(
oi: OpenInterest,
receipt_timestamp: int,
) -> None:
'''
Proxy-thru `cryptofeed.FeedHandler` "oi" to `piker`-side.
'''
symbol: Symbol = str_to_cb_sym(oi.symbol)
piker_sym: str = cb_sym_to_deribit_inst(symbol)
(
base,
expiry_date,
strike_price,
option_type
) = tuple(
piker_sym.split('-')
)
msg = {
'timestamp': oi.timestamp,
'strike_price': strike_price,
'option_type': option_type,
'open_interest': Decimal(oi.open_interest),
}
to_trio.send_nowait(('oi', msg))
channels = [TRADES, OPEN_INTEREST]
callbacks={TRADES: _trade, OPEN_INTEREST: _oi}
fh.add_feed(
DERIBIT,
channels=channels,
symbols=instruments,
callbacks=callbacks
)
if not fh.running:
fh.run(
start_loop=False,
install_signal_handlers=False
)
# sync with trio
to_trio.send_nowait(None)
# run until cancelled
await asyncio.sleep(float('inf'))
@acm
async def open_oi_feed(
instruments: list[Symbol],
) -> to_asyncio.LinkedTaskChannel:
fh: FeedHandler
first: None
chan: to_asyncio.LinkedTaskChannel
async with (
maybe_open_feed_handler() as fh,
to_asyncio.open_channel_from(
partial(
aio_open_interest_feed_relay,
fh,
instruments,
)
) as (first, chan)
):
yield chan
@acm
async def maybe_open_oi_feed(
instruments: list[Symbol],
) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context
feed: to_asyncio.LinkedTaskChannel
async with maybe_open_context(
acm_func=open_oi_feed,
kwargs={
'instruments': instruments
},
key=f'{instruments[0].base}',
) as (cache_hit, feed):
if cache_hit:
yield broadcast_receiver(feed, 10)
else:
yield feed
# TODO, move all to `.broker` submod!
# async def aio_order_feed_relay(