Deribit broker fix #8
|
@ -51,6 +51,7 @@ __brokers__: list[str] = [
|
|||
'ib',
|
||||
'kraken',
|
||||
'kucoin',
|
||||
'deribit',
|
||||
|
||||
# broken but used to work
|
||||
# 'questrade',
|
||||
|
@ -61,7 +62,6 @@ __brokers__: list[str] = [
|
|||
# wstrade
|
||||
# iex
|
||||
|
||||
# deribit
|
||||
# bitso
|
||||
]
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@ from .api import (
|
|||
get_client,
|
||||
)
|
||||
from .feed import (
|
||||
get_mkt_info,
|
||||
open_history_client,
|
||||
open_symbol_search,
|
||||
stream_quotes,
|
||||
|
@ -34,15 +35,20 @@ from .feed import (
|
|||
# open_trade_dialog,
|
||||
# norm_trade_records,
|
||||
# )
|
||||
from .venues import (
|
||||
OptionPair,
|
||||
)
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
__all__ = [
|
||||
'get_client',
|
||||
# 'trades_dialogue',
|
||||
'get_mkt_info',
|
||||
'open_history_client',
|
||||
'open_symbol_search',
|
||||
'stream_quotes',
|
||||
'OptionPair',
|
||||
# 'norm_trade_records',
|
||||
]
|
||||
|
||||
|
|
|
@ -19,10 +19,14 @@ Deribit backend.
|
|||
|
||||
'''
|
||||
import asyncio
|
||||
from collections import ChainMap
|
||||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
)
|
||||
from datetime import datetime
|
||||
from decimal import (
|
||||
Decimal,
|
||||
)
|
||||
from functools import partial
|
||||
import time
|
||||
from typing import (
|
||||
|
@ -31,7 +35,7 @@ from typing import (
|
|||
Callable,
|
||||
)
|
||||
|
||||
import pendulum
|
||||
from pendulum import now
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
from rapidfuzz import process as fuzzy
|
||||
|
@ -51,7 +55,18 @@ from cryptofeed.defines import (
|
|||
OPTION, CALL, PUT
|
||||
)
|
||||
from cryptofeed.symbols import Symbol
|
||||
|
||||
# types for managing the cb callbacks.
|
||||
# from cryptofeed.types import L1Book
|
||||
from .venues import (
|
||||
MarketType,
|
||||
PAIRTYPES,
|
||||
Pair,
|
||||
OptionPair,
|
||||
)
|
||||
from piker.accounting import (
|
||||
Asset,
|
||||
MktPair,
|
||||
)
|
||||
from piker.data import (
|
||||
def_iohlcv_fields,
|
||||
match_from_pairs,
|
||||
|
@ -80,19 +95,19 @@ _testnet_ws_url = 'wss://test.deribit.com/ws/api/v2'
|
|||
|
||||
|
||||
class JSONRPCResult(Struct):
|
||||
jsonrpc: str = '2.0'
|
||||
id: int
|
||||
result: Optional[list[dict]] = None
|
||||
error: Optional[dict] = None
|
||||
usIn: int
|
||||
usOut: int
|
||||
usDiff: int
|
||||
testnet: bool
|
||||
jsonrpc: str = '2.0'
|
||||
result: Optional[list[dict]] = None
|
||||
error: Optional[dict] = None
|
||||
|
||||
class JSONRPCChannel(Struct):
|
||||
jsonrpc: str = '2.0'
|
||||
method: str
|
||||
params: dict
|
||||
jsonrpc: str = '2.0'
|
||||
|
||||
|
||||
class KLinesResult(Struct):
|
||||
|
@ -116,9 +131,12 @@ class Trade(Struct):
|
|||
instrument_name: str
|
||||
index_price: float
|
||||
direction: str
|
||||
contracts: float
|
||||
amount: float
|
||||
combo_trade_id: Optional[int] = 0,
|
||||
combo_id: Optional[str] = '',
|
||||
amount: float
|
||||
block_trade_leg_count: Optional[int] = 0,
|
||||
block_trade_id: Optional[str] = '',
|
||||
|
||||
class LastTradesResult(Struct):
|
||||
trades: list[Trade]
|
||||
|
@ -142,13 +160,15 @@ def str_to_cb_sym(name: str) -> Symbol:
|
|||
else:
|
||||
raise Exception("Couldn\'t parse option type")
|
||||
|
||||
new_expiry_date = get_values_from_cb_normalized_date(expiry_date)
|
||||
|
||||
return Symbol(
|
||||
base, quote,
|
||||
base=base,
|
||||
quote=quote,
|
||||
type=OPTION,
|
||||
strike_price=strike_price,
|
||||
option_type=option_type,
|
||||
expiry_date=expiry_date,
|
||||
expiry_normalize=False)
|
||||
expiry_date=new_expiry_date)
|
||||
|
||||
|
||||
def piker_sym_to_cb_sym(name: str) -> Symbol:
|
||||
|
@ -159,76 +179,110 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
|
|||
|
||||
if option_type == 'P':
|
||||
option_type = PUT
|
||||
elif option_type == 'C':
|
||||
elif option_type == 'C':
|
||||
option_type = CALL
|
||||
else:
|
||||
raise Exception("Couldn\'t parse option type")
|
||||
|
||||
return Symbol(
|
||||
base, quote,
|
||||
base=base,
|
||||
quote=quote,
|
||||
type=OPTION,
|
||||
strike_price=strike_price,
|
||||
option_type=option_type,
|
||||
expiry_date=expiry_date.upper())
|
||||
expiry_date=expiry_date)
|
||||
|
||||
|
||||
def cb_sym_to_deribit_inst(sym: Symbol):
|
||||
# cryptofeed normalized
|
||||
cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
|
||||
|
||||
# deribit specific
|
||||
months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
|
||||
|
||||
exp = sym.expiry_date
|
||||
|
||||
# YYMDD
|
||||
# 01234
|
||||
year, month, day = (
|
||||
exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])
|
||||
|
||||
new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date)
|
||||
otype = 'C' if sym.option_type == CALL else 'P'
|
||||
|
||||
return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}'
|
||||
return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}'
|
||||
|
||||
|
||||
def get_values_from_cb_normalized_date(expiry_date: str) -> str:
|
||||
# deribit specific
|
||||
cb_norm = [
|
||||
'F', 'G', 'H', 'J',
|
||||
'K', 'M', 'N', 'Q',
|
||||
'U', 'V', 'X', 'Z'
|
||||
]
|
||||
months = [
|
||||
'JAN', 'FEB', 'MAR', 'APR',
|
||||
'MAY', 'JUN', 'JUL', 'AUG',
|
||||
'SEP', 'OCT', 'NOV', 'DEC'
|
||||
]
|
||||
# YYMDD
|
||||
# 01234
|
||||
day, month, year = (
|
||||
expiry_date[3:],
|
||||
months[cb_norm.index(expiry_date[2:3])],
|
||||
expiry_date[:2]
|
||||
)
|
||||
return f'{day}{month}{year}'
|
||||
|
||||
|
||||
def get_config() -> dict[str, Any]:
|
||||
|
||||
conf, path = config.load()
|
||||
conf: dict
|
||||
path: Path
|
||||
|
||||
conf, path = config.load(
|
||||
conf_name='brokers',
|
||||
touch_if_dne=True,
|
||||
)
|
||||
section: dict = {}
|
||||
section = conf.get('deribit')
|
||||
|
||||
# TODO: document why we send this, basically because logging params for cryptofeed
|
||||
conf['log'] = {}
|
||||
conf['log']['disabled'] = True
|
||||
section['log'] = {}
|
||||
section['log']['filename'] = 'feedhandler.log'
|
||||
section['log']['level'] = 'DEBUG'
|
||||
section['log']['disabled'] = True
|
||||
|
||||
if section is None:
|
||||
log.warning(f'No config section found for deribit in {path}')
|
||||
return {}
|
||||
|
||||
return conf
|
||||
return section
|
||||
|
||||
def get_fh_config() -> dict[str, Any]:
|
||||
conf_option = get_config().get('option', {})
|
||||
conf_log = get_config().get('log', {})
|
||||
|
||||
return {
|
||||
'log': {
|
||||
'filename': conf_log.get('filename'),
|
||||
'level': conf_log.get('level'),
|
||||
'disabled': conf_log.get('disabled')
|
||||
},
|
||||
'deribit': {
|
||||
'key_id': conf_option.get('api_key'),
|
||||
'key_secret': conf_option.get('api_secret')
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class Client:
|
||||
|
||||
def __init__(self, json_rpc: Callable) -> None:
|
||||
self._pairs: dict[str, Any] = None
|
||||
def __init__(
|
||||
self,
|
||||
|
||||
config = get_config().get('deribit', {})
|
||||
json_rpc: Callable
|
||||
|
||||
if ('key_id' in config) and ('key_secret' in config):
|
||||
self._key_id = config['key_id']
|
||||
self._key_secret = config['key_secret']
|
||||
) -> None:
|
||||
self._pairs: ChainMap[str, Pair] = ChainMap()
|
||||
|
||||
else:
|
||||
self._key_id = None
|
||||
self._key_secret = None
|
||||
config = get_config().get('option', {})
|
||||
|
||||
self._key_id = config.get('api_key')
|
||||
self._key_secret = config.get('api_secret')
|
||||
|
||||
self.json_rpc = json_rpc
|
||||
|
||||
@property
|
||||
def currencies(self):
|
||||
return ['btc', 'eth', 'sol', 'usd']
|
||||
|
||||
async def get_balances(self, kind: str = 'option') -> dict[str, float]:
|
||||
async def get_balances(
|
||||
self,
|
||||
kind: str = 'option'
|
||||
) -> dict[str, float]:
|
||||
"""Return the set of positions for this account
|
||||
by symbol.
|
||||
"""
|
||||
|
@ -244,20 +298,39 @@ class Client:
|
|||
|
||||
return balances
|
||||
|
||||
async def get_assets(self) -> dict[str, float]:
|
||||
async def get_assets(
|
||||
self,
|
||||
venue: str | None = None,
|
||||
|
||||
) -> dict[str, Asset]:
|
||||
"""Return the set of asset balances for this account
|
||||
by symbol.
|
||||
"""
|
||||
balances = {}
|
||||
assets = {}
|
||||
resp = await self.json_rpc(
|
||||
'private/get_account_summaries',
|
||||
params={
|
||||
'extended' : True
|
||||
}
|
||||
)
|
||||
summaries = resp.result['summaries']
|
||||
for summary in summaries:
|
||||
currency = summary['currency']
|
||||
tx_tick = Decimal('1e-08')
|
||||
atype='crypto_currency'
|
||||
assets[currency] = Asset(
|
||||
name=currency,
|
||||
atype=atype,
|
||||
tx_tick=tx_tick)
|
||||
return assets
|
||||
|
||||
for currency in self.currencies:
|
||||
resp = await self.json_rpc(
|
||||
'private/get_account_summary', params={
|
||||
'currency': currency.upper()})
|
||||
async def get_mkt_pairs(self) -> dict[str, Pair]:
|
||||
flat: dict[str, Pair] = {}
|
||||
for key in self._pairs:
|
||||
item = self._pairs.get(key)
|
||||
flat[item.bs_fqme] = item
|
||||
|
||||
balances[currency] = resp.result['balance']
|
||||
|
||||
return balances
|
||||
return flat
|
||||
|
||||
async def submit_limit(
|
||||
self,
|
||||
|
@ -286,6 +359,28 @@ class Client:
|
|||
'private/cancel', {'order_id': oid})
|
||||
return resp.result
|
||||
|
||||
async def exch_info(
|
||||
self,
|
||||
sym: str | None = None,
|
||||
|
||||
venue: MarketType | None = None,
|
||||
expiry: str | None = None,
|
||||
|
||||
) -> dict[str, Pair] | Pair:
|
||||
|
||||
pair_table: dict[str, Pair] = self._pairs
|
||||
|
||||
if (
|
||||
sym
|
||||
and (cached_pair := pair_table.get(sym))
|
||||
):
|
||||
return cached_pair
|
||||
|
||||
if sym:
|
||||
return pair_table[sym.lower()]
|
||||
else:
|
||||
return self._pairs
|
||||
|
||||
async def symbol_info(
|
||||
self,
|
||||
instrument: Optional[str] = None,
|
||||
|
@ -293,7 +388,7 @@ class Client:
|
|||
kind: str = 'option',
|
||||
expired: bool = False
|
||||
|
||||
) -> dict[str, dict]:
|
||||
) -> dict[str, Pair] | Pair:
|
||||
'''
|
||||
Get symbol infos.
|
||||
|
||||
|
@ -313,14 +408,29 @@ class Client:
|
|||
params,
|
||||
)
|
||||
# convert to symbol-keyed table
|
||||
pair_type: Type = PAIRTYPES[kind]
|
||||
results: list[dict] | None = resp.result
|
||||
instruments: dict[str, dict] = {
|
||||
item['instrument_name'].lower(): item
|
||||
for item in results
|
||||
}
|
||||
|
||||
instruments: dict[str, Pair] = {}
|
||||
for item in results:
|
||||
symbol=item['instrument_name'].lower()
|
||||
try:
|
||||
pair: Pair = pair_type(
|
||||
symbol=symbol,
|
||||
**item
|
||||
)
|
||||
except Exception as e:
|
||||
e.add_note(
|
||||
"\nDon't panic, prolly stupid deribit changed their symbology schema again..\n"
|
||||
'Check out their API docs here:\n\n'
|
||||
'https://docs.deribit.com/?python#deribit-api-v2-1-1'
|
||||
)
|
||||
raise
|
||||
|
||||
instruments[symbol] = pair
|
||||
|
||||
if instrument is not None:
|
||||
return instruments[instrument]
|
||||
return instruments[instrument.lower()]
|
||||
else:
|
||||
return instruments
|
||||
|
||||
|
@ -337,12 +447,12 @@ class Client:
|
|||
self,
|
||||
pattern: str,
|
||||
limit: int = 30,
|
||||
) -> dict[str, Any]:
|
||||
) -> dict[str, Pair]:
|
||||
'''
|
||||
Fuzzy search symbology set for pairs matching `pattern`.
|
||||
|
||||
'''
|
||||
pairs: dict[str, Any] = await self.symbol_info()
|
||||
pairs: dict[str, Pair] = await self.symbol_info()
|
||||
matches: dict[str, Pair] = match_from_pairs(
|
||||
pairs=pairs,
|
||||
query=pattern.upper(),
|
||||
|
@ -352,22 +462,25 @@ class Client:
|
|||
|
||||
# repack in name-keyed table
|
||||
return {
|
||||
pair['instrument_name'].lower(): pair
|
||||
pair.instrument_name.lower(): pair
|
||||
for pair in matches.values()
|
||||
}
|
||||
|
||||
async def bars(
|
||||
self,
|
||||
symbol: str,
|
||||
mkt: MktPair,
|
||||
|
||||
start_dt: Optional[datetime] = None,
|
||||
end_dt: Optional[datetime] = None,
|
||||
|
||||
limit: int = 1000,
|
||||
as_np: bool = True,
|
||||
) -> dict:
|
||||
instrument = symbol
|
||||
|
||||
) -> list[tuple] | np.ndarray:
|
||||
instrument: str = mkt.bs_mktid
|
||||
|
||||
if end_dt is None:
|
||||
end_dt = pendulum.now('UTC')
|
||||
end_dt = now('UTC')
|
||||
|
||||
if start_dt is None:
|
||||
start_dt = end_dt.start_of(
|
||||
|
@ -387,29 +500,27 @@ class Client:
|
|||
})
|
||||
|
||||
result = KLinesResult(**resp.result)
|
||||
new_bars = []
|
||||
new_bars: list[tuple] = []
|
||||
for i in range(len(result.close)):
|
||||
|
||||
_open = result.open[i]
|
||||
high = result.high[i]
|
||||
low = result.low[i]
|
||||
close = result.close[i]
|
||||
volume = result.volume[i]
|
||||
|
||||
row = [
|
||||
row = [
|
||||
(start_time + (i * (60 * 1000))) / 1000.0, # time
|
||||
result.open[i],
|
||||
result.high[i],
|
||||
result.low[i],
|
||||
result.close[i],
|
||||
result.volume[i],
|
||||
0
|
||||
result.volume[i]
|
||||
]
|
||||
|
||||
new_bars.append((i,) + tuple(row))
|
||||
|
||||
array = np.array(new_bars, dtype=def_iohlcv_fields) if as_np else klines
|
||||
return array
|
||||
if not as_np:
|
||||
return result
|
||||
|
||||
return np.array(
|
||||
new_bars,
|
||||
dtype=def_iohlcv_fields
|
||||
)
|
||||
|
||||
async def last_trades(
|
||||
self,
|
||||
|
@ -434,10 +545,10 @@ async def get_client(
|
|||
async with (
|
||||
trio.open_nursery() as n,
|
||||
open_jsonrpc_session(
|
||||
_testnet_ws_url, dtype=JSONRPCResult) as json_rpc
|
||||
_ws_url, response_type=JSONRPCResult
|
||||
) as json_rpc
|
||||
):
|
||||
client = Client(json_rpc)
|
||||
|
||||
_refresh_token: Optional[str] = None
|
||||
_access_token: Optional[str] = None
|
||||
|
||||
|
@ -507,7 +618,7 @@ async def get_client(
|
|||
|
||||
@acm
|
||||
async def open_feed_handler():
|
||||
fh = FeedHandler(config=get_config())
|
||||
fh = FeedHandler(config=get_fh_config())
|
||||
yield fh
|
||||
await to_asyncio.run_task(fh.stop_async)
|
||||
|
||||
|
@ -523,7 +634,7 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
|
|||
|
||||
async def aio_price_feed_relay(
|
||||
fh: FeedHandler,
|
||||
instrument: Symbol,
|
||||
instrument: str,
|
||||
from_trio: asyncio.Queue,
|
||||
to_trio: trio.abc.SendChannel,
|
||||
) -> None:
|
||||
|
@ -542,21 +653,33 @@ async def aio_price_feed_relay(
|
|||
'symbol': cb_sym_to_deribit_inst(
|
||||
str_to_cb_sym(data.symbol)).lower(),
|
||||
'ticks': [
|
||||
{'type': 'bid',
|
||||
'price': float(data.bid_price), 'size': float(data.bid_size)},
|
||||
{'type': 'bsize',
|
||||
'price': float(data.bid_price), 'size': float(data.bid_size)},
|
||||
{'type': 'ask',
|
||||
'price': float(data.ask_price), 'size': float(data.ask_size)},
|
||||
{'type': 'asize',
|
||||
'price': float(data.ask_price), 'size': float(data.ask_size)}
|
||||
{
|
||||
'type': 'bid',
|
||||
'price': float(data.bid_price),
|
||||
'size': float(data.bid_size)
|
||||
},
|
||||
{
|
||||
'type': 'bsize',
|
||||
'price': float(data.bid_price),
|
||||
'size': float(data.bid_size)
|
||||
},
|
||||
{
|
||||
'type': 'ask',
|
||||
'price': float(data.ask_price),
|
||||
'size': float(data.ask_size)
|
||||
},
|
||||
{
|
||||
'type': 'asize',
|
||||
'price': float(data.ask_price),
|
||||
'size': float(data.ask_size)
|
||||
}
|
||||
]
|
||||
}))
|
||||
|
||||
sym: Symbol = piker_sym_to_cb_sym(instrument)
|
||||
fh.add_feed(
|
||||
DERIBIT,
|
||||
channels=[TRADES, L1_BOOK],
|
||||
symbols=[piker_sym_to_cb_sym(instrument)],
|
||||
symbols=[sym],
|
||||
callbacks={
|
||||
TRADES: _trade,
|
||||
L1_BOOK: _l1
|
||||
|
@ -597,9 +720,9 @@ async def maybe_open_price_feed(
|
|||
async with maybe_open_context(
|
||||
acm_func=open_price_feed,
|
||||
kwargs={
|
||||
'instrument': instrument
|
||||
'instrument': instrument.split('.')[0]
|
||||
},
|
||||
key=f'{instrument}-price',
|
||||
key=f'{instrument.split('.')[0]}-price',
|
||||
) as (cache_hit, feed):
|
||||
if cache_hit:
|
||||
yield broadcast_receiver(feed, 10)
|
||||
|
@ -664,10 +787,10 @@ async def maybe_open_order_feed(
|
|||
async with maybe_open_context(
|
||||
acm_func=open_order_feed,
|
||||
kwargs={
|
||||
'instrument': instrument,
|
||||
'instrument': instrument.split('.')[0],
|
||||
'fh': fh
|
||||
},
|
||||
key=f'{instrument}-order',
|
||||
key=f'{instrument.split('.')[0]}-order',
|
||||
) as (cache_hit, feed):
|
||||
if cache_hit:
|
||||
yield broadcast_receiver(feed, 10)
|
||||
|
|
|
@ -21,18 +21,33 @@ Deribit backend.
|
|||
from contextlib import asynccontextmanager as acm
|
||||
from datetime import datetime
|
||||
from typing import Any, Optional, Callable
|
||||
from pprint import pformat
|
||||
import time
|
||||
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
import pendulum
|
||||
from pendulum import (
|
||||
from_timestamp,
|
||||
now,
|
||||
)
|
||||
from rapidfuzz import process as fuzzy
|
||||
import numpy as np
|
||||
import tractor
|
||||
|
||||
from piker.brokers import open_cached_client
|
||||
from piker.accounting import (
|
||||
MktPair,
|
||||
unpack_fqme,
|
||||
)
|
||||
from piker.brokers import (
|
||||
open_cached_client,
|
||||
NoData,
|
||||
)
|
||||
from piker._cacheables import (
|
||||
async_lifo_cache,
|
||||
)
|
||||
from piker.log import get_logger, get_console_log
|
||||
from piker.data import ShmArray
|
||||
from piker.data.validate import FeedInit
|
||||
from piker.brokers._util import (
|
||||
BrokerError,
|
||||
DataUnavailable,
|
||||
|
@ -47,9 +62,13 @@ from cryptofeed.symbols import Symbol
|
|||
from .api import (
|
||||
Client, Trade,
|
||||
get_config,
|
||||
str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
|
||||
piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
|
||||
maybe_open_price_feed
|
||||
)
|
||||
from .venues import (
|
||||
Pair,
|
||||
OptionPair,
|
||||
)
|
||||
|
||||
_spawn_kwargs = {
|
||||
'infect_asyncio': True,
|
||||
|
@ -64,36 +83,119 @@ async def open_history_client(
|
|||
mkt: MktPair,
|
||||
) -> tuple[Callable, int]:
|
||||
|
||||
fnstrument: str = mkt.bs_fqme
|
||||
# TODO implement history getter for the new storage layer.
|
||||
async with open_cached_client('deribit') as client:
|
||||
|
||||
async def get_ohlc(
|
||||
end_dt: Optional[datetime] = None,
|
||||
start_dt: Optional[datetime] = None,
|
||||
timeframe: float,
|
||||
end_dt: datetime | None = None,
|
||||
start_dt: datetime | None = None,
|
||||
|
||||
) -> tuple[
|
||||
np.ndarray,
|
||||
datetime, # start
|
||||
datetime, # end
|
||||
]:
|
||||
if timeframe != 60:
|
||||
raise DataUnavailable('Only 1m bars are supported')
|
||||
|
||||
array = await client.bars(
|
||||
instrument,
|
||||
array: np.ndarray = await client.bars(
|
||||
mkt,
|
||||
start_dt=start_dt,
|
||||
end_dt=end_dt,
|
||||
)
|
||||
if len(array) == 0:
|
||||
raise DataUnavailable
|
||||
raise NoData(
|
||||
f'No frame for {start_dt} -> {end_dt}\n'
|
||||
)
|
||||
|
||||
start_dt = pendulum.from_timestamp(array[0]['time'])
|
||||
end_dt = pendulum.from_timestamp(array[-1]['time'])
|
||||
start_dt = from_timestamp(array[0]['time'])
|
||||
end_dt = from_timestamp(array[-1]['time'])
|
||||
|
||||
times = array['time']
|
||||
if not times.any():
|
||||
raise ValueError(
|
||||
'Bad frame with null-times?\n\n'
|
||||
f'{times}'
|
||||
)
|
||||
|
||||
if end_dt is None:
|
||||
inow: int = round(time.time())
|
||||
if (inow - times[-1]) > 60:
|
||||
await tractor.pause()
|
||||
|
||||
return array, start_dt, end_dt
|
||||
|
||||
yield get_ohlc, {'erlangs': 3, 'rate': 3}
|
||||
|
||||
|
||||
@async_lifo_cache()
|
||||
async def get_mkt_info(
|
||||
fqme: str,
|
||||
|
||||
) -> tuple[MktPair, Pair] | None:
|
||||
|
||||
# uppercase since kraken bs_mktid is always upper
|
||||
if 'deribit' not in fqme.lower():
|
||||
fqme += '.deribit'
|
||||
|
||||
mkt_mode: str = ''
|
||||
broker, mkt_ep, venue, expiry = unpack_fqme(fqme)
|
||||
|
||||
# NOTE: we always upper case all tokens to be consistent with
|
||||
# binance's symbology style for pairs, like `BTCUSDT`, but in
|
||||
# theory we could also just keep things lower case; as long as
|
||||
# we're consistent and the symcache matches whatever this func
|
||||
# returns, always!
|
||||
expiry: str = expiry.upper()
|
||||
venue: str = venue.upper()
|
||||
venue_lower: str = venue.lower()
|
||||
|
||||
mkt_mode: str = 'option'
|
||||
|
||||
async with open_cached_client(
|
||||
'deribit',
|
||||
) as client:
|
||||
|
||||
assets: dict[str, Asset] = await client.get_assets()
|
||||
pair_str: str = mkt_ep.lower()
|
||||
|
||||
# switch venue-mode depending on input pattern parsing
|
||||
# since we want to use a particular endpoint (set) for
|
||||
# pair info lookup!
|
||||
client.mkt_mode = mkt_mode
|
||||
|
||||
pair: Pair = await client.exch_info(
|
||||
sym=pair_str,
|
||||
)
|
||||
dst: Asset | None = assets.get(pair.bs_dst_asset)
|
||||
if (
|
||||
not dst
|
||||
# TODO: a known asset DNE list?
|
||||
# and pair.baseAsset == 'DEFI'
|
||||
):
|
||||
log.warning(
|
||||
f'UNKNOWN {venue} asset {pair.base_currency} from,\n'
|
||||
f'{pformat(pair.to_dict())}'
|
||||
)
|
||||
|
||||
# XXX UNKNOWN missing "asset", though no idea why?
|
||||
# maybe it's only avail in the margin venue(s): /dapi/ ?
|
||||
return None
|
||||
|
||||
mkt = MktPair(
|
||||
dst=dst,
|
||||
src=assets.get(pair.bs_src_asset),
|
||||
price_tick=pair.price_tick,
|
||||
size_tick=pair.size_tick,
|
||||
bs_mktid=pair.symbol,
|
||||
expiry=expiry,
|
||||
venue=venue,
|
||||
broker='deribit',
|
||||
)
|
||||
return mkt, pair
|
||||
|
||||
|
||||
async def stream_quotes(
|
||||
|
||||
send_chan: trio.abc.SendChannel,
|
||||
|
@ -110,25 +212,20 @@ async def stream_quotes(
|
|||
|
||||
sym = symbols[0]
|
||||
|
||||
init_msgs: list[FeedInit] = []
|
||||
|
||||
async with (
|
||||
open_cached_client('deribit') as client,
|
||||
send_chan as send_chan
|
||||
):
|
||||
|
||||
init_msgs = {
|
||||
# pass back token, and bool, signalling if we're the writer
|
||||
# and that history has been written
|
||||
sym: {
|
||||
'symbol_info': {
|
||||
'asset_type': 'option',
|
||||
'price_tick_size': 0.0005
|
||||
},
|
||||
'shm_write_opts': {'sum_tick_vml': False},
|
||||
'fqsn': sym,
|
||||
},
|
||||
}
|
||||
mkt, pair = await get_mkt_info(sym)
|
||||
|
||||
nsym = piker_sym_to_cb_sym(sym)
|
||||
# build out init msgs according to latest spec
|
||||
init_msgs.append(
|
||||
FeedInit(mkt_info=mkt)
|
||||
)
|
||||
nsym = piker_sym_to_cb_sym(sym.split('.')[0])
|
||||
|
||||
async with maybe_open_price_feed(sym) as stream:
|
||||
|
||||
|
@ -179,7 +276,16 @@ async def open_symbol_search(
|
|||
|
||||
async with ctx.open_stream() as stream:
|
||||
|
||||
pattern: str
|
||||
async for pattern in stream:
|
||||
# repack in dict form
|
||||
await stream.send(
|
||||
await client.search_symbols(pattern))
|
||||
# NOTE: pattern fuzzy-matching is done within
|
||||
# the methd impl.
|
||||
pairs: dict[str, Pair] = await client.search_symbols(
|
||||
pattern,
|
||||
)
|
||||
# repack in fqme-keyed table
|
||||
byfqme: dict[str, Pair] = {}
|
||||
for pair in pairs.values():
|
||||
byfqme[pair.bs_fqme] = pair
|
||||
|
||||
await stream.send(byfqme)
|
||||
|
|
|
@ -0,0 +1,148 @@
|
|||
# piker: trading gear for hackers
|
||||
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
"""
|
||||
Per market data-type definitions and schemas types.
|
||||
|
||||
"""
|
||||
from __future__ import annotations
|
||||
from typing import (
|
||||
Literal,
|
||||
)
|
||||
from decimal import Decimal
|
||||
|
||||
from msgspec import field
|
||||
|
||||
from piker.types import Struct
|
||||
|
||||
|
||||
# API endpoint paths by venue / sub-API
|
||||
_domain: str = 'deribit.com'
|
||||
_url = f'https://www.{_domain}'
|
||||
|
||||
# WEBsocketz
|
||||
_ws_url: str = f'wss://www.{_domain}/ws/api/v2'
|
||||
|
||||
# test nets
|
||||
_testnet_ws_url: str = f'wss://test.{_domain}/ws/api/v2'
|
||||
|
||||
MarketType = Literal[
|
||||
'option'
|
||||
]
|
||||
|
||||
|
||||
def get_api_eps(venue: MarketType) -> tuple[str, str]:
|
||||
'''
|
||||
Return API ep root paths per venue.
|
||||
|
||||
'''
|
||||
return {
|
||||
'option': (
|
||||
_ws_url,
|
||||
),
|
||||
}[venue]
|
||||
|
||||
|
||||
class Pair(Struct, frozen=True, kw_only=True):
|
||||
|
||||
symbol: str
|
||||
|
||||
# src
|
||||
quote_currency: str # 'BTC'
|
||||
|
||||
# dst
|
||||
base_currency: str # "BTC",
|
||||
|
||||
tick_size: float # 0.0001
|
||||
tick_size_steps: list[dict[str, str | int | float]] # [{'above_price': 0.005, 'tick_size': 0.0005}]
|
||||
|
||||
@property
|
||||
def price_tick(self) -> Decimal:
|
||||
step_size: float = self.tick_size_steps[0].get('above_price')
|
||||
return Decimal(step_size)
|
||||
|
||||
@property
|
||||
def size_tick(self) -> Decimal:
|
||||
step_size: float = self.tick_size_steps[0].get('tick_size')
|
||||
return Decimal(step_size)
|
||||
|
||||
@property
|
||||
def bs_fqme(self) -> str:
|
||||
return self.symbol
|
||||
|
||||
@property
|
||||
def bs_mktid(self) -> str:
|
||||
return f'{self.symbol}'
|
||||
|
||||
|
||||
class OptionPair(Pair, frozen=True, kw_only=True):
|
||||
|
||||
taker_commission: float # 0.0003
|
||||
strike: float # 5000.0
|
||||
settlement_period: str # 'day'
|
||||
settlement_currency: str # "BTC",
|
||||
rfq: bool # false
|
||||
price_index: str # 'btc_usd'
|
||||
option_type: str # 'call'
|
||||
min_trade_amount: float # 0.1
|
||||
maker_commission: float # 0.0003
|
||||
kind: str # 'option'
|
||||
is_active: bool # true
|
||||
instrument_type: str # 'reversed'
|
||||
instrument_name: str # 'BTC-1SEP24-55000-C'
|
||||
instrument_id: int # 364671
|
||||
expiration_timestamp: int # 1725177600000
|
||||
creation_timestamp: int # 1724918461000
|
||||
counter_currency: str # 'USD'
|
||||
contract_size: float # '1.0'
|
||||
block_trade_tick_size: float # '0.0001'
|
||||
block_trade_min_trade_amount: int # '25'
|
||||
block_trade_commission: float # '0.003'
|
||||
|
||||
|
||||
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
|
||||
ns_path: str = 'piker.brokers.deribit:OptionPair'
|
||||
|
||||
@property
|
||||
def expiry(self) -> str:
|
||||
symbol: str = self.instrument_name.lower()
|
||||
pair, expiry, strike_price, otype = symbol.split('-')
|
||||
return f'{expiry}'
|
||||
|
||||
@property
|
||||
def venue(self) -> str:
|
||||
return 'OPTION'
|
||||
|
||||
@property
|
||||
def bs_fqme(self) -> str:
|
||||
return f'{self.symbol}.{self.venue}.{self.expiry}'
|
||||
|
||||
@property
|
||||
def bs_src_asset(self) -> str:
|
||||
return f'{self.quote_currency}'
|
||||
|
||||
@property
|
||||
def bs_dst_asset(self) -> str:
|
||||
return f'{self.base_currency}'
|
||||
|
||||
@property
|
||||
def bs_mktid(self) -> str:
|
||||
return f'{self.symbol}'
|
||||
|
||||
|
||||
PAIRTYPES: dict[MarketType, Pair] = {
|
||||
'option': OptionPair,
|
||||
}
|
|
@ -273,7 +273,7 @@ async def _reconnect_forever(
|
|||
nobsws._connected.set()
|
||||
await trio.sleep_forever()
|
||||
except HandshakeError:
|
||||
log.exception(f'Retrying connection')
|
||||
log.exception('Retrying connection')
|
||||
|
||||
# ws & nursery block ends
|
||||
|
||||
|
@ -359,8 +359,8 @@ async def open_autorecon_ws(
|
|||
|
||||
|
||||
'''
|
||||
JSONRPC response-request style machinery for transparent multiplexing of msgs
|
||||
over a NoBsWs.
|
||||
JSONRPC response-request style machinery for transparent multiplexing
|
||||
of msgs over a NoBsWs.
|
||||
|
||||
'''
|
||||
|
||||
|
@ -377,16 +377,20 @@ async def open_jsonrpc_session(
|
|||
url: str,
|
||||
start_id: int = 0,
|
||||
response_type: type = JSONRPCResult,
|
||||
request_type: Optional[type] = None,
|
||||
request_hook: Optional[Callable] = None,
|
||||
error_hook: Optional[Callable] = None,
|
||||
# request_type: Optional[type] = None,
|
||||
# request_hook: Optional[Callable] = None,
|
||||
# error_hook: Optional[Callable] = None,
|
||||
) -> Callable[[str, dict], dict]:
|
||||
|
||||
# NOTE, store all request msgs so we can raise errors on the
|
||||
# caller side!
|
||||
req_msgs: dict[int, dict] = {}
|
||||
|
||||
async with (
|
||||
trio.open_nursery() as n,
|
||||
open_autorecon_ws(url) as ws
|
||||
):
|
||||
rpc_id: Iterable = count(start_id)
|
||||
rpc_id: Iterable[int] = count(start_id)
|
||||
rpc_results: dict[int, dict] = {}
|
||||
|
||||
async def json_rpc(method: str, params: dict) -> dict:
|
||||
|
@ -394,26 +398,40 @@ async def open_jsonrpc_session(
|
|||
perform a json rpc call and wait for the result, raise exception in
|
||||
case of error field present on response
|
||||
'''
|
||||
nonlocal req_msgs
|
||||
|
||||
req_id: int = next(rpc_id)
|
||||
msg = {
|
||||
'jsonrpc': '2.0',
|
||||
'id': next(rpc_id),
|
||||
'id': req_id,
|
||||
'method': method,
|
||||
'params': params
|
||||
}
|
||||
_id = msg['id']
|
||||
|
||||
rpc_results[_id] = {
|
||||
result = rpc_results[_id] = {
|
||||
'result': None,
|
||||
'event': trio.Event()
|
||||
'error': None,
|
||||
'event': trio.Event(), # signal caller resp arrived
|
||||
}
|
||||
req_msgs[_id] = msg
|
||||
|
||||
await ws.send_msg(msg)
|
||||
|
||||
# wait for reponse before unblocking requester code
|
||||
await rpc_results[_id]['event'].wait()
|
||||
|
||||
ret = rpc_results[_id]['result']
|
||||
if (maybe_result := result['result']):
|
||||
ret = maybe_result
|
||||
del rpc_results[_id]
|
||||
|
||||
del rpc_results[_id]
|
||||
else:
|
||||
err = result['error']
|
||||
raise Exception(
|
||||
f'JSONRPC request failed\n'
|
||||
f'req: {msg}\n'
|
||||
f'resp: {err}\n'
|
||||
)
|
||||
|
||||
if ret.error is not None:
|
||||
raise Exception(json.dumps(ret.error, indent=4))
|
||||
|
@ -428,6 +446,7 @@ async def open_jsonrpc_session(
|
|||
the server side.
|
||||
|
||||
'''
|
||||
nonlocal req_msgs
|
||||
async for msg in ws:
|
||||
match msg:
|
||||
case {
|
||||
|
@ -451,15 +470,29 @@ async def open_jsonrpc_session(
|
|||
'params': _,
|
||||
}:
|
||||
log.debug(f'Recieved\n{msg}')
|
||||
if request_hook:
|
||||
await request_hook(request_type(**msg))
|
||||
# if request_hook:
|
||||
# await request_hook(request_type(**msg))
|
||||
|
||||
case {
|
||||
'error': error
|
||||
}:
|
||||
log.warning(f'Recieved\n{error}')
|
||||
if error_hook:
|
||||
await error_hook(response_type(**msg))
|
||||
# if error_hook:
|
||||
# await error_hook(response_type(**msg))
|
||||
|
||||
# retreive orig request msg, set error
|
||||
# response in original "result" msg,
|
||||
# THEN FINALLY set the event to signal caller
|
||||
# to raise the error in the parent task.
|
||||
req_id: int = msg['id']
|
||||
req_msg: dict = req_msgs[req_id]
|
||||
result: dict = rpc_results[req_id]
|
||||
result['error'] = error
|
||||
result['event'].set()
|
||||
log.error(
|
||||
f'JSONRPC request failed\n'
|
||||
f'req: {req_msg}\n'
|
||||
f'resp: {error}\n'
|
||||
)
|
||||
|
||||
case _:
|
||||
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -69,6 +69,8 @@ pdbp = "^1.5.0"
|
|||
trio = "^0.24"
|
||||
pendulum = "^3.0.0"
|
||||
httpx = "^0.27.0"
|
||||
cryptofeed = "^2.4.0"
|
||||
pyarrow = "^17.0.0"
|
||||
|
||||
[tool.poetry.dependencies.tractor]
|
||||
develop = true
|
||||
|
|
Loading…
Reference in New Issue