Added cryptofeed and pyarrow necessary for the feed, enable deribit

in the brokers init file, at this point the feed is working, to check
the tables use vd tool.
jsonrpc_err_in_rent_task
Nelson Torres 2024-08-27 23:58:36 +00:00
parent 0be454c3d6
commit b2cfa3444f
5 changed files with 1088 additions and 74 deletions

View File

@ -51,6 +51,7 @@ __brokers__: list[str] = [
'ib', 'ib',
'kraken', 'kraken',
'kucoin', 'kucoin',
'deribit',
# broken but used to work # broken but used to work
# 'questrade', # 'questrade',
@ -61,7 +62,6 @@ __brokers__: list[str] = [
# wstrade # wstrade
# iex # iex
# deribit
# bitso # bitso
] ]

View File

@ -31,7 +31,7 @@ from typing import (
Callable, Callable,
) )
import pendulum from pendulum import now
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from rapidfuzz import process as fuzzy from rapidfuzz import process as fuzzy
@ -51,7 +51,9 @@ from cryptofeed.defines import (
OPTION, CALL, PUT OPTION, CALL, PUT
) )
from cryptofeed.symbols import Symbol from cryptofeed.symbols import Symbol
# types for managing the cb callbacks.
# from cryptofeed.types import L1Book
from piker.accounting import MktPair
from piker.data import ( from piker.data import (
def_iohlcv_fields, def_iohlcv_fields,
match_from_pairs, match_from_pairs,
@ -80,19 +82,19 @@ _testnet_ws_url = 'wss://test.deribit.com/ws/api/v2'
class JSONRPCResult(Struct): class JSONRPCResult(Struct):
jsonrpc: str = '2.0'
id: int id: int
result: Optional[list[dict]] = None
error: Optional[dict] = None
usIn: int usIn: int
usOut: int usOut: int
usDiff: int usDiff: int
testnet: bool testnet: bool
jsonrpc: str = '2.0'
result: Optional[list[dict]] = None
error: Optional[dict] = None
class JSONRPCChannel(Struct): class JSONRPCChannel(Struct):
jsonrpc: str = '2.0'
method: str method: str
params: dict params: dict
jsonrpc: str = '2.0'
class KLinesResult(Struct): class KLinesResult(Struct):
@ -116,9 +118,12 @@ class Trade(Struct):
instrument_name: str instrument_name: str
index_price: float index_price: float
direction: str direction: str
contracts: float
amount: float
combo_trade_id: Optional[int] = 0, combo_trade_id: Optional[int] = 0,
combo_id: Optional[str] = '', combo_id: Optional[str] = '',
amount: float block_trade_leg_count: Optional[int] = 0,
block_trade_id: Optional[str] = '',
class LastTradesResult(Struct): class LastTradesResult(Struct):
trades: list[Trade] trades: list[Trade]
@ -142,13 +147,15 @@ def str_to_cb_sym(name: str) -> Symbol:
else: else:
raise Exception("Couldn\'t parse option type") raise Exception("Couldn\'t parse option type")
new_expiry_date = get_values_from_cb_normalized_date(expiry_date)
return Symbol( return Symbol(
base, quote, base=base,
quote=quote,
type=OPTION, type=OPTION,
strike_price=strike_price, strike_price=strike_price,
option_type=option_type, option_type=option_type,
expiry_date=expiry_date, expiry_date=new_expiry_date)
expiry_normalize=False)
def piker_sym_to_cb_sym(name: str) -> Symbol: def piker_sym_to_cb_sym(name: str) -> Symbol:
@ -159,68 +166,85 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
if option_type == 'P': if option_type == 'P':
option_type = PUT option_type = PUT
elif option_type == 'C': elif option_type == 'C':
option_type = CALL option_type = CALL
else: else:
raise Exception("Couldn\'t parse option type") raise Exception("Couldn\'t parse option type")
return Symbol( return Symbol(
base, quote, base=base,
quote=quote,
type=OPTION, type=OPTION,
strike_price=strike_price, strike_price=strike_price,
option_type=option_type, option_type=option_type,
expiry_date=expiry_date.upper()) expiry_date=expiry_date)
def cb_sym_to_deribit_inst(sym: Symbol): def cb_sym_to_deribit_inst(sym: Symbol):
# cryptofeed normalized new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date)
cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
# deribit specific
months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
exp = sym.expiry_date
# YYMDD
# 01234
year, month, day = (
exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])
otype = 'C' if sym.option_type == CALL else 'P' otype = 'C' if sym.option_type == CALL else 'P'
return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}' return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}'
def get_values_from_cb_normalized_date(expiry_date: str) -> str:
# deribit specific
cb_norm = [
'F', 'G', 'H', 'J',
'K', 'M', 'N', 'Q',
'U', 'V', 'X', 'Z'
]
months = [
'JAN', 'FEB', 'MAR', 'APR',
'MAY', 'JUN', 'JUL', 'AUG',
'SEP', 'OCT', 'NOV', 'DEC'
]
# YYMDD
# 01234
day, month, year = (
expiry_date[3:],
months[cb_norm.index(expiry_date[2:3])],
expiry_date[:2]
)
return f'{day}{month}{year}'
def get_config() -> dict[str, Any]: def get_config() -> dict[str, Any]:
conf, path = config.load() conf: dict
path: Path
section = conf.get('deribit') conf, path = config.load(
conf_name='brokers',
touch_if_dne=True,
)
section: dict = {}
section['deribit'] = conf.get('deribit')
# TODO: document why we send this, basically because logging params for cryptofeed section['log'] = {}
conf['log'] = {} section['log']['disabled'] = True
conf['log']['disabled'] = True
if section is None: if section is None:
log.warning(f'No config section found for deribit in {path}') log.warning(f'No config section found for deribit in {path}')
return {}
return conf return section
class Client: class Client:
def __init__(self, json_rpc: Callable) -> None: def __init__(
self,
json_rpc: Callable
) -> None:
self._pairs: dict[str, Any] = None self._pairs: dict[str, Any] = None
config = get_config().get('deribit', {}) config = get_config().get('deribit', {})
if ('key_id' in config) and ('key_secret' in config): self._key_id = config.get('key_id')
self._key_id = config['key_id'] self._key_secret = config.get('key_secret')
self._key_secret = config['key_secret']
else:
self._key_id = None
self._key_secret = None
self.json_rpc = json_rpc self.json_rpc = json_rpc
@ -228,7 +252,10 @@ class Client:
def currencies(self): def currencies(self):
return ['btc', 'eth', 'sol', 'usd'] return ['btc', 'eth', 'sol', 'usd']
async def get_balances(self, kind: str = 'option') -> dict[str, float]: async def get_balances(
self,
kind: str = 'option'
) -> dict[str, float]:
"""Return the set of positions for this account """Return the set of positions for this account
by symbol. by symbol.
""" """
@ -358,16 +385,19 @@ class Client:
async def bars( async def bars(
self, self,
symbol: str, mkt: MktPair,
start_dt: Optional[datetime] = None, start_dt: Optional[datetime] = None,
end_dt: Optional[datetime] = None, end_dt: Optional[datetime] = None,
limit: int = 1000, limit: int = 1000,
as_np: bool = True, as_np: bool = True,
) -> dict:
instrument = symbol ) -> list[tuple] | np.ndarray:
instrument: str = mkt.bs_fqme
if end_dt is None: if end_dt is None:
end_dt = pendulum.now('UTC') end_dt = now('UTC')
if start_dt is None: if start_dt is None:
start_dt = end_dt.start_of( start_dt = end_dt.start_of(
@ -387,29 +417,27 @@ class Client:
}) })
result = KLinesResult(**resp.result) result = KLinesResult(**resp.result)
new_bars = [] new_bars: list[tuple] = []
for i in range(len(result.close)): for i in range(len(result.close)):
_open = result.open[i] row = [
high = result.high[i]
low = result.low[i]
close = result.close[i]
volume = result.volume[i]
row = [
(start_time + (i * (60 * 1000))) / 1000.0, # time (start_time + (i * (60 * 1000))) / 1000.0, # time
result.open[i], result.open[i],
result.high[i], result.high[i],
result.low[i], result.low[i],
result.close[i], result.close[i],
result.volume[i], result.volume[i]
0
] ]
new_bars.append((i,) + tuple(row)) new_bars.append((i,) + tuple(row))
array = np.array(new_bars, dtype=def_iohlcv_fields) if as_np else klines if not as_np:
return array return result
return np.array(
new_bars,
dtype=def_iohlcv_fields
)
async def last_trades( async def last_trades(
self, self,
@ -434,7 +462,8 @@ async def get_client(
async with ( async with (
trio.open_nursery() as n, trio.open_nursery() as n,
open_jsonrpc_session( open_jsonrpc_session(
_testnet_ws_url, dtype=JSONRPCResult) as json_rpc _ws_url, response_type=JSONRPCResult
) as json_rpc
): ):
client = Client(json_rpc) client = Client(json_rpc)
@ -523,7 +552,7 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
async def aio_price_feed_relay( async def aio_price_feed_relay(
fh: FeedHandler, fh: FeedHandler,
instrument: Symbol, instrument: str,
from_trio: asyncio.Queue, from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel, to_trio: trio.abc.SendChannel,
) -> None: ) -> None:

View File

@ -25,12 +25,19 @@ import time
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import pendulum from pendulum import (
from_timestamp,
now,
)
from rapidfuzz import process as fuzzy from rapidfuzz import process as fuzzy
import numpy as np import numpy as np
import tractor import tractor
from piker.brokers import open_cached_client from piker.accounting import MktPair
from piker.brokers import (
open_cached_client,
NoData,
)
from piker.log import get_logger, get_console_log from piker.log import get_logger, get_console_log
from piker.data import ShmArray from piker.data import ShmArray
from piker.brokers._util import ( from piker.brokers._util import (
@ -47,7 +54,7 @@ from cryptofeed.symbols import Symbol
from .api import ( from .api import (
Client, Trade, Client, Trade,
get_config, get_config,
str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
maybe_open_price_feed maybe_open_price_feed
) )
@ -64,30 +71,46 @@ async def open_history_client(
mkt: MktPair, mkt: MktPair,
) -> tuple[Callable, int]: ) -> tuple[Callable, int]:
fnstrument: str = mkt.bs_fqme
# TODO implement history getter for the new storage layer. # TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client: async with open_cached_client('deribit') as client:
async def get_ohlc( async def get_ohlc(
end_dt: Optional[datetime] = None, timeframe: float,
start_dt: Optional[datetime] = None, end_dt: datetime | None = None,
start_dt: datetime | None = None,
) -> tuple[ ) -> tuple[
np.ndarray, np.ndarray,
datetime, # start datetime, # start
datetime, # end datetime, # end
]: ]:
if timeframe != 60:
raise DataUnavailable('Only 1m bars are supported')
array = await client.bars( array: np.ndarray = await client.bars(
instrument, mkt,
start_dt=start_dt, start_dt=start_dt,
end_dt=end_dt, end_dt=end_dt,
) )
if len(array) == 0: if len(array) == 0:
raise DataUnavailable raise NoData(
f'No frame for {start_dt} -> {end_dt}\n'
)
start_dt = pendulum.from_timestamp(array[0]['time']) start_dt = from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time']) end_dt = from_timestamp(array[-1]['time'])
times = array['time']
if not times.any():
raise ValueError(
'Bad frame with null-times?\n\n'
f'{times}'
)
if end_dt is None:
inow: int = round(time.time())
if (inow - times[-1]) > 60:
await tractor.pause()
return array, start_dt, end_dt return array, start_dt, end_dt
@ -110,6 +133,8 @@ async def stream_quotes(
sym = symbols[0] sym = symbols[0]
#init_msgs: list[FeedInit] = []
async with ( async with (
open_cached_client('deribit') as client, open_cached_client('deribit') as client,
send_chan as send_chan send_chan as send_chan
@ -123,7 +148,10 @@ async def stream_quotes(
'asset_type': 'option', 'asset_type': 'option',
'price_tick_size': 0.0005 'price_tick_size': 0.0005
}, },
'shm_write_opts': {'sum_tick_vml': False}, 'shm_write_opts': {
'sum_tick_vml': True,
'has_vlm': True
},
'fqsn': sym, 'fqsn': sym,
}, },
} }

957
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -69,6 +69,8 @@ pdbp = "^1.5.0"
trio = "^0.24" trio = "^0.24"
pendulum = "^3.0.0" pendulum = "^3.0.0"
httpx = "^0.27.0" httpx = "^0.27.0"
cryptofeed = "^2.4.0"
pyarrow = "^17.0.0"
[tool.poetry.dependencies.tractor] [tool.poetry.dependencies.tractor]
develop = true develop = true