piker/piker/brokers/deribit/api.py

1043 lines
27 KiB
Python

# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Deribit backend.
'''
import asyncio
from collections import ChainMap
from contextlib import (
asynccontextmanager as acm,
)
from datetime import datetime
from decimal import (
Decimal,
)
from functools import partial
from pathlib import Path
from pprint import pformat
import time
from typing import (
Any,
Optional,
Callable,
)
from pendulum import now
import trio
import numpy as np
from tractor.trionics import (
broadcast_receiver,
maybe_open_context
)
from tractor import to_asyncio
# XXX WOOPS XD
# yeah you'll need to install it since it was removed in #489 by
# accident; well i thought we had removed all usage..
from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT,
L1_BOOK, TRADES,
OPTION, CALL, PUT,
OPEN_INTEREST,
)
from cryptofeed.symbols import Symbol
from cryptofeed.types import (
L1Book,
Trade,
OpenInterest,
)
from piker.brokers import SymbolNotFound
from .venues import (
_ws_url,
MarketType,
PAIRTYPES,
Pair,
OptionPair,
JSONRPCResult,
KLinesResult,
LastTradesResult,
)
from piker.accounting import (
Asset,
digits_to_dec,
MktPair,
)
from piker.data import (
def_iohlcv_fields,
match_from_pairs,
# Struct,
)
from piker.data._web_bs import (
open_jsonrpc_session
)
from piker import config
from piker.log import get_logger
log = get_logger(__name__)
_spawn_kwargs = {
'infect_asyncio': True,
}
def deribit_timestamp(when: datetime) -> int:
'''
Convert conventional epoch timestamp, in secs, to unixtime in
milliseconds.
'''
return int(
(when.timestamp() * 1000)
+
(when.microsecond / 1000)
)
def get_timestamp_int(expiry_date: str) -> int:
return int(time.mktime(time.strptime(expiry_date, '%d%b%y')))
def str_to_cb_sym(name: str) -> Symbol:
base, strike_price, expiry_date, option_type = name.split('-')
quote = base
if option_type == 'put':
option_type = PUT
elif option_type == 'call':
option_type = CALL
else:
raise Exception("Couldn\'t parse option type")
new_expiry_date: int = get_timestamp_int(
get_values_from_cb_normalized_date(expiry_date)
)
return Symbol(
base=base,
quote=quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=new_expiry_date
)
def piker_sym_to_cb_sym(name: str) -> Symbol:
(
base,
expiry_date,
strike_price,
option_type,
)= tuple(
name.upper().split('-'))
new_expiry_date = get_timestamp_int(expiry_date)
quote: str = base
if option_type == 'P' or option_type == 'PUT':
option_type = PUT
elif option_type == 'C' or option_type == 'CALL':
option_type = CALL
else:
raise Exception("Couldn\'t parse option type")
return Symbol(
base=base,
quote=quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=new_expiry_date
)
# TODO, instead can't we just lookup the `MktPair` directly
# and pass it upward to `stream_quotes()`??
def cb_sym_to_deribit_inst(sym: Symbol) -> str:
'''
Generate our own internal `str`-repr for a `cryptofeed.Symbol`
uniquely from its fields.
This is the equiv of generating a `Pair.fmqe` from `cryptofeed`
for now i suppose..?
'''
new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date)
otype = (
'C' if sym.option_type == CALL
else 'P'
)
return (
f'{sym.base}-'
f'{new_expiry_date}-'
f'{sym.strike_price}-'
f'{otype}'
)
def get_values_from_cb_normalized_date(expiry_date: str) -> str:
# deribit specific
cb_norm = [
'F', 'G', 'H', 'J',
'K', 'M', 'N', 'Q',
'U', 'V', 'X', 'Z'
]
months = [
'JAN', 'FEB', 'MAR', 'APR',
'MAY', 'JUN', 'JUL', 'AUG',
'SEP', 'OCT', 'NOV', 'DEC'
]
# YYMDD
# 01234
day, month, year = (
expiry_date[3:],
months[cb_norm.index(expiry_date[2:3])],
expiry_date[:2]
)
return f'{day}{month}{year}'
def get_config() -> dict[str, Any]:
conf: dict
path: Path
conf, path = config.load(
conf_name='brokers',
touch_if_dne=True,
)
section: dict|None = conf.get('deribit')
if section is None:
raise ValueError(
f'No `[deribit]` section found in\n'
f'{path!r}\n\n'
f'See the template config from the core repo for samples..\n'
# f'<TODO put repo link here??>'
)
conf_option = section.get('option', {})
section = {} # clear the dict to reuse it
section['deribit'] = {}
section['deribit']['key_id'] = conf_option.get('api_key')
section['deribit']['key_secret'] = conf_option.get('api_secret')
section['log'] = {}
section['log']['filename'] = 'feedhandler.log'
section['log']['level'] = 'DEBUG'
section['log']['disabled'] = True
return section
class Client:
'''
Hi-level interface for the jsron-RPC over websocket API.
'''
def __init__(
self,
json_rpc: Callable
) -> None:
self._pairs: ChainMap[str, Pair] = ChainMap()
config = get_config().get('deribit', {})
self._key_id = config.get('key_id')
self._key_secret = config.get('key_secret')
self.json_rpc = json_rpc
self._auth_ts = None
self._auth_renew_ts = 5 # seconds to renew auth
async def _json_rpc_auth_wrapper(
self,
*args,
**kwargs,
) -> JSONRPCResult:
"""Background task that adquires a first access token and then will
refresh the access token.
https://docs.deribit.com/?python#authentication-2
"""
access_scope = 'trade:read_write'
current_ts = time.time()
if not self._auth_ts or current_ts - self._auth_ts < self._auth_renew_ts:
# if we are close to token expiry time
params = {
'grant_type': 'client_credentials',
'client_id': self._key_id,
'client_secret': self._key_secret,
'scope': access_scope
}
resp = await self.json_rpc('public/auth', params)
result = resp.result
self._auth_ts = time.time() + result['expires_in']
return await self.json_rpc(*args, **kwargs)
async def get_balances(
self,
kind: str = 'option'
) -> dict[str, float]:
"""Return the set of positions for this account
by symbol.
"""
balances = {}
for currency in self.currencies:
resp = await self._json_rpc_auth_wrapper(
'private/get_positions', params={
'currency': currency.upper(),
'kind': kind})
balances[currency] = resp.result
return balances
async def get_currencies(
self,
) -> list[dict]:
'''
Return the set of currencies for deribit.
'''
assets = {}
resp = await self._json_rpc_auth_wrapper(
'public/get_currencies',
params={}
)
return resp.result
async def get_assets(
self,
venue: str | None = None,
) -> dict[str, Asset]:
'''
Return the set of asset balances for this account
by (deribit's) symbol.
'''
assets = {}
currencies = await self.get_currencies()
for currency in currencies:
name: str = currency['currency']
tx_tick: Decimal = digits_to_dec(currency['fee_precision'])
# TODO, handling of options, futures, perps etc. more
# specifically with diff `.atype`s?
assets[name] = Asset(
name=name,
atype='crypto_currency',
tx_tick=tx_tick,
)
instruments = await self.symbol_info(currency=name)
for instrument in instruments:
pair = instruments[instrument]
assets[pair.symbol] = Asset(
name=pair.symbol,
atype=pair.venue,
tx_tick=pair.size_tick,
)
return assets
async def get_mkt_pairs(self) -> dict[str, Pair]:
flat: dict[str, Pair] = {}
for key in self._pairs:
item = self._pairs.get(key)
flat[item.bs_fqme] = item
return flat
async def get_instruments(
self,
currency: str = 'btc',
kind: str = 'option',
expired: bool = False,
expiry_date: str = None,
) -> list[Symbol]:
"""
Get instruments for cryptoFeed.FeedHandler.
"""
params: dict[str, str] = {
'currency': currency.upper(),
'kind': kind,
'expired': expired,
}
r: JSONRPCResult = await self._json_rpc_auth_wrapper(
'public/get_instruments',
params,
)
resp = r.result
response_list = []
for i in range(len(resp)):
element = resp[i]
name = f'{element["instrument_name"].split("-")[1]}'
if not expiry_date or name == expiry_date.upper():
response_list.append(piker_sym_to_cb_sym(element['instrument_name']))
return response_list
async def submit_limit(
self,
symbol: str,
price: float,
action: str,
size: float
) -> dict:
"""Place an order
"""
params = {
'instrument_name': symbol.upper(),
'amount': size,
'type': 'limit',
'price': price,
}
resp = await self._json_rpc_auth_wrapper(
f'private/{action}', params)
return resp.result
async def submit_cancel(self, oid: str):
"""Send cancel request for order id
"""
resp = await self._json_rpc_auth_wrapper(
'private/cancel', {'order_id': oid})
return resp.result
async def exch_info(
self,
sym: str | None = None,
venue: MarketType = 'option',
expiry: str | None = None,
) -> dict[str, Pair] | Pair:
pair_table: dict[str, Pair] = self._pairs
if (
sym
and (cached_pair := pair_table.get(sym))
):
return cached_pair
if sym:
opt: OptionPair|None = pair_table.get(sym)
if not opt:
closest_matches: dict[str, Pair] = match_from_pairs(
pairs=pair_table,
query=sym,
score_cutoff=40,
)
closest_syms: list[str] = list(closest_matches.keys())
raise ValueError(
f'No contract found for {sym!r}\n\n'
f'Closest {len(closest_syms)} available contracts:\n\n'
f'{pformat(closest_syms)}\n'
)
return pair_table[sym]
else:
return self._pairs
async def symbol_info(
self,
instrument: Optional[str] = None,
currency: str = 'btc', # BTC, ETH, SOL, USDC
kind: str = 'option',
expired: bool = False
) -> dict[str, Pair] | Pair:
'''
Get symbol infos.
'''
if self._pairs:
return self._pairs
# will retrieve all symbols by default
params: dict[str, str] = {
'currency': currency.upper(),
'kind': kind,
'expired': expired,
}
resp: JSONRPCResult = await self._json_rpc_auth_wrapper(
'public/get_instruments',
params,
)
# convert to symbol-keyed table
pair_type: Pair = PAIRTYPES[kind]
results: list[dict] | None = resp.result
instruments: dict[str, Pair] = {}
for item in results:
symbol=item['instrument_name'].lower()
try:
pair: Pair = pair_type(
symbol=symbol,
**item
)
except Exception as e:
e.add_note(
"\nDon't panic, prolly stupid deribit changed their symbology schema again..\n"
'Check out their API docs here:\n\n'
'https://docs.deribit.com/?python#deribit-api-v2-1-1'
)
raise
instruments[symbol] = pair
if instrument is not None:
return instruments[instrument.lower()]
else:
return instruments
async def cache_symbols(
self,
venue: MarketType = 'option',
) -> None:
# lookup internal mkt-specific pair table to update
pair_table: dict[str, Pair] = self._pairs
# make API request(s)
mkt_pairs = await self.symbol_info()
if not mkt_pairs:
raise SymbolNotFound(
f'No market pairs found!?:\n'
f'{mkt_pairs}'
)
pairs_view_subtable: dict[str, Pair] = {}
for instrument in mkt_pairs:
pair_type: Pair|OptionPair = PAIRTYPES[venue]
pair: Pair = pair_type(**mkt_pairs[instrument].to_dict())
pair_table[pair.symbol.upper()] = pair
# update an additional top-level-cross-venue-table
# `._pairs: ChainMap` for search B0
pairs_view_subtable[pair.bs_fqme] = pair
self._pairs.maps.append(pairs_view_subtable)
return self._pairs
async def search_symbols(
self,
pattern: str,
limit: int = 30,
) -> dict[str, Pair]:
'''
Fuzzy search symbology set for pairs matching `pattern`.
'''
pairs: dict[str, Pair] = await self.exch_info()
return match_from_pairs(
pairs=pairs,
query=pattern.upper(),
score_cutoff=35,
limit=limit
)
async def bars(
self,
mkt: MktPair,
start_dt: Optional[datetime] = None,
end_dt: Optional[datetime] = None,
limit: int = 1000,
as_np: bool = True,
) -> list[tuple] | np.ndarray:
instrument: str = mkt.bs_fqme.split('.')[0]
if end_dt is None:
end_dt = now('UTC')
_orig_start_dt = start_dt
if start_dt is None:
start_dt = end_dt.start_of(
'minute'
).subtract(minutes=limit)
start_time: int = deribit_timestamp(start_dt)
end_time: int = deribit_timestamp(end_dt)
# https://docs.deribit.com/#public-get_tradingview_chart_data
resp = await self._json_rpc_auth_wrapper(
'public/get_tradingview_chart_data',
params={
'instrument_name': instrument.upper(),
'start_timestamp': start_time,
'end_timestamp': end_time,
'resolution': '1'
})
result = KLinesResult(**resp.result)
new_bars: list[tuple] = []
# if _orig_start_dt is None:
# if not new_bars:
# import tractor
# await tractor.pause()
for i in range(len(result.close)):
row = [
(start_time + (i * (60 * 1000))) / 1000.0, # time
result.open[i],
result.high[i],
result.low[i],
result.close[i],
result.volume[i]
]
new_bars.append((i,) + tuple(row))
if not as_np:
return result
return np.array(
new_bars,
dtype=def_iohlcv_fields
)
async def last_trades(
self,
instrument: str,
count: int = 10
):
resp = await self._json_rpc_auth_wrapper(
'public/get_last_trades_by_instrument',
params={
'instrument_name': instrument,
'count': count
})
return LastTradesResult(**resp.result)
@acm
async def get_client(
is_brokercheck: bool = False,
venue: MarketType = 'option',
) -> Client:
async with (
trio.open_nursery() as n,
open_jsonrpc_session(
_ws_url, response_type=JSONRPCResult
) as json_rpc
):
client = Client(json_rpc)
await client.cache_symbols()
yield client
n.cancel_scope.cancel()
@acm
async def open_feed_handler() -> FeedHandler:
fh = FeedHandler(config=get_config())
yield fh
await to_asyncio.run_task(fh.stop_async)
@acm
async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
async with maybe_open_context(
acm_func=open_feed_handler,
key='feedhandler',
) as (cache_hit, fh):
yield fh
async def aio_price_feed_relay(
fh: FeedHandler,
instrument: str,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
'''
Relay price feed quotes from the `cryptofeed.FeedHandler` to
the `piker`-side `trio.task` consumers for delivery to consumer
sub-actors for various subsystems.
'''
async def _trade(
trade: Trade, # cryptofeed, NOT ours from `.venues`!
receipt_timestamp: int,
) -> None:
'''
Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side.
'''
to_trio.send_nowait(('trade', trade))
async def _l1(
book: L1Book,
receipt_timestamp: int,
) -> None:
'''
Relay-thru "l1 book" updates.
'''
to_trio.send_nowait(('l1', book))
# TODO, make this work!
# -[ ] why isn't this working in `tractor.pause_from_sync()`??
# breakpoint()
sym: Symbol = piker_sym_to_cb_sym(instrument)
fh.add_feed(
DERIBIT,
channels=[TRADES, L1_BOOK],
symbols=[sym],
callbacks={
TRADES: _trade,
L1_BOOK: _l1
})
if not fh.running:
fh.run(
start_loop=False,
install_signal_handlers=False
)
# sync with trio
to_trio.send_nowait(None)
# run until cancelled
await asyncio.sleep(float('inf'))
@acm
async def open_price_feed(
instrument: str
) -> to_asyncio.LinkedTaskChannel:
fh: FeedHandler
first: None
chan: to_asyncio.LinkedTaskChannel
async with (
maybe_open_feed_handler() as fh,
to_asyncio.open_channel_from(
partial(
aio_price_feed_relay,
fh,
instrument
)
) as (first, chan)
):
yield chan
@acm
async def maybe_open_price_feed(
instrument: str
) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context
feed: to_asyncio.LinkedTaskChannel
async with maybe_open_context(
acm_func=open_price_feed,
kwargs={
'instrument': instrument.split('.')[0]
},
key=f'{instrument.split('.')[0]}-price',
) as (cache_hit, feed):
if cache_hit:
yield broadcast_receiver(feed, 10)
else:
yield feed
async def aio_open_interest_feed_relay(
fh: FeedHandler,
instruments: list,
open_interests: dict[str, dict[str, dict[str, list[dict[str, Decimal]]]]],
losses_cache: dict[str, Decimal],
max_losses: Decimal,
max_pain: Decimal,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _trade(
trade: Trade, # cryptofeed, NOT ours from `.venues`!
receipt_timestamp: int,
) -> None:
'''
Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side.
'''
to_trio.send_nowait(('trade', trade))
# trade and oi are user defined functions that
# will be called when trade and open interest updates are received
# data type is not dict, is an object: cryptofeed.types.OpenINterest
async def _oi(
oi: OpenInterest,
receipt_timestamp: int,
) -> None:
'''
Proxy-thru `cryptofeed.FeedHandler` "oi" to `piker`-side.
'''
nonlocal losses_cache
nonlocal max_losses
nonlocal max_pain
nonlocal open_interests
symbol: Symbol = str_to_cb_sym(oi.symbol)
piker_sym: str = cb_sym_to_deribit_inst(symbol)
data: dict = oi.raw['params']['data']
(
base,
expiry_date,
strike_price,
option_type
) = tuple(
piker_sym.split('-')
)
if not f'{expiry_date}' in open_interests:
open_interests[f'{expiry_date}'] = {
f'{strike_price}': {
'C': [],
'P': [],
'strike_losses': Decimal(0),
}
}
if not f'{strike_price}' in open_interests[f'{expiry_date}']:
open_interests[f'{expiry_date}'][f'{strike_price}'] = {
'C': [],
'P': [],
'strike_losses': Decimal(0),
}
index_price: Decimal = data['index_price']
price_delta: Decimal = abs(index_price - Decimal(strike_price))
open_interest: Decimal = oi.open_interest
losses: Decimal = price_delta * open_interest
if not f'{strike_price}' in losses_cache:
losses_cache[f'{strike_price}'] = {
'C': Decimal(0),
'P': Decimal(0),
}
losses_cache[f'{strike_price}'][f'{option_type}'] = losses
strike_losses: Decimal = (
losses_cache[f'{strike_price}']['C']
+
losses_cache[f'{strike_price}']['P']
)
print(f'>>>> Open Interest...')
print(f'max_losses: {max_losses}\n')
print(f'max_pain: {max_pain}')
print('-----------------------------------------------')
open_interests[f'{expiry_date}'][f'{strike_price}'][f'{option_type}'] = {
'date': oi.timestamp,
'open_interest': open_interest,
'index_price': index_price,
'strike_price': strike_price,
'price_delta': price_delta,
'losses': losses, # this updates the global value call_losses and put_losses
}
# calculate with latest values stored in call_losses and put_losses global cache.
open_interests[f'{expiry_date}'][f'{strike_price}']['strike_losses'] = strike_losses
for strike in open_interests[f'{expiry_date}']:
if open_interests[f'{expiry_date}'][strike]['strike_losses'] > max_losses:
max_losses = open_interests[f'{expiry_date}'][strike]['strike_losses']
max_pain = strike
print('-----------------------------------------------')
print(f'strike_price: {strike_price}')
print(f'strike_losses: {open_interests[f'{expiry_date}'][strike]['strike_losses']}')
print(f'{pformat(open_interests[f'{expiry_date}'][strike])}')
print('-----------------------------------------------')
channels = [TRADES, OPEN_INTEREST]
callbacks={TRADES: _trade, OPEN_INTEREST: _oi}
fh.add_feed(
DERIBIT,
channels=channels,
symbols=instruments,
callbacks=callbacks
)
if not fh.running:
fh.run(
start_loop=False,
install_signal_handlers=False
)
# sync with trio
to_trio.send_nowait(None)
# run until cancelled
await asyncio.sleep(float('inf'))
@acm
async def open_oi_feed(
) -> to_asyncio.LinkedTaskChannel:
expiry_date: str = '6DEC24' # '6DEC24' '26SEP25' '27JUN25' '13DEC24'
instruments: list[Symbol] = []
async with get_client(
) as client:
# to get all currencies available in deribit
# currencies = await client.get_currencies()
instruments = await client.get_instruments(
expiry_date=expiry_date,
)
losses_cache: dict[str, Decimal] = { # {'<strike_price>': <value>}
'C': Decimal(0),
'P': Decimal(0),
}
max_losses: Decimal = Decimal(0)
max_pain: Decimal = Decimal(0)
open_interests: dict[str, dict[str, dict[str, list[dict]]]] = {}
fh: FeedHandler
first: None
chan: to_asyncio.LinkedTaskChannel
async with (
maybe_open_feed_handler() as fh,
to_asyncio.open_channel_from(
partial(
aio_open_interest_feed_relay,
fh,
instruments,
open_interests,
losses_cache,
max_losses,
max_pain,
)
) as (first, chan)
):
yield chan
@acm
async def maybe_open_oi_feed(
) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context
feed: to_asyncio.LinkedTaskChannel
async with maybe_open_context(
acm_func=open_oi_feed,
) as (cache_hit, feed):
if cache_hit:
yield broadcast_receiver(feed, 10)
else:
yield feed
# TODO, move all to `.broker` submod!
# async def aio_order_feed_relay(
# fh: FeedHandler,
# instrument: Symbol,
# from_trio: asyncio.Queue,
# to_trio: trio.abc.SendChannel,
# ) -> None:
# async def _fill(data: dict, receipt_timestamp):
# breakpoint()
# async def _order_info(data: dict, receipt_timestamp):
# breakpoint()
# fh.add_feed(
# DERIBIT,
# channels=[FILLS, ORDER_INFO],
# symbols=[instrument.upper()],
# callbacks={
# FILLS: _fill,
# ORDER_INFO: _order_info,
# })
# if not fh.running:
# fh.run(
# start_loop=False,
# install_signal_handlers=False)
# # sync with trio
# to_trio.send_nowait(None)
# await asyncio.sleep(float('inf'))
# @acm
# async def open_order_feed(
# instrument: list[str]
# ) -> trio.abc.ReceiveStream:
# async with maybe_open_feed_handler() as fh:
# async with to_asyncio.open_channel_from(
# partial(
# aio_order_feed_relay,
# fh,
# instrument
# )
# ) as (first, chan):
# yield chan
# @acm
# async def maybe_open_order_feed(
# instrument: str
# ) -> trio.abc.ReceiveStream:
# # TODO: add a predicate to maybe_open_context
# async with maybe_open_context(
# acm_func=open_order_feed,
# kwargs={
# 'instrument': instrument.split('.')[0],
# 'fh': fh
# },
# key=f'{instrument.split('.')[0]}-order',
# ) as (cache_hit, feed):
# if cache_hit:
# yield broadcast_receiver(feed, 10)
# else:
# yield feed