Added cryptofeed and pyarrow necessary for the feed, enable deribit

in the brokers init file, at this point the feed is working, to check
the tables use vd tool.
Nelson Torres 2024-08-27 23:58:36 +00:00
parent 0be454c3d6
commit cd92ef8b3a
5 changed files with 1077 additions and 74 deletions

View File

@ -51,6 +51,7 @@ __brokers__: list[str] = [
'ib', 'ib',
'kraken', 'kraken',
'kucoin', 'kucoin',
'deribit',
# broken but used to work # broken but used to work
# 'questrade', # 'questrade',
@ -61,7 +62,6 @@ __brokers__: list[str] = [
# wstrade # wstrade
# iex # iex
# deribit
# bitso # bitso
] ]

View File

@ -31,7 +31,7 @@ from typing import (
Callable, Callable,
) )
import pendulum from pendulum import now
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from rapidfuzz import process as fuzzy from rapidfuzz import process as fuzzy
@ -51,7 +51,9 @@ from cryptofeed.defines import (
OPTION, CALL, PUT OPTION, CALL, PUT
) )
from cryptofeed.symbols import Symbol from cryptofeed.symbols import Symbol
# types for managing the cb callbacks.
# from cryptofeed.types import L1Book
from piker.accounting import MktPair
from piker.data import ( from piker.data import (
def_iohlcv_fields, def_iohlcv_fields,
match_from_pairs, match_from_pairs,
@ -80,19 +82,19 @@ _testnet_ws_url = 'wss://test.deribit.com/ws/api/v2'
class JSONRPCResult(Struct): class JSONRPCResult(Struct):
jsonrpc: str = '2.0'
id: int id: int
result: Optional[list[dict]] = None
error: Optional[dict] = None
usIn: int usIn: int
usOut: int usOut: int
usDiff: int usDiff: int
testnet: bool testnet: bool
jsonrpc: str = '2.0'
result: Optional[list[dict]] = None
error: Optional[dict] = None
class JSONRPCChannel(Struct): class JSONRPCChannel(Struct):
jsonrpc: str = '2.0'
method: str method: str
params: dict params: dict
jsonrpc: str = '2.0'
class KLinesResult(Struct): class KLinesResult(Struct):
@ -116,9 +118,10 @@ class Trade(Struct):
instrument_name: str instrument_name: str
index_price: float index_price: float
direction: str direction: str
amount: float
contracts: float
combo_trade_id: Optional[int] = 0, combo_trade_id: Optional[int] = 0,
combo_id: Optional[str] = '', combo_id: Optional[str] = '',
amount: float
class LastTradesResult(Struct): class LastTradesResult(Struct):
trades: list[Trade] trades: list[Trade]
@ -142,13 +145,17 @@ def str_to_cb_sym(name: str) -> Symbol:
else: else:
raise Exception("Couldn\'t parse option type") raise Exception("Couldn\'t parse option type")
year, month, day = get_values_from_cb_normalized_date(expiry_date)
exp = f'{day}{month}{year}'
return Symbol( return Symbol(
base, quote, base=base,
quote=quote,
type=OPTION, type=OPTION,
strike_price=strike_price, strike_price=strike_price,
option_type=option_type, option_type=option_type,
expiry_date=expiry_date, expiry_date=exp)
expiry_normalize=False)
def piker_sym_to_cb_sym(name: str) -> Symbol: def piker_sym_to_cb_sym(name: str) -> Symbol:
@ -159,52 +166,70 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
if option_type == 'P': if option_type == 'P':
option_type = PUT option_type = PUT
elif option_type == 'C': elif option_type == 'C':
option_type = CALL option_type = CALL
else: else:
raise Exception("Couldn\'t parse option type") raise Exception("Couldn\'t parse option type")
return Symbol( return Symbol(
base, quote, base=base,
quote=quote,
type=OPTION, type=OPTION,
strike_price=strike_price, strike_price=strike_price,
option_type=option_type, option_type=option_type,
expiry_date=expiry_date.upper()) expiry_date=expiry_date)
def cb_sym_to_deribit_inst(sym: Symbol): def cb_sym_to_deribit_inst(sym: Symbol):
# cryptofeed normalized year, month, day = get_values_from_cb_normalized_date(sym.expiry_date)
cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z'] exp = f'{day}{month}{year}'
# deribit specific
months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
exp = sym.expiry_date
# YYMDD
# 01234
year, month, day = (
exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])
otype = 'C' if sym.option_type == CALL else 'P' otype = 'C' if sym.option_type == CALL else 'P'
return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}' return f'{sym.base}-{exp}-{sym.strike_price}-{otype}'
def get_values_from_cb_normalized_date(expiry_date: str):
# deribit specific
cb_norm = [
'F', 'G', 'H', 'J',
'K', 'M', 'N', 'Q',
'U', 'V', 'X', 'Z'
]
months = [
'JAN', 'FEB', 'MAR', 'APR',
'MAY', 'JUN', 'JUL', 'AUG',
'SEP', 'OCT', 'NOV', 'DEC'
]
# YYMDD
# 01234
return (
expiry_date[:2],
months[cb_norm.index(expiry_date[2:3])],
expiry_date[3:]
)
def get_config() -> dict[str, Any]: def get_config() -> dict[str, Any]:
conf, path = config.load() conf: dict
path: Path
section = conf.get('deribit') conf, path = config.load(
conf_name='brokers',
touch_if_dne=True,
)
section: dict = {}
section['deribit'] = conf.get('deribit')
# TODO: document why we send this, basically because logging params for cryptofeed # TODO: document why we send this, basically because logging params for cryptofeed
conf['log'] = {} section['log'] = {}
conf['log']['disabled'] = True section['log']['disabled'] = True
if section is None: if section is None:
log.warning(f'No config section found for deribit in {path}') log.warning(f'No config section found for deribit in {path}')
return {}
return conf return section
class Client: class Client:
@ -214,13 +239,8 @@ class Client:
config = get_config().get('deribit', {}) config = get_config().get('deribit', {})
if ('key_id' in config) and ('key_secret' in config): self._key_id = config.get('key_id')
self._key_id = config['key_id'] self._key_secret = config.get('key_secret')
self._key_secret = config['key_secret']
else:
self._key_id = None
self._key_secret = None
self.json_rpc = json_rpc self.json_rpc = json_rpc
@ -358,16 +378,19 @@ class Client:
async def bars( async def bars(
self, self,
symbol: str, mkt: MktPair,
start_dt: Optional[datetime] = None, start_dt: Optional[datetime] = None,
end_dt: Optional[datetime] = None, end_dt: Optional[datetime] = None,
limit: int = 1000, limit: int = 1000,
as_np: bool = True, as_np: bool = True,
) -> dict:
instrument = symbol ) -> list[tuple] | np.ndarray:
instrument: str = mkt.bs_fqme
if end_dt is None: if end_dt is None:
end_dt = pendulum.now('UTC') end_dt = now('UTC')
if start_dt is None: if start_dt is None:
start_dt = end_dt.start_of( start_dt = end_dt.start_of(
@ -387,29 +410,27 @@ class Client:
}) })
result = KLinesResult(**resp.result) result = KLinesResult(**resp.result)
new_bars = [] new_bars: list[tuple] = []
for i in range(len(result.close)): for i in range(len(result.close)):
_open = result.open[i]
high = result.high[i]
low = result.low[i]
close = result.close[i]
volume = result.volume[i]
row = [ row = [
(start_time + (i * (60 * 1000))) / 1000.0, # time (start_time + (i * (60 * 1000))) / 1000.0, # time
result.open[i], result.open[i],
result.high[i], result.high[i],
result.low[i], result.low[i],
result.close[i], result.close[i],
result.volume[i], result.volume[i]
0
] ]
new_bars.append((i,) + tuple(row)) new_bars.append((i,) + tuple(row))
array = np.array(new_bars, dtype=def_iohlcv_fields) if as_np else klines if not as_np:
return array return result
return np.array(
new_bars,
dtype=def_iohlcv_fields
)
async def last_trades( async def last_trades(
self, self,
@ -434,7 +455,7 @@ async def get_client(
async with ( async with (
trio.open_nursery() as n, trio.open_nursery() as n,
open_jsonrpc_session( open_jsonrpc_session(
_testnet_ws_url, dtype=JSONRPCResult) as json_rpc _ws_url, response_type=JSONRPCResult) as json_rpc
): ):
client = Client(json_rpc) client = Client(json_rpc)
@ -523,14 +544,14 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
async def aio_price_feed_relay( async def aio_price_feed_relay(
fh: FeedHandler, fh: FeedHandler,
instrument: Symbol, instrument: str,
from_trio: asyncio.Queue, from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel, to_trio: trio.abc.SendChannel,
) -> None: ) -> None:
async def _trade(data: dict, receipt_timestamp): async def _trade(data: dict, receipt_timestamp):
piker_symbol = cb_sym_to_deribit_inst(str_to_cb_sym(data.symbol)).lower()
to_trio.send_nowait(('trade', { to_trio.send_nowait(('trade', {
'symbol': cb_sym_to_deribit_inst( 'symbol': piker_symbol,
str_to_cb_sym(data.symbol)).lower(),
'last': data, 'last': data,
'broker_ts': time.time(), 'broker_ts': time.time(),
'data': data.to_dict(), 'data': data.to_dict(),
@ -538,9 +559,9 @@ async def aio_price_feed_relay(
})) }))
async def _l1(data: dict, receipt_timestamp): async def _l1(data: dict, receipt_timestamp):
piker_symbol = cb_sym_to_deribit_inst(str_to_cb_sym(data.symbol)).lower()
to_trio.send_nowait(('l1', { to_trio.send_nowait(('l1', {
'symbol': cb_sym_to_deribit_inst( 'symbol': piker_symbol,
str_to_cb_sym(data.symbol)).lower(),
'ticks': [ 'ticks': [
{'type': 'bid', {'type': 'bid',
'price': float(data.bid_price), 'size': float(data.bid_size)}, 'price': float(data.bid_price), 'size': float(data.bid_size)},

View File

@ -25,12 +25,19 @@ import time
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import pendulum from pendulum import (
from_timestamp,
now,
)
from rapidfuzz import process as fuzzy from rapidfuzz import process as fuzzy
import numpy as np import numpy as np
import tractor import tractor
from piker.brokers import open_cached_client from piker.accounting import MktPair
from piker.brokers import (
open_cached_client,
NoData,
)
from piker.log import get_logger, get_console_log from piker.log import get_logger, get_console_log
from piker.data import ShmArray from piker.data import ShmArray
from piker.brokers._util import ( from piker.brokers._util import (
@ -47,7 +54,7 @@ from cryptofeed.symbols import Symbol
from .api import ( from .api import (
Client, Trade, Client, Trade,
get_config, get_config,
str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
maybe_open_price_feed maybe_open_price_feed
) )
@ -64,30 +71,46 @@ async def open_history_client(
mkt: MktPair, mkt: MktPair,
) -> tuple[Callable, int]: ) -> tuple[Callable, int]:
fnstrument: str = mkt.bs_fqme
# TODO implement history getter for the new storage layer. # TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client: async with open_cached_client('deribit') as client:
async def get_ohlc( async def get_ohlc(
end_dt: Optional[datetime] = None, timeframe: float,
start_dt: Optional[datetime] = None, end_dt: datetime | None = None,
start_dt: datetime | None = None,
) -> tuple[ ) -> tuple[
np.ndarray, np.ndarray,
datetime, # start datetime, # start
datetime, # end datetime, # end
]: ]:
if timeframe != 60:
raise DataUnavailable('Only 1m bars are supported')
array = await client.bars( array: np.ndarray = await client.bars(
instrument, mkt,
start_dt=start_dt, start_dt=start_dt,
end_dt=end_dt, end_dt=end_dt,
) )
if len(array) == 0: if len(array) == 0:
raise DataUnavailable raise NoData(
f'No frame for {start_dt} -> {end_dt}\n'
)
start_dt = pendulum.from_timestamp(array[0]['time']) start_dt = from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time']) end_dt = from_timestamp(array[-1]['time'])
times = array['time']
if not times.any():
raise ValueError(
'Bad frame with null-times?\n\n'
f'{times}'
)
if end_dt is None:
inow: int = round(time.time())
if (inow - times[-1]) > 60:
await tractor.pause()
return array, start_dt, end_dt return array, start_dt, end_dt
@ -110,6 +133,8 @@ async def stream_quotes(
sym = symbols[0] sym = symbols[0]
#init_msgs: list[FeedInit] = []
async with ( async with (
open_cached_client('deribit') as client, open_cached_client('deribit') as client,
send_chan as send_chan send_chan as send_chan

957
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -69,6 +69,8 @@ pdbp = "^1.5.0"
trio = "^0.24" trio = "^0.24"
pendulum = "^3.0.0" pendulum = "^3.0.0"
httpx = "^0.27.0" httpx = "^0.27.0"
cryptofeed = "^2.4.0"
pyarrow = "^17.0.0"
[tool.poetry.dependencies.tractor] [tool.poetry.dependencies.tractor]
develop = true develop = true