Added cryptofeed and pyarrow necessary for the feed, enable deribit

in the brokers init file, at this point the feed is working, to check
the tables use vd tool.
Nelson Torres 2024-08-27 23:58:36 +00:00
parent 0be454c3d6
commit 53148b453d
5 changed files with 1081 additions and 71 deletions

View File

@ -51,6 +51,7 @@ __brokers__: list[str] = [
'ib',
'kraken',
'kucoin',
'deribit',
# broken but used to work
# 'questrade',
@ -61,7 +62,6 @@ __brokers__: list[str] = [
# wstrade
# iex
# deribit
# bitso
]

View File

@ -31,7 +31,7 @@ from typing import (
Callable,
)
import pendulum
from pendulum import now
import trio
from trio_typing import TaskStatus
from rapidfuzz import process as fuzzy
@ -51,7 +51,9 @@ from cryptofeed.defines import (
OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol
# types for managing the cb callbacks.
# from cryptofeed.types import L1Book
from piker.accounting import MktPair
from piker.data import (
def_iohlcv_fields,
match_from_pairs,
@ -80,19 +82,19 @@ _testnet_ws_url = 'wss://test.deribit.com/ws/api/v2'
class JSONRPCResult(Struct):
jsonrpc: str = '2.0'
id: int
result: Optional[list[dict]] = None
error: Optional[dict] = None
usIn: int
usOut: int
usDiff: int
testnet: bool
jsonrpc: str = '2.0'
result: Optional[list[dict]] = None
error: Optional[dict] = None
class JSONRPCChannel(Struct):
jsonrpc: str = '2.0'
method: str
params: dict
jsonrpc: str = '2.0'
class KLinesResult(Struct):
@ -116,9 +118,10 @@ class Trade(Struct):
instrument_name: str
index_price: float
direction: str
amount: float
contracts: float
combo_trade_id: Optional[int] = 0,
combo_id: Optional[str] = '',
amount: float
class LastTradesResult(Struct):
trades: list[Trade]
@ -142,13 +145,17 @@ def str_to_cb_sym(name: str) -> Symbol:
else:
raise Exception("Couldn\'t parse option type")
year, month, day = get_values_from_cb_normalized_date(expiry_date)
exp = f'{day}{month}{year}'
return Symbol(
base, quote,
base=base,
quote=quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date,
expiry_normalize=False)
expiry_date=exp)
def piker_sym_to_cb_sym(name: str) -> Symbol:
@ -163,48 +170,65 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
option_type = CALL
else:
raise Exception("Couldn\'t parse option type")
return Symbol(
base, quote,
base=base,
quote=quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date.upper())
expiry_date=expiry_date)
def cb_sym_to_deribit_inst(sym: Symbol):
# cryptofeed normalized
cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
# deribit specific
months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
exp = sym.expiry_date
# YYMDD
# 01234
year, month, day = (
exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])
year, month, day = get_values_from_cb_normalized_date(sym.expiry_date)
exp = f'{day}{month}{year}'
otype = 'C' if sym.option_type == CALL else 'P'
return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}'
return f'{sym.base}-{exp}-{sym.strike_price}-{otype}'
def get_values_from_cb_normalized_date(expiry_date: str):
# deribit specific
cb_norm = [
'F', 'G', 'H', 'J',
'K', 'M', 'N', 'Q',
'U', 'V', 'X', 'Z'
]
months = [
'JAN', 'FEB', 'MAR', 'APR',
'MAY', 'JUN', 'JUL', 'AUG',
'SEP', 'OCT', 'NOV', 'DEC'
]
# YYMDD
# 01234
return (
expiry_date[:2],
months[cb_norm.index(expiry_date[2:3])],
expiry_date[3:]
)
def get_config() -> dict[str, Any]:
conf, path = config.load()
conf: dict
path: Path
section = conf.get('deribit')
conf, path = config.load(
conf_name='brokers',
touch_if_dne=True,
)
section: dict = {}
section['deribit'] = conf.get('deribit')
# TODO: document why we send this, basically because logging params for cryptofeed
conf['log'] = {}
conf['log']['disabled'] = True
section['log'] = {}
section['log']['disabled'] = True
if section is None:
log.warning(f'No config section found for deribit in {path}')
return {}
return conf
return section
class Client:
@ -212,8 +236,9 @@ class Client:
def __init__(self, json_rpc: Callable) -> None:
self._pairs: dict[str, Any] = None
config = get_config().get('deribit', {})
config = get_config()
if ('deribit' in config):
config = config['deribit']
if ('key_id' in config) and ('key_secret' in config):
self._key_id = config['key_id']
self._key_secret = config['key_secret']
@ -358,16 +383,19 @@ class Client:
async def bars(
self,
symbol: str,
mkt: MktPair,
start_dt: Optional[datetime] = None,
end_dt: Optional[datetime] = None,
limit: int = 1000,
as_np: bool = True,
) -> dict:
instrument = symbol
) -> list[tuple] | np.ndarray:
instrument: str = mkt.bs_fqme
if end_dt is None:
end_dt = pendulum.now('UTC')
end_dt = now('UTC')
if start_dt is None:
start_dt = end_dt.start_of(
@ -387,9 +415,10 @@ class Client:
})
result = KLinesResult(**resp.result)
new_bars = []
new_bars: list[tuple] = []
for i in range(len(result.close)):
timestamp = (start_time + (i * (60 * 1000))) / 1000.0
_open = result.open[i]
high = result.high[i]
low = result.low[i]
@ -397,19 +426,23 @@ class Client:
volume = result.volume[i]
row = [
(start_time + (i * (60 * 1000))) / 1000.0, # time
result.open[i],
result.high[i],
result.low[i],
result.close[i],
result.volume[i],
0
timestamp, # time
_open,
high,
low,
close,
volume
]
new_bars.append((i,) + tuple(row))
array = np.array(new_bars, dtype=def_iohlcv_fields) if as_np else klines
return array
if not as_np:
return result
return np.array(
new_bars,
dtype=def_iohlcv_fields
)
async def last_trades(
self,
@ -434,7 +467,7 @@ async def get_client(
async with (
trio.open_nursery() as n,
open_jsonrpc_session(
_testnet_ws_url, dtype=JSONRPCResult) as json_rpc
_ws_url, response_type=JSONRPCResult) as json_rpc
):
client = Client(json_rpc)
@ -450,7 +483,7 @@ async def get_client(
https://docs.deribit.com/?python#authentication-2
"""
renew_time = 10
access_scope = 'trade:read_write'
access_scope = 'trade:read'
_expiry_time = time.time()
got_access = False
nonlocal _refresh_token
@ -523,14 +556,14 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
async def aio_price_feed_relay(
fh: FeedHandler,
instrument: Symbol,
instrument: str,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _trade(data: dict, receipt_timestamp):
piker_symbol = cb_sym_to_deribit_inst(str_to_cb_sym(data.symbol)).lower()
to_trio.send_nowait(('trade', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'symbol': piker_symbol,
'last': data,
'broker_ts': time.time(),
'data': data.to_dict(),
@ -538,9 +571,9 @@ async def aio_price_feed_relay(
}))
async def _l1(data: dict, receipt_timestamp):
piker_symbol = cb_sym_to_deribit_inst(str_to_cb_sym(data.symbol)).lower()
to_trio.send_nowait(('l1', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'symbol': piker_symbol,
'ticks': [
{'type': 'bid',
'price': float(data.bid_price), 'size': float(data.bid_size)},

View File

@ -25,11 +25,15 @@ import time
import trio
from trio_typing import TaskStatus
import pendulum
from pendulum import (
from_timestamp,
now,
)
from rapidfuzz import process as fuzzy
import numpy as np
import tractor
from piker.accounting import MktPair
from piker.brokers import open_cached_client
from piker.log import get_logger, get_console_log
from piker.data import ShmArray
@ -47,7 +51,7 @@ from cryptofeed.symbols import Symbol
from .api import (
Client, Trade,
get_config,
str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
maybe_open_price_feed
)
@ -64,13 +68,13 @@ async def open_history_client(
mkt: MktPair,
) -> tuple[Callable, int]:
fnstrument: str = mkt.bs_fqme
# TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client:
async def get_ohlc(
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
timeframe: float,
end_dt: datetime | None = None,
start_dt: datetime | None = None,
) -> tuple[
np.ndarray,
@ -78,16 +82,30 @@ async def open_history_client(
datetime, # end
]:
array = await client.bars(
instrument,
array: np.ndarray = await client.bars(
mkt,
start_dt=start_dt,
end_dt=end_dt,
)
if len(array) == 0:
raise DataUnavailable
raise NoData(
f'No frame for {start_dt} -> {end_dt}\n'
)
start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time'])
start_dt = from_timestamp(array[0]['time'])
end_dt = from_timestamp(array[-1]['time'])
times = array['time']
if not times.any():
raise ValueError(
'Bad frame with null-times?\n\n'
f'{times}'
)
if end_dt is None:
inow: int = round(time.time())
if (inow - times[-1]) > 60:
await tractor.pause()
return array, start_dt, end_dt
@ -110,6 +128,8 @@ async def stream_quotes(
sym = symbols[0]
#init_msgs: list[FeedInit] = []
async with (
open_cached_client('deribit') as client,
send_chan as send_chan

957
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -69,6 +69,8 @@ pdbp = "^1.5.0"
trio = "^0.24"
pendulum = "^3.0.0"
httpx = "^0.27.0"
cryptofeed = "^2.4.0"
pyarrow = "^17.0.0"
[tool.poetry.dependencies.tractor]
develop = true