Deribit broker fix #8

Open
ntorres wants to merge 1 commits from deribit_fix into nix-qt6-fix
5 changed files with 1088 additions and 74 deletions

View File

@ -51,6 +51,7 @@ __brokers__: list[str] = [
'ib',
'kraken',
'kucoin',
'deribit',
# broken but used to work
# 'questrade',
@ -61,7 +62,6 @@ __brokers__: list[str] = [
# wstrade
# iex
# deribit
# bitso
]

View File

@ -31,7 +31,7 @@ from typing import (
Callable,
)
import pendulum
from pendulum import now
import trio
from trio_typing import TaskStatus
from rapidfuzz import process as fuzzy
@ -51,7 +51,9 @@ from cryptofeed.defines import (
OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol
# types for managing the cb callbacks.
# from cryptofeed.types import L1Book
from piker.accounting import MktPair
from piker.data import (
def_iohlcv_fields,
match_from_pairs,
@ -80,19 +82,19 @@ _testnet_ws_url = 'wss://test.deribit.com/ws/api/v2'
class JSONRPCResult(Struct):
jsonrpc: str = '2.0'
id: int
result: Optional[list[dict]] = None
error: Optional[dict] = None
usIn: int
usOut: int
usDiff: int
testnet: bool
jsonrpc: str = '2.0'
result: Optional[list[dict]] = None
error: Optional[dict] = None
class JSONRPCChannel(Struct):
jsonrpc: str = '2.0'
method: str
params: dict
jsonrpc: str = '2.0'
class KLinesResult(Struct):
@ -116,9 +118,12 @@ class Trade(Struct):
instrument_name: str
index_price: float
direction: str
contracts: float
amount: float
combo_trade_id: Optional[int] = 0,
combo_id: Optional[str] = '',
amount: float
block_trade_leg_count: Optional[int] = 0,
block_trade_id: Optional[str] = '',
class LastTradesResult(Struct):
trades: list[Trade]
@ -142,13 +147,15 @@ def str_to_cb_sym(name: str) -> Symbol:
else:
raise Exception("Couldn\'t parse option type")
new_expiry_date = get_values_from_cb_normalized_date(expiry_date)
return Symbol(
base, quote,
base=base,
quote=quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date,
expiry_normalize=False)
expiry_date=new_expiry_date)
def piker_sym_to_cb_sym(name: str) -> Symbol:
@ -165,62 +172,79 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
raise Exception("Couldn\'t parse option type")
return Symbol(
base, quote,
base=base,
quote=quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date.upper())
expiry_date=expiry_date)
def cb_sym_to_deribit_inst(sym: Symbol):
# cryptofeed normalized
cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
# deribit specific
months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
exp = sym.expiry_date
# YYMDD
# 01234
year, month, day = (
exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])
new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date)
otype = 'C' if sym.option_type == CALL else 'P'
return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}'
return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}'
def get_values_from_cb_normalized_date(expiry_date: str) -> str:
# deribit specific
cb_norm = [
'F', 'G', 'H', 'J',
'K', 'M', 'N', 'Q',
'U', 'V', 'X', 'Z'
]
months = [
'JAN', 'FEB', 'MAR', 'APR',
'MAY', 'JUN', 'JUL', 'AUG',
'SEP', 'OCT', 'NOV', 'DEC'
]
# YYMDD
# 01234
day, month, year = (
expiry_date[3:],
months[cb_norm.index(expiry_date[2:3])],
expiry_date[:2]
)
return f'{day}{month}{year}'
def get_config() -> dict[str, Any]:
conf, path = config.load()
conf: dict
path: Path
section = conf.get('deribit')
conf, path = config.load(
conf_name='brokers',
touch_if_dne=True,
)
section: dict = {}
section['deribit'] = conf.get('deribit')
# TODO: document why we send this, basically because logging params for cryptofeed
conf['log'] = {}
conf['log']['disabled'] = True
section['log'] = {}
section['log']['disabled'] = True
if section is None:
log.warning(f'No config section found for deribit in {path}')
return {}
return conf
return section
class Client:
def __init__(self, json_rpc: Callable) -> None:
def __init__(
self,
json_rpc: Callable
) -> None:
self._pairs: dict[str, Any] = None
config = get_config().get('deribit', {})
if ('key_id' in config) and ('key_secret' in config):
self._key_id = config['key_id']
self._key_secret = config['key_secret']
else:
self._key_id = None
self._key_secret = None
self._key_id = config.get('key_id')
self._key_secret = config.get('key_secret')
self.json_rpc = json_rpc
@ -228,7 +252,10 @@ class Client:
def currencies(self):
return ['btc', 'eth', 'sol', 'usd']
async def get_balances(self, kind: str = 'option') -> dict[str, float]:
async def get_balances(
self,
kind: str = 'option'
) -> dict[str, float]:
"""Return the set of positions for this account
by symbol.
"""
@ -358,16 +385,19 @@ class Client:
async def bars(
self,
symbol: str,
mkt: MktPair,
start_dt: Optional[datetime] = None,
end_dt: Optional[datetime] = None,
limit: int = 1000,
as_np: bool = True,
) -> dict:
instrument = symbol
) -> list[tuple] | np.ndarray:
instrument: str = mkt.bs_fqme
if end_dt is None:
end_dt = pendulum.now('UTC')
end_dt = now('UTC')
if start_dt is None:
start_dt = end_dt.start_of(
@ -387,29 +417,27 @@ class Client:
})
result = KLinesResult(**resp.result)
new_bars = []
new_bars: list[tuple] = []
for i in range(len(result.close)):
_open = result.open[i]
high = result.high[i]
low = result.low[i]
close = result.close[i]
volume = result.volume[i]
row = [
(start_time + (i * (60 * 1000))) / 1000.0, # time
result.open[i],
result.high[i],
result.low[i],
result.close[i],
result.volume[i],
0
result.volume[i]
]
new_bars.append((i,) + tuple(row))
array = np.array(new_bars, dtype=def_iohlcv_fields) if as_np else klines
return array
if not as_np:
return result
return np.array(
new_bars,
dtype=def_iohlcv_fields
)
async def last_trades(
self,
@ -434,7 +462,8 @@ async def get_client(
async with (
trio.open_nursery() as n,
open_jsonrpc_session(
_testnet_ws_url, dtype=JSONRPCResult) as json_rpc
_ws_url, response_type=JSONRPCResult
) as json_rpc
):
client = Client(json_rpc)
@ -523,7 +552,7 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
async def aio_price_feed_relay(
fh: FeedHandler,
instrument: Symbol,
instrument: str,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:

View File

@ -25,12 +25,19 @@ import time
import trio
from trio_typing import TaskStatus
import pendulum
from pendulum import (
from_timestamp,
now,
)
from rapidfuzz import process as fuzzy
import numpy as np
import tractor
from piker.brokers import open_cached_client
from piker.accounting import MktPair
from piker.brokers import (
open_cached_client,
NoData,
)
from piker.log import get_logger, get_console_log
from piker.data import ShmArray
from piker.brokers._util import (
@ -47,7 +54,7 @@ from cryptofeed.symbols import Symbol
from .api import (
Client, Trade,
get_config,
str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
maybe_open_price_feed
)
@ -64,30 +71,46 @@ async def open_history_client(
mkt: MktPair,
) -> tuple[Callable, int]:
fnstrument: str = mkt.bs_fqme
# TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client:
async def get_ohlc(
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
timeframe: float,
end_dt: datetime | None = None,
start_dt: datetime | None = None,
) -> tuple[
np.ndarray,
datetime, # start
datetime, # end
]:
if timeframe != 60:
raise DataUnavailable('Only 1m bars are supported')
array = await client.bars(
instrument,
array: np.ndarray = await client.bars(
mkt,
start_dt=start_dt,
end_dt=end_dt,
)
if len(array) == 0:
raise DataUnavailable
raise NoData(
f'No frame for {start_dt} -> {end_dt}\n'
)
start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time'])
start_dt = from_timestamp(array[0]['time'])
end_dt = from_timestamp(array[-1]['time'])
times = array['time']
if not times.any():
raise ValueError(
'Bad frame with null-times?\n\n'
f'{times}'
)
if end_dt is None:
inow: int = round(time.time())
if (inow - times[-1]) > 60:
await tractor.pause()
return array, start_dt, end_dt
@ -110,6 +133,8 @@ async def stream_quotes(
sym = symbols[0]
#init_msgs: list[FeedInit] = []
async with (
open_cached_client('deribit') as client,
send_chan as send_chan
@ -123,7 +148,10 @@ async def stream_quotes(
'asset_type': 'option',
'price_tick_size': 0.0005
},
'shm_write_opts': {'sum_tick_vml': False},
'shm_write_opts': {
'sum_tick_vml': True,
'has_vlm': True
},
'fqsn': sym,
},
}

957
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -69,6 +69,8 @@ pdbp = "^1.5.0"
trio = "^0.24"
pendulum = "^3.0.0"
httpx = "^0.27.0"
cryptofeed = "^2.4.0"
pyarrow = "^17.0.0"
[tool.poetry.dependencies.tractor]
develop = true