Added cryptofeed and pyarrow necessary for the feed, enable deribit
in the brokers init file, at this point the feed is working, to check the tables use vd tool.
parent
0be454c3d6
commit
00406028ea
|
@ -51,6 +51,7 @@ __brokers__: list[str] = [
|
|||
'ib',
|
||||
'kraken',
|
||||
'kucoin',
|
||||
'deribit',
|
||||
|
||||
# broken but used to work
|
||||
# 'questrade',
|
||||
|
@ -61,7 +62,6 @@ __brokers__: list[str] = [
|
|||
# wstrade
|
||||
# iex
|
||||
|
||||
# deribit
|
||||
# bitso
|
||||
]
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ from typing import (
|
|||
Callable,
|
||||
)
|
||||
|
||||
import pendulum
|
||||
from pendulum import now
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
from rapidfuzz import process as fuzzy
|
||||
|
@ -51,7 +51,9 @@ from cryptofeed.defines import (
|
|||
OPTION, CALL, PUT
|
||||
)
|
||||
from cryptofeed.symbols import Symbol
|
||||
|
||||
# types for managing the cb callbacks.
|
||||
# from cryptofeed.types import L1Book
|
||||
from piker.accounting import MktPair
|
||||
from piker.data import (
|
||||
def_iohlcv_fields,
|
||||
match_from_pairs,
|
||||
|
@ -80,19 +82,19 @@ _testnet_ws_url = 'wss://test.deribit.com/ws/api/v2'
|
|||
|
||||
|
||||
class JSONRPCResult(Struct):
|
||||
jsonrpc: str = '2.0'
|
||||
id: int
|
||||
result: Optional[list[dict]] = None
|
||||
error: Optional[dict] = None
|
||||
usIn: int
|
||||
usOut: int
|
||||
usDiff: int
|
||||
testnet: bool
|
||||
jsonrpc: str = '2.0'
|
||||
result: Optional[list[dict]] = None
|
||||
error: Optional[dict] = None
|
||||
|
||||
class JSONRPCChannel(Struct):
|
||||
jsonrpc: str = '2.0'
|
||||
method: str
|
||||
params: dict
|
||||
jsonrpc: str = '2.0'
|
||||
|
||||
|
||||
class KLinesResult(Struct):
|
||||
|
@ -116,9 +118,10 @@ class Trade(Struct):
|
|||
instrument_name: str
|
||||
index_price: float
|
||||
direction: str
|
||||
amount: float
|
||||
contracts: float
|
||||
combo_trade_id: Optional[int] = 0,
|
||||
combo_id: Optional[str] = '',
|
||||
amount: float
|
||||
|
||||
class LastTradesResult(Struct):
|
||||
trades: list[Trade]
|
||||
|
@ -142,13 +145,17 @@ def str_to_cb_sym(name: str) -> Symbol:
|
|||
else:
|
||||
raise Exception("Couldn\'t parse option type")
|
||||
|
||||
year, month, day = get_values_from_cb_normalized_date(expiry_date)
|
||||
|
||||
exp = f'{day}{month}{year}'
|
||||
|
||||
return Symbol(
|
||||
base, quote,
|
||||
base=base,
|
||||
quote=quote,
|
||||
type=OPTION,
|
||||
strike_price=strike_price,
|
||||
option_type=option_type,
|
||||
expiry_date=expiry_date,
|
||||
expiry_normalize=False)
|
||||
expiry_date=exp)
|
||||
|
||||
|
||||
def piker_sym_to_cb_sym(name: str) -> Symbol:
|
||||
|
@ -159,52 +166,70 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
|
|||
|
||||
if option_type == 'P':
|
||||
option_type = PUT
|
||||
elif option_type == 'C':
|
||||
elif option_type == 'C':
|
||||
option_type = CALL
|
||||
else:
|
||||
raise Exception("Couldn\'t parse option type")
|
||||
|
||||
return Symbol(
|
||||
base, quote,
|
||||
base=base,
|
||||
quote=quote,
|
||||
type=OPTION,
|
||||
strike_price=strike_price,
|
||||
option_type=option_type,
|
||||
expiry_date=expiry_date.upper())
|
||||
expiry_date=expiry_date)
|
||||
|
||||
|
||||
def cb_sym_to_deribit_inst(sym: Symbol):
|
||||
# cryptofeed normalized
|
||||
cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
|
||||
|
||||
# deribit specific
|
||||
months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
|
||||
|
||||
exp = sym.expiry_date
|
||||
|
||||
# YYMDD
|
||||
# 01234
|
||||
year, month, day = (
|
||||
exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])
|
||||
|
||||
year, month, day = get_values_from_cb_normalized_date(sym.expiry_date)
|
||||
exp = f'{day}{month}{year}'
|
||||
otype = 'C' if sym.option_type == CALL else 'P'
|
||||
|
||||
return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}'
|
||||
return f'{sym.base}-{exp}-{sym.strike_price}-{otype}'
|
||||
|
||||
|
||||
def get_values_from_cb_normalized_date(expiry_date: str):
|
||||
# deribit specific
|
||||
cb_norm = [
|
||||
'F', 'G', 'H', 'J',
|
||||
'K', 'M', 'N', 'Q',
|
||||
'U', 'V', 'X', 'Z'
|
||||
]
|
||||
months = [
|
||||
'JAN', 'FEB', 'MAR', 'APR',
|
||||
'MAY', 'JUN', 'JUL', 'AUG',
|
||||
'SEP', 'OCT', 'NOV', 'DEC'
|
||||
]
|
||||
# YYMDD
|
||||
# 01234
|
||||
return (
|
||||
expiry_date[:2],
|
||||
months[cb_norm.index(expiry_date[2:3])],
|
||||
expiry_date[3:]
|
||||
)
|
||||
|
||||
|
||||
def get_config() -> dict[str, Any]:
|
||||
|
||||
conf, path = config.load()
|
||||
conf: dict
|
||||
path: Path
|
||||
|
||||
section = conf.get('deribit')
|
||||
conf, path = config.load(
|
||||
conf_name='brokers',
|
||||
touch_if_dne=True,
|
||||
)
|
||||
section: dict = {}
|
||||
section['deribit'] = conf.get('deribit')
|
||||
|
||||
# TODO: document why we send this, basically because logging params for cryptofeed
|
||||
conf['log'] = {}
|
||||
conf['log']['disabled'] = True
|
||||
section['log'] = {}
|
||||
section['log']['disabled'] = True
|
||||
|
||||
if section is None:
|
||||
log.warning(f'No config section found for deribit in {path}')
|
||||
return {}
|
||||
|
||||
return conf
|
||||
return section
|
||||
|
||||
|
||||
class Client:
|
||||
|
@ -214,13 +239,8 @@ class Client:
|
|||
|
||||
config = get_config().get('deribit', {})
|
||||
|
||||
if ('key_id' in config) and ('key_secret' in config):
|
||||
self._key_id = config['key_id']
|
||||
self._key_secret = config['key_secret']
|
||||
|
||||
else:
|
||||
self._key_id = None
|
||||
self._key_secret = None
|
||||
self._key_id = config.get('key_id')
|
||||
self._key_secret = config.get('key_secret')
|
||||
|
||||
self.json_rpc = json_rpc
|
||||
|
||||
|
@ -358,16 +378,19 @@ class Client:
|
|||
|
||||
async def bars(
|
||||
self,
|
||||
symbol: str,
|
||||
mkt: MktPair,
|
||||
|
||||
start_dt: Optional[datetime] = None,
|
||||
end_dt: Optional[datetime] = None,
|
||||
|
||||
limit: int = 1000,
|
||||
as_np: bool = True,
|
||||
) -> dict:
|
||||
instrument = symbol
|
||||
|
||||
) -> list[tuple] | np.ndarray:
|
||||
instrument: str = mkt.bs_fqme
|
||||
|
||||
if end_dt is None:
|
||||
end_dt = pendulum.now('UTC')
|
||||
end_dt = now('UTC')
|
||||
|
||||
if start_dt is None:
|
||||
start_dt = end_dt.start_of(
|
||||
|
@ -387,29 +410,27 @@ class Client:
|
|||
})
|
||||
|
||||
result = KLinesResult(**resp.result)
|
||||
new_bars = []
|
||||
new_bars: list[tuple] = []
|
||||
for i in range(len(result.close)):
|
||||
|
||||
_open = result.open[i]
|
||||
high = result.high[i]
|
||||
low = result.low[i]
|
||||
close = result.close[i]
|
||||
volume = result.volume[i]
|
||||
|
||||
row = [
|
||||
(start_time + (i * (60 * 1000))) / 1000.0, # time
|
||||
result.open[i],
|
||||
result.high[i],
|
||||
result.low[i],
|
||||
result.close[i],
|
||||
result.volume[i],
|
||||
0
|
||||
result.volume[i]
|
||||
]
|
||||
|
||||
new_bars.append((i,) + tuple(row))
|
||||
|
||||
array = np.array(new_bars, dtype=def_iohlcv_fields) if as_np else klines
|
||||
return array
|
||||
if not as_np:
|
||||
return result
|
||||
|
||||
return np.array(
|
||||
new_bars,
|
||||
dtype=def_iohlcv_fields
|
||||
)
|
||||
|
||||
async def last_trades(
|
||||
self,
|
||||
|
@ -434,7 +455,7 @@ async def get_client(
|
|||
async with (
|
||||
trio.open_nursery() as n,
|
||||
open_jsonrpc_session(
|
||||
_testnet_ws_url, dtype=JSONRPCResult) as json_rpc
|
||||
_ws_url, response_type=JSONRPCResult) as json_rpc
|
||||
):
|
||||
client = Client(json_rpc)
|
||||
|
||||
|
@ -450,7 +471,7 @@ async def get_client(
|
|||
https://docs.deribit.com/?python#authentication-2
|
||||
"""
|
||||
renew_time = 10
|
||||
access_scope = 'trade:read_write'
|
||||
access_scope = 'trade:read'
|
||||
_expiry_time = time.time()
|
||||
got_access = False
|
||||
nonlocal _refresh_token
|
||||
|
@ -523,14 +544,14 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
|
|||
|
||||
async def aio_price_feed_relay(
|
||||
fh: FeedHandler,
|
||||
instrument: Symbol,
|
||||
instrument: str,
|
||||
from_trio: asyncio.Queue,
|
||||
to_trio: trio.abc.SendChannel,
|
||||
) -> None:
|
||||
async def _trade(data: dict, receipt_timestamp):
|
||||
piker_symbol = cb_sym_to_deribit_inst(str_to_cb_sym(data.symbol)).lower()
|
||||
to_trio.send_nowait(('trade', {
|
||||
'symbol': cb_sym_to_deribit_inst(
|
||||
str_to_cb_sym(data.symbol)).lower(),
|
||||
'symbol': piker_symbol,
|
||||
'last': data,
|
||||
'broker_ts': time.time(),
|
||||
'data': data.to_dict(),
|
||||
|
@ -538,9 +559,9 @@ async def aio_price_feed_relay(
|
|||
}))
|
||||
|
||||
async def _l1(data: dict, receipt_timestamp):
|
||||
piker_symbol = cb_sym_to_deribit_inst(str_to_cb_sym(data.symbol)).lower()
|
||||
to_trio.send_nowait(('l1', {
|
||||
'symbol': cb_sym_to_deribit_inst(
|
||||
str_to_cb_sym(data.symbol)).lower(),
|
||||
'symbol': piker_symbol,
|
||||
'ticks': [
|
||||
{'type': 'bid',
|
||||
'price': float(data.bid_price), 'size': float(data.bid_size)},
|
||||
|
|
|
@ -25,12 +25,19 @@ import time
|
|||
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
import pendulum
|
||||
from pendulum import (
|
||||
from_timestamp,
|
||||
now,
|
||||
)
|
||||
from rapidfuzz import process as fuzzy
|
||||
import numpy as np
|
||||
import tractor
|
||||
|
||||
from piker.brokers import open_cached_client
|
||||
from piker.accounting import MktPair
|
||||
from piker.brokers import (
|
||||
open_cached_client,
|
||||
NoData,
|
||||
)
|
||||
from piker.log import get_logger, get_console_log
|
||||
from piker.data import ShmArray
|
||||
from piker.brokers._util import (
|
||||
|
@ -47,7 +54,7 @@ from cryptofeed.symbols import Symbol
|
|||
from .api import (
|
||||
Client, Trade,
|
||||
get_config,
|
||||
str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
|
||||
piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
|
||||
maybe_open_price_feed
|
||||
)
|
||||
|
||||
|
@ -64,30 +71,46 @@ async def open_history_client(
|
|||
mkt: MktPair,
|
||||
) -> tuple[Callable, int]:
|
||||
|
||||
fnstrument: str = mkt.bs_fqme
|
||||
# TODO implement history getter for the new storage layer.
|
||||
async with open_cached_client('deribit') as client:
|
||||
|
||||
async def get_ohlc(
|
||||
end_dt: Optional[datetime] = None,
|
||||
start_dt: Optional[datetime] = None,
|
||||
timeframe: float,
|
||||
end_dt: datetime | None = None,
|
||||
start_dt: datetime | None = None,
|
||||
|
||||
) -> tuple[
|
||||
np.ndarray,
|
||||
datetime, # start
|
||||
datetime, # end
|
||||
]:
|
||||
if timeframe != 60:
|
||||
raise DataUnavailable('Only 1m bars are supported')
|
||||
|
||||
array = await client.bars(
|
||||
instrument,
|
||||
array: np.ndarray = await client.bars(
|
||||
mkt,
|
||||
start_dt=start_dt,
|
||||
end_dt=end_dt,
|
||||
)
|
||||
if len(array) == 0:
|
||||
raise DataUnavailable
|
||||
raise NoData(
|
||||
f'No frame for {start_dt} -> {end_dt}\n'
|
||||
)
|
||||
|
||||
start_dt = pendulum.from_timestamp(array[0]['time'])
|
||||
end_dt = pendulum.from_timestamp(array[-1]['time'])
|
||||
start_dt = from_timestamp(array[0]['time'])
|
||||
end_dt = from_timestamp(array[-1]['time'])
|
||||
|
||||
times = array['time']
|
||||
if not times.any():
|
||||
raise ValueError(
|
||||
'Bad frame with null-times?\n\n'
|
||||
f'{times}'
|
||||
)
|
||||
|
||||
if end_dt is None:
|
||||
inow: int = round(time.time())
|
||||
if (inow - times[-1]) > 60:
|
||||
await tractor.pause()
|
||||
|
||||
return array, start_dt, end_dt
|
||||
|
||||
|
@ -110,6 +133,8 @@ async def stream_quotes(
|
|||
|
||||
sym = symbols[0]
|
||||
|
||||
#init_msgs: list[FeedInit] = []
|
||||
|
||||
async with (
|
||||
open_cached_client('deribit') as client,
|
||||
send_chan as send_chan
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -69,6 +69,8 @@ pdbp = "^1.5.0"
|
|||
trio = "^0.24"
|
||||
pendulum = "^3.0.0"
|
||||
httpx = "^0.27.0"
|
||||
cryptofeed = "^2.4.0"
|
||||
pyarrow = "^17.0.0"
|
||||
|
||||
[tool.poetry.dependencies.tractor]
|
||||
develop = true
|
||||
|
|
Loading…
Reference in New Issue