Remove cryptofeeds/asyncio from deribit backend
Add hook management to open_jsonrpc_session helperderibit_updates
parent
4c838474be
commit
1c833e7175
|
@ -52,13 +52,3 @@ __enable_modules__: list[str] = [
|
|||
'feed',
|
||||
# 'broker',
|
||||
]
|
||||
|
||||
# passed to ``tractor.ActorNursery.start_actor()``
|
||||
_spawn_kwargs = {
|
||||
'infect_asyncio': True,
|
||||
}
|
||||
|
||||
# annotation to let backend agnostic code
|
||||
# know if ``brokerd`` should be spawned with
|
||||
# ``tractor``'s aio mode.
|
||||
_infect_asyncio: bool = True
|
||||
|
|
|
@ -20,7 +20,6 @@ Deribit backend.
|
|||
'''
|
||||
from __future__ import annotations
|
||||
import time
|
||||
import asyncio
|
||||
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from functools import partial
|
||||
|
@ -31,15 +30,6 @@ from typing import (
|
|||
Callable,
|
||||
)
|
||||
|
||||
from cryptofeed import FeedHandler
|
||||
from cryptofeed.defines import (
|
||||
DERIBIT,
|
||||
L1_BOOK,
|
||||
TRADES,
|
||||
OPTION,
|
||||
CALL,
|
||||
PUT,
|
||||
)
|
||||
import pendulum
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
|
@ -49,8 +39,6 @@ from tractor.trionics import (
|
|||
broadcast_receiver,
|
||||
maybe_open_context
|
||||
)
|
||||
from tractor import to_asyncio
|
||||
from cryptofeed.symbols import Symbol
|
||||
|
||||
from piker.data.types import Struct
|
||||
from piker.data._web_bs import (
|
||||
|
@ -59,16 +47,12 @@ from piker.data._web_bs import (
|
|||
|
||||
from piker import config
|
||||
from piker.log import get_logger
|
||||
from piker._cacheables import open_cached_client
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
_spawn_kwargs = {
|
||||
'infect_asyncio': True,
|
||||
}
|
||||
|
||||
|
||||
_url = 'https://www.deribit.com'
|
||||
_ws_url = 'wss://www.deribit.com/ws/api/v2'
|
||||
_testnet_ws_url = 'wss://test.deribit.com/ws/api/v2'
|
||||
|
@ -142,70 +126,12 @@ def deribit_timestamp(when):
|
|||
return int((when.timestamp() * 1000) + (when.microsecond / 1000))
|
||||
|
||||
|
||||
def str_to_cb_sym(name: str) -> Symbol:
|
||||
base, strike_price, expiry_date, option_type = name.split('-')
|
||||
|
||||
quote = base
|
||||
|
||||
if option_type == 'put':
|
||||
option_type = PUT
|
||||
elif option_type == 'call':
|
||||
option_type = CALL
|
||||
else:
|
||||
raise Exception("Couldn\'t parse option type")
|
||||
|
||||
return Symbol(
|
||||
base, quote,
|
||||
type=OPTION,
|
||||
strike_price=strike_price,
|
||||
option_type=option_type,
|
||||
expiry_date=expiry_date,
|
||||
expiry_normalize=False)
|
||||
def sym_fmt_piker_to_deribit(sym: str) -> str:
|
||||
return sym.upper()
|
||||
|
||||
|
||||
def piker_sym_to_cb_sym(name: str) -> Symbol:
|
||||
base, expiry_date, strike_price, option_type = tuple(
|
||||
name.upper().split('-'))
|
||||
|
||||
quote = base
|
||||
|
||||
if option_type == 'P':
|
||||
option_type = PUT
|
||||
elif option_type == 'C':
|
||||
option_type = CALL
|
||||
else:
|
||||
raise Exception("Couldn\'t parse option type")
|
||||
|
||||
return Symbol(
|
||||
base, quote,
|
||||
type=OPTION,
|
||||
strike_price=strike_price,
|
||||
option_type=option_type,
|
||||
expiry_date=expiry_date.upper())
|
||||
|
||||
|
||||
def cb_sym_to_deribit_inst(sym: Symbol):
|
||||
# cryptofeed normalized
|
||||
cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
|
||||
|
||||
# deribit specific
|
||||
months = [
|
||||
'JAN', 'FEB', 'MAR',
|
||||
'APR', 'MAY', 'JUN',
|
||||
'JUL', 'AUG', 'SEP',
|
||||
'OCT', 'NOV', 'DEC',
|
||||
]
|
||||
|
||||
exp = sym.expiry_date
|
||||
|
||||
# YYMDD
|
||||
# 01234
|
||||
year, month, day = (
|
||||
exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])
|
||||
|
||||
otype = 'C' if sym.option_type == CALL else 'P'
|
||||
|
||||
return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}'
|
||||
def sym_fmt_deribit_to_piker(sym: str):
|
||||
return sym.lower()
|
||||
|
||||
|
||||
def get_config() -> dict[str, Any]:
|
||||
|
@ -214,11 +140,6 @@ def get_config() -> dict[str, Any]:
|
|||
|
||||
section = conf.get('deribit')
|
||||
|
||||
# TODO: document why we send this, basically because logging params
|
||||
# for cryptofeed
|
||||
conf['log'] = {}
|
||||
conf['log']['disabled'] = True
|
||||
|
||||
if section is None:
|
||||
log.warning(f'No config section found for deribit in {path}')
|
||||
|
||||
|
@ -227,7 +148,13 @@ def get_config() -> dict[str, Any]:
|
|||
|
||||
class Client:
|
||||
|
||||
def __init__(self, json_rpc: Callable) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
json_rpc: Callable,
|
||||
update_hooks: Callable,
|
||||
update_types: Callable,
|
||||
) -> None:
|
||||
|
||||
self._pairs: dict[str, Any] = None
|
||||
|
||||
config = get_config().get('deribit', {})
|
||||
|
@ -241,6 +168,8 @@ class Client:
|
|||
self._key_secret = None
|
||||
|
||||
self.json_rpc = json_rpc
|
||||
self.update_hooks = update_hooks
|
||||
self.update_types = update_types
|
||||
|
||||
@property
|
||||
def currencies(self):
|
||||
|
@ -287,7 +216,7 @@ class Client:
|
|||
"""Place an order
|
||||
"""
|
||||
params = {
|
||||
'instrument_name': symbol.upper(),
|
||||
'instrument_name': sym_fmt_piker_to_deribit(symbol),
|
||||
'amount': size,
|
||||
'type': 'limit',
|
||||
'price': price,
|
||||
|
@ -328,7 +257,7 @@ class Client:
|
|||
results = resp.result
|
||||
|
||||
instruments = {
|
||||
item['instrument_name'].lower(): item
|
||||
sym_fmt_deribit_to_piker(item['instrument_name']): item
|
||||
for item in results
|
||||
}
|
||||
|
||||
|
@ -359,8 +288,10 @@ class Client:
|
|||
limit=limit
|
||||
)
|
||||
# repack in dict form
|
||||
return {item[0]['instrument_name'].lower(): item[0]
|
||||
for item in matches}
|
||||
return {
|
||||
sym_fmt_deribit_to_piker(item[0]['instrument_name']): item[0]
|
||||
for item in matches
|
||||
}
|
||||
|
||||
async def bars(
|
||||
self,
|
||||
|
@ -387,7 +318,7 @@ class Client:
|
|||
resp = await self.json_rpc(
|
||||
'public/get_tradingview_chart_data',
|
||||
params={
|
||||
'instrument_name': instrument.upper(),
|
||||
'instrument_name': sym_fmt_piker_to_deribit(instrument),
|
||||
'start_timestamp': start_time,
|
||||
'end_timestamp': end_time,
|
||||
'resolution': '1'
|
||||
|
@ -420,13 +351,19 @@ class Client:
|
|||
resp = await self.json_rpc(
|
||||
'public/get_last_trades_by_instrument',
|
||||
params={
|
||||
'instrument_name': instrument,
|
||||
'instrument_name': sym_fmt_piker_to_deribit(instrument),
|
||||
'count': count
|
||||
})
|
||||
|
||||
return LastTradesResult(**resp.result)
|
||||
|
||||
|
||||
class JSONRPCSubRequest(Struct):
|
||||
method: str
|
||||
params: dict
|
||||
jsonrpc: str = '2.0'
|
||||
|
||||
|
||||
@acm
|
||||
async def get_client(
|
||||
is_brokercheck: bool = False
|
||||
|
@ -435,11 +372,11 @@ async def get_client(
|
|||
async with (
|
||||
trio.open_nursery() as n,
|
||||
open_jsonrpc_session(
|
||||
_testnet_ws_url,
|
||||
_ws_url,
|
||||
response_type=JSONRPCResult
|
||||
) as json_rpc
|
||||
) as control_functions
|
||||
):
|
||||
client = Client(json_rpc)
|
||||
client = Client(*control_functions)
|
||||
|
||||
_refresh_token: Optional[str] = None
|
||||
_access_token: Optional[str] = None
|
||||
|
@ -452,7 +389,7 @@ async def get_client(
|
|||
|
||||
https://docs.deribit.com/?python#authentication-2
|
||||
"""
|
||||
renew_time = 10
|
||||
renew_time = 240
|
||||
access_scope = 'trade:read_write'
|
||||
_expiry_time = time.time()
|
||||
got_access = False
|
||||
|
@ -482,7 +419,7 @@ async def get_client(
|
|||
'scope': access_scope
|
||||
}
|
||||
|
||||
resp = await json_rpc('public/auth', params)
|
||||
resp = await client.json_rpc('public/auth', params)
|
||||
result = resp.result
|
||||
|
||||
_expiry_time = time.time() + result['expires_in']
|
||||
|
@ -509,97 +446,68 @@ async def get_client(
|
|||
n.cancel_scope.cancel()
|
||||
|
||||
|
||||
@acm
|
||||
async def open_feed_handler():
|
||||
fh = FeedHandler(config=get_config())
|
||||
yield fh
|
||||
await to_asyncio.run_task(fh.stop_async)
|
||||
|
||||
|
||||
@acm
|
||||
async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
|
||||
async with maybe_open_context(
|
||||
acm_func=open_feed_handler,
|
||||
key='feedhandler',
|
||||
) as (cache_hit, fh):
|
||||
yield fh
|
||||
|
||||
|
||||
async def aio_price_feed_relay(
|
||||
fh: FeedHandler,
|
||||
instrument: Symbol,
|
||||
from_trio: asyncio.Queue,
|
||||
to_trio: trio.abc.SendChannel,
|
||||
) -> None:
|
||||
async def _trade(data: dict, receipt_timestamp):
|
||||
to_trio.send_nowait(('trade', {
|
||||
'symbol': cb_sym_to_deribit_inst(
|
||||
str_to_cb_sym(data.symbol)).lower(),
|
||||
'last': data,
|
||||
'broker_ts': time.time(),
|
||||
'data': data.to_dict(),
|
||||
'receipt': receipt_timestamp
|
||||
}))
|
||||
|
||||
async def _l1(data: dict, receipt_timestamp):
|
||||
to_trio.send_nowait(
|
||||
('l1', {
|
||||
'symbol': cb_sym_to_deribit_inst(
|
||||
str_to_cb_sym(data.symbol)).lower(),
|
||||
'ticks': [
|
||||
|
||||
{'type': 'bid',
|
||||
'price': float(data.bid_price),
|
||||
'size': float(data.bid_size)},
|
||||
|
||||
{'type': 'bsize',
|
||||
'price': float(data.bid_price),
|
||||
'size': float(data.bid_size)},
|
||||
|
||||
{'type': 'ask',
|
||||
'price': float(data.ask_price),
|
||||
'size': float(data.ask_size)},
|
||||
|
||||
{'type': 'asize',
|
||||
'price': float(data.ask_price),
|
||||
'size': float(data.ask_size)}
|
||||
]
|
||||
})
|
||||
)
|
||||
|
||||
fh.add_feed(
|
||||
DERIBIT,
|
||||
channels=[TRADES, L1_BOOK],
|
||||
symbols=[piker_sym_to_cb_sym(instrument)],
|
||||
callbacks={
|
||||
TRADES: _trade,
|
||||
L1_BOOK: _l1
|
||||
})
|
||||
|
||||
if not fh.running:
|
||||
fh.run(
|
||||
start_loop=False,
|
||||
install_signal_handlers=False)
|
||||
|
||||
# sync with trio
|
||||
to_trio.send_nowait(None)
|
||||
|
||||
await asyncio.sleep(float('inf'))
|
||||
|
||||
|
||||
@acm
|
||||
async def open_price_feed(
|
||||
instrument: str
|
||||
) -> trio.abc.ReceiveStream:
|
||||
async with maybe_open_feed_handler() as fh:
|
||||
async with to_asyncio.open_channel_from(
|
||||
partial(
|
||||
aio_price_feed_relay,
|
||||
fh,
|
||||
instrument
|
||||
)
|
||||
) as (first, chan):
|
||||
yield chan
|
||||
|
||||
instrument_db = sym_fmt_piker_to_deribit(instrument)
|
||||
|
||||
trades_chan = f'trades.{instrument_db}.raw'
|
||||
book_chan = f'book.{instrument_db}.none.1.100ms'
|
||||
|
||||
channels = [trades_chan, book_chan]
|
||||
|
||||
send_chann, recv_chann = trio.open_memory_channel(0)
|
||||
async def sub_hook(msg):
|
||||
chan = msg.params['channel']
|
||||
data = msg.params['data']
|
||||
if chan == trades_chan:
|
||||
await send_chann.send((
|
||||
'trade', {
|
||||
'symbol': instrument,
|
||||
'last': data['price'],
|
||||
'brokerd_ts': time.time(),
|
||||
'ticks': [{
|
||||
'type': 'trade',
|
||||
'price': data['price'],
|
||||
'size': data['amount'],
|
||||
'broker_ts': data['timestamp']
|
||||
}]
|
||||
}
|
||||
))
|
||||
|
||||
elif chan == book_chan:
|
||||
bid, bsize = data['bids'][0]
|
||||
ask, asize = data['asks'][0]
|
||||
await send_chann.send((
|
||||
'l1', {
|
||||
'symbol': instrument,
|
||||
'ticks': [
|
||||
{'type': 'bid', 'price': bid, 'size': bsize},
|
||||
{'type': 'bsize', 'price': bid, 'size': bsize},
|
||||
{'type': 'ask', 'price': ask, 'size': asize},
|
||||
{'type': 'asize', 'price': ask, 'size': asize}
|
||||
]}
|
||||
))
|
||||
|
||||
async with open_cached_client('deribit') as client:
|
||||
|
||||
client.update_hooks({
|
||||
'request': sub_hook
|
||||
})
|
||||
client.update_types({
|
||||
'request': JSONRPCSubRequest
|
||||
})
|
||||
|
||||
resp = await client.json_rpc(
|
||||
'private/subscribe', {'channels': channels})
|
||||
|
||||
assert resp.result == channels
|
||||
|
||||
log.info(f'Subscribed to {channels}')
|
||||
|
||||
yield recv_chann
|
||||
|
||||
|
||||
@acm
|
||||
|
|
|
@ -40,16 +40,9 @@ from piker.brokers._util import (
|
|||
from .api import (
|
||||
Client,
|
||||
Trade,
|
||||
piker_sym_to_cb_sym,
|
||||
cb_sym_to_deribit_inst,
|
||||
maybe_open_price_feed
|
||||
)
|
||||
|
||||
_spawn_kwargs = {
|
||||
'infect_asyncio': True,
|
||||
}
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
|
@ -107,10 +100,7 @@ async def stream_quotes(
|
|||
|
||||
sym = symbols[0]
|
||||
|
||||
async with (
|
||||
open_cached_client('deribit') as client,
|
||||
send_chan as send_chan
|
||||
):
|
||||
async with open_cached_client('deribit') as client:
|
||||
|
||||
init_msgs = {
|
||||
# pass back token, and bool, signalling if we're the writer
|
||||
|
@ -118,22 +108,19 @@ async def stream_quotes(
|
|||
sym: {
|
||||
'symbol_info': {
|
||||
'asset_type': 'option',
|
||||
'price_tick_size': 0.0005
|
||||
'price_tick_size': 0.0005,
|
||||
'lot_tick_size': 0.1
|
||||
},
|
||||
'shm_write_opts': {'sum_tick_vml': False},
|
||||
'fqsn': sym,
|
||||
},
|
||||
}
|
||||
|
||||
nsym = piker_sym_to_cb_sym(sym)
|
||||
|
||||
last_trades = (await client.last_trades(sym, count=1)).trades
|
||||
|
||||
async with maybe_open_price_feed(sym) as stream:
|
||||
|
||||
await client.cache_symbols()
|
||||
|
||||
last_trades = (await client.last_trades(
|
||||
cb_sym_to_deribit_inst(nsym), count=1)).trades
|
||||
|
||||
if len(last_trades) == 0:
|
||||
last_trade = None
|
||||
async for typ, quote in stream:
|
||||
|
|
|
@ -187,7 +187,6 @@ over a NoBsWs.
|
|||
|
||||
'''
|
||||
|
||||
|
||||
class JSONRPCResult(Struct):
|
||||
id: int
|
||||
jsonrpc: str = '2.0'
|
||||
|
@ -202,9 +201,32 @@ async def open_jsonrpc_session(
|
|||
response_type: type = JSONRPCResult,
|
||||
request_type: Optional[type] = None,
|
||||
request_hook: Optional[Callable] = None,
|
||||
error_hook: Optional[Callable] = None,
|
||||
error_hook: Optional[Callable] = None
|
||||
) -> Callable[[str, dict], dict]:
|
||||
|
||||
# xor: this two params need to be passed together or not at all
|
||||
if bool(request_type) ^ bool(request_hook):
|
||||
raise ValueError(
|
||||
'Need to path both a request_type and request_hook')
|
||||
|
||||
hook_table = {
|
||||
'request': request_hook,
|
||||
'error': error_hook
|
||||
}
|
||||
|
||||
types_table = {
|
||||
'response': response_type,
|
||||
'request': request_type
|
||||
}
|
||||
|
||||
def update_hooks(new_hooks: dict):
|
||||
nonlocal hook_table
|
||||
hook_table.update(new_hooks)
|
||||
|
||||
def update_types(new_types: dict):
|
||||
nonlocal types_table
|
||||
types_table.update(new_types)
|
||||
|
||||
async with (
|
||||
trio.open_nursery() as n,
|
||||
open_autorecon_ws(url) as ws
|
||||
|
@ -257,8 +279,7 @@ async def open_jsonrpc_session(
|
|||
'result': _,
|
||||
'id': mid,
|
||||
} if res_entry := rpc_results.get(mid):
|
||||
|
||||
res_entry['result'] = response_type(**msg)
|
||||
res_entry['result'] = types_table['response'](**msg)
|
||||
res_entry['event'].set()
|
||||
|
||||
case {
|
||||
|
@ -269,24 +290,32 @@ async def open_jsonrpc_session(
|
|||
f'Unexpected ws msg: {json.dumps(msg, indent=4)}'
|
||||
)
|
||||
|
||||
case {
|
||||
'error': error,
|
||||
'id': mid
|
||||
} if res_entry := rpc_results.get(mid):
|
||||
|
||||
res_entry['result'] = types_table['response'](**msg)
|
||||
res_entry['event'].set()
|
||||
|
||||
case {
|
||||
'method': _,
|
||||
'params': _,
|
||||
}:
|
||||
log.debug(f'Recieved\n{msg}')
|
||||
if request_hook:
|
||||
await request_hook(request_type(**msg))
|
||||
log.info(f'Recieved\n{msg}')
|
||||
if hook_table['request']:
|
||||
await hook_table['request'](types_table['request'](**msg))
|
||||
|
||||
case {
|
||||
'error': error
|
||||
'error': error,
|
||||
}:
|
||||
log.warning(f'Recieved\n{error}')
|
||||
if error_hook:
|
||||
await error_hook(response_type(**msg))
|
||||
if hook_table['error']:
|
||||
await hook_table['error'](types_table['response'](**msg))
|
||||
|
||||
case _:
|
||||
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
|
||||
|
||||
n.start_soon(recv_task)
|
||||
yield json_rpc
|
||||
yield json_rpc, update_hooks, update_types
|
||||
n.cancel_scope.cancel()
|
||||
|
|
Loading…
Reference in New Issue