Here are the necessary fuctions to fetch the data

from deribit, using cryptofeed library, this functions are located
in the deribit's api module.

Changes:
1. Add and fix auxiliar functions for handling cryptofeed data.
2. Add aio open interest functions for context management and cryptofeed conexions.
3. Some typos and format fixes too.
Nelson Torres 2024-11-26 15:10:24 -03:00
parent dc2b255548
commit da55856dd2
1 changed files with 228 additions and 22 deletions

View File

@ -52,12 +52,14 @@ from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT,
L1_BOOK, TRADES,
OPTION, CALL, PUT
OPTION, CALL, PUT,
OPEN_INTEREST,
)
from cryptofeed.symbols import Symbol
from cryptofeed.types import (
L1Book,
Trade,
OpenInterest,
)
from piker.brokers import SymbolNotFound
from .venues import (
@ -110,6 +112,10 @@ def deribit_timestamp(when: datetime) -> int:
)
def get_timestamp_int(expiry_date: str) -> int:
return int(time.mktime(time.strptime(expiry_date, '%d%b%y')))
def str_to_cb_sym(name: str) -> Symbol:
base, strike_price, expiry_date, option_type = name.split('-')
@ -117,13 +123,14 @@ def str_to_cb_sym(name: str) -> Symbol:
if option_type == 'put':
option_type = PUT
elif option_type == 'call':
elif option_type == 'call':
option_type = CALL
else:
raise Exception("Couldn\'t parse option type")
new_expiry_date = get_values_from_cb_normalized_date(expiry_date)
new_expiry_date: int = get_timestamp_int(
get_values_from_cb_normalized_date(expiry_date)
)
return Symbol(
base=base,
quote=quote,
@ -143,11 +150,12 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
)= tuple(
name.upper().split('-'))
new_expiry_date = get_timestamp_int(expiry_date)
quote: str = base
if option_type == 'P':
if option_type == 'P' or option_type == 'PUT':
option_type = PUT
elif option_type == 'C':
elif option_type == 'C' or option_type == 'CALL':
option_type = CALL
else:
raise Exception("Couldn\'t parse option type")
@ -158,7 +166,7 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date
expiry_date=new_expiry_date
)
@ -226,16 +234,18 @@ def get_config() -> dict[str, Any]:
)
conf_option = section.get('option', {})
section.clear # clear the dict to reuse it
section['deribit'] = {}
section['deribit']['key_id'] = conf_option.get('api_key')
section['deribit']['key_secret'] = conf_option.get('api_secret')
section['log'] = {}
section['log']['filename'] = 'feedhandler.log'
section['log']['level'] = 'DEBUG'
return section
conf_log = conf_option.get('log', {})
return {
'deribit': {
'key_id': conf_option['key_id'],
'key_secret': conf_option['key_secret'],
},
'log': {
'filename': conf_log['filename'],
'level': conf_log['level'],
'disabled': conf_log['disabled'],
}
}
class Client:
@ -311,6 +321,20 @@ class Client:
return balances
async def get_currencies(
self,
) -> list[dict]:
'''
Return the set of currencies for deribit.
'''
assets = {}
resp = await self._json_rpc_auth_wrapper(
'public/get_currencies',
params={}
)
return resp.result
async def get_assets(
self,
venue: str | None = None,
@ -323,11 +347,7 @@ class Client:
'''
assets = {}
resp = await self._json_rpc_auth_wrapper(
'public/get_currencies',
params={}
)
currencies: list[dict] = resp.result
currencies = await self.get_currencies()
for currency in currencies:
name: str = currency['currency']
tx_tick: Decimal = digits_to_dec(currency['fee_precision'])
@ -359,6 +379,82 @@ class Client:
return flat
async def get_instruments(
self,
currency: str = 'btc',
kind: str = 'option',
expired: bool = False,
expiry_date: str = None,
) -> list[Symbol]:
"""
Get instruments for cryptoFeed.FeedHandler.
"""
params: dict[str, str] = {
'currency': currency.upper(),
'kind': kind,
'expired': expired,
}
r: JSONRPCResult = await self._json_rpc_auth_wrapper(
'public/get_instruments',
params,
)
resp = r.result
response_list = []
for i in range(len(resp)):
element = resp[i]
name = f'{element["instrument_name"].split("-")[1]}'
if not expiry_date or name == expiry_date.upper():
response_list.append(piker_sym_to_cb_sym(element['instrument_name']))
return response_list
async def get_expiration_dates(
self,
currency: str = 'btc',
kind: str = 'option',
) -> list[str]:
"""
Get a dict with all expiration dates listed as value and currency as key.
"""
params: dict[str, str] = {
'currency': currency.upper(),
'kind': kind,
}
r: JSONRPCResult = await self._json_rpc_auth_wrapper(
'public/get_expirations',
params,
)
resp = r.result
return resp[currency][kind]
def get_strikes_dict(
self,
instruments: list[Symbol],
) -> dict[str, dict[str, Decimal | None]]:
"""
Get a dict with strike prices as keys.
"""
response: dict[str, dict[str, Decimal | None]] = {}
for i in range(len(instruments)):
element = instruments[i]
strike = f'{str(element).split('-')[1]}'
response[f'{strike}'] = {
'C': None,
'P': None,
}
return response
async def submit_limit(
self,
symbol: str,
@ -738,6 +834,116 @@ async def maybe_open_price_feed(
yield feed
async def aio_open_interest_feed_relay(
fh: FeedHandler,
instruments: list[Symbol],
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _trade(
trade: Trade, # cryptofeed, NOT ours from `.venues`!
receipt_timestamp: int,
) -> None:
'''
Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side.
'''
to_trio.send_nowait(('trade', trade))
# trade and oi are user defined functions that
# will be called when trade and open interest updates are received
# data type is not dict, is an object: cryptofeed.types.OpenINterest
async def _oi(
oi: OpenInterest,
receipt_timestamp: int,
) -> None:
'''
Proxy-thru `cryptofeed.FeedHandler` "oi" to `piker`-side.
'''
symbol: Symbol = str_to_cb_sym(oi.symbol)
piker_sym: str = cb_sym_to_deribit_inst(symbol)
(
base,
expiry_date,
strike_price,
option_type
) = tuple(
piker_sym.split('-')
)
msg = {
'timestamp': oi.timestamp,
'strike_price': strike_price,
'option_type': option_type,
'open_interest': Decimal(oi.open_interest),
}
to_trio.send_nowait(('oi', msg))
channels = [TRADES, OPEN_INTEREST]
callbacks={TRADES: _trade, OPEN_INTEREST: _oi}
fh.add_feed(
DERIBIT,
channels=channels,
symbols=instruments,
callbacks=callbacks
)
if not fh.running:
fh.run(
start_loop=False,
install_signal_handlers=False
)
# sync with trio
to_trio.send_nowait(None)
# run until cancelled
await asyncio.sleep(float('inf'))
@acm
async def open_oi_feed(
instruments: list[Symbol],
) -> to_asyncio.LinkedTaskChannel:
fh: FeedHandler
first: None
chan: to_asyncio.LinkedTaskChannel
async with (
maybe_open_feed_handler() as fh,
to_asyncio.open_channel_from(
partial(
aio_open_interest_feed_relay,
fh,
instruments,
)
) as (first, chan)
):
yield chan
@acm
async def maybe_open_oi_feed(
instruments: list[Symbol],
) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context
feed: to_asyncio.LinkedTaskChannel
async with maybe_open_context(
acm_func=open_oi_feed,
kwargs={
'instruments': instruments
},
key=f'{instruments[0].base}',
) as (cache_hit, feed):
if cache_hit:
yield broadcast_receiver(feed, 10)
else:
yield feed
# TODO, move all to `.broker` submod!
# async def aio_order_feed_relay(