Compare commits

..

4 Commits

5 changed files with 285 additions and 289 deletions

View File

@ -52,13 +52,3 @@ __enable_modules__: list[str] = [
'feed',
# 'broker',
]
# passed to ``tractor.ActorNursery.start_actor()``
_spawn_kwargs = {
'infect_asyncio': True,
}
# annotation to let backend agnostic code
# know if ``brokerd`` should be spawned with
# ``tractor``'s aio mode.
_infect_asyncio: bool = True

View File

@ -20,7 +20,6 @@ Deribit backend.
'''
from __future__ import annotations
import time
import asyncio
from contextlib import asynccontextmanager as acm
from functools import partial
@ -31,16 +30,8 @@ from typing import (
Callable,
)
from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT,
L1_BOOK,
TRADES,
OPTION,
CALL,
PUT,
)
import pendulum
import asks
import trio
from trio_typing import TaskStatus
from fuzzywuzzy import process as fuzzy
@ -49,8 +40,6 @@ from tractor.trionics import (
broadcast_receiver,
maybe_open_context
)
from tractor import to_asyncio
from cryptofeed.symbols import Symbol
from piker.data.types import Struct
from piker.data._web_bs import (
@ -59,16 +48,12 @@ from piker.data._web_bs import (
from piker import config
from piker.log import get_logger
from piker._cacheables import open_cached_client
log = get_logger(__name__)
_spawn_kwargs = {
'infect_asyncio': True,
}
_url = 'https://www.deribit.com'
_ws_url = 'wss://www.deribit.com/ws/api/v2'
_testnet_ws_url = 'wss://test.deribit.com/ws/api/v2'
@ -142,70 +127,12 @@ def deribit_timestamp(when):
return int((when.timestamp() * 1000) + (when.microsecond / 1000))
def str_to_cb_sym(name: str) -> Symbol:
base, strike_price, expiry_date, option_type = name.split('-')
quote = base
if option_type == 'put':
option_type = PUT
elif option_type == 'call':
option_type = CALL
else:
raise Exception("Couldn\'t parse option type")
return Symbol(
base, quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date,
expiry_normalize=False)
def sym_fmt_piker_to_deribit(sym: str) -> str:
return sym.upper()
def piker_sym_to_cb_sym(name: str) -> Symbol:
base, expiry_date, strike_price, option_type = tuple(
name.upper().split('-'))
quote = base
if option_type == 'P':
option_type = PUT
elif option_type == 'C':
option_type = CALL
else:
raise Exception("Couldn\'t parse option type")
return Symbol(
base, quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date.upper())
def cb_sym_to_deribit_inst(sym: Symbol):
# cryptofeed normalized
cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
# deribit specific
months = [
'JAN', 'FEB', 'MAR',
'APR', 'MAY', 'JUN',
'JUL', 'AUG', 'SEP',
'OCT', 'NOV', 'DEC',
]
exp = sym.expiry_date
# YYMDD
# 01234
year, month, day = (
exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])
otype = 'C' if sym.option_type == CALL else 'P'
return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}'
def sym_fmt_deribit_to_piker(sym: str):
return sym.lower()
def get_config() -> dict[str, Any]:
@ -214,11 +141,6 @@ def get_config() -> dict[str, Any]:
section = conf.get('deribit')
# TODO: document why we send this, basically because logging params
# for cryptofeed
conf['log'] = {}
conf['log']['disabled'] = True
if section is None:
log.warning(f'No config section found for deribit in {path}')
@ -227,20 +149,22 @@ def get_config() -> dict[str, Any]:
class Client:
def __init__(self, json_rpc: Callable) -> None:
def __init__(
self,
json_rpc: Callable,
append_hooks: Callable,
update_types: Callable,
key_id: str | None = None,
key_secret: str | None = None
) -> None:
self._pairs: dict[str, Any] = None
config = get_config().get('deribit', {})
if ('key_id' in config) and ('key_secret' in config):
self._key_id = config['key_id']
self._key_secret = config['key_secret']
else:
self._key_id = None
self._key_secret = None
self._key_id = key_id
self._key_secret = key_secret
self.json_rpc = json_rpc
self.append_hooks = append_hooks
self.update_types = update_types
@property
def currencies(self):
@ -287,7 +211,7 @@ class Client:
"""Place an order
"""
params = {
'instrument_name': symbol.upper(),
'instrument_name': sym_fmt_piker_to_deribit(symbol),
'amount': size,
'type': 'limit',
'price': price,
@ -328,7 +252,7 @@ class Client:
results = resp.result
instruments = {
item['instrument_name'].lower(): item
sym_fmt_deribit_to_piker(item['instrument_name']): item
for item in results
}
@ -359,8 +283,10 @@ class Client:
limit=limit
)
# repack in dict form
return {item[0]['instrument_name'].lower(): item[0]
for item in matches}
return {
sym_fmt_deribit_to_piker(item[0]['instrument_name']): item[0]
for item in matches
}
async def bars(
self,
@ -387,7 +313,7 @@ class Client:
resp = await self.json_rpc(
'public/get_tradingview_chart_data',
params={
'instrument_name': instrument.upper(),
'instrument_name': sym_fmt_piker_to_deribit(instrument),
'start_timestamp': start_time,
'end_timestamp': end_time,
'resolution': '1'
@ -420,26 +346,55 @@ class Client:
resp = await self.json_rpc(
'public/get_last_trades_by_instrument',
params={
'instrument_name': instrument,
'instrument_name': sym_fmt_piker_to_deribit(instrument),
'count': count
})
return LastTradesResult(**resp.result)
async def get_book_summary(
self,
currency: str,
kind: str = 'option'
):
return await self.json_rpc(
'public/get_book_summary_by_currency',
params={
'currency': currency,
'kind': kind
})
class JSONRPCSubRequest(Struct):
method: str
params: dict
jsonrpc: str = '2.0'
@acm
async def get_client(
is_brokercheck: bool = False
) -> Client:
config = get_config().get('deribit', {})
ws_url = config.get('ws_url', _ws_url)
key_id = config.get('key_id', None)
key_secret = config.get('key_secret', None)
async with (
trio.open_nursery() as n,
open_jsonrpc_session(
_testnet_ws_url,
ws_url,
response_type=JSONRPCResult
) as json_rpc
) as control_functions
):
client = Client(json_rpc)
client = Client(
*control_functions,
key_id=key_id,
key_secret=key_secret
)
_refresh_token: Optional[str] = None
_access_token: Optional[str] = None
@ -452,7 +407,7 @@ async def get_client(
https://docs.deribit.com/?python#authentication-2
"""
renew_time = 10
renew_time = 240
access_scope = 'trade:read_write'
_expiry_time = time.time()
got_access = False
@ -482,7 +437,7 @@ async def get_client(
'scope': access_scope
}
resp = await json_rpc('public/auth', params)
resp = await client.json_rpc('public/auth', params)
result = resp.result
_expiry_time = time.time() + result['expires_in']
@ -509,98 +464,76 @@ async def get_client(
n.cancel_scope.cancel()
@acm
async def open_feed_handler():
fh = FeedHandler(config=get_config())
yield fh
await to_asyncio.run_task(fh.stop_async)
@acm
async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
async with maybe_open_context(
acm_func=open_feed_handler,
key='feedhandler',
) as (cache_hit, fh):
yield fh
async def aio_price_feed_relay(
fh: FeedHandler,
instrument: Symbol,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _trade(data: dict, receipt_timestamp):
to_trio.send_nowait(('trade', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'last': data,
'broker_ts': time.time(),
'data': data.to_dict(),
'receipt': receipt_timestamp
}))
async def _l1(data: dict, receipt_timestamp):
to_trio.send_nowait(
('l1', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'ticks': [
{'type': 'bid',
'price': float(data.bid_price),
'size': float(data.bid_size)},
{'type': 'bsize',
'price': float(data.bid_price),
'size': float(data.bid_size)},
{'type': 'ask',
'price': float(data.ask_price),
'size': float(data.ask_size)},
{'type': 'asize',
'price': float(data.ask_price),
'size': float(data.ask_size)}
]
})
)
fh.add_feed(
DERIBIT,
channels=[TRADES, L1_BOOK],
symbols=[piker_sym_to_cb_sym(instrument)],
callbacks={
TRADES: _trade,
L1_BOOK: _l1
})
if not fh.running:
fh.run(
start_loop=False,
install_signal_handlers=False)
# sync with trio
to_trio.send_nowait(None)
await asyncio.sleep(float('inf'))
@acm
async def open_price_feed(
instrument: str
) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from(
partial(
aio_price_feed_relay,
fh,
instrument
)
) as (first, chan):
yield chan
instrument_db = sym_fmt_piker_to_deribit(instrument)
trades_chan = f'trades.{instrument_db}.raw'
book_chan = f'book.{instrument_db}.none.1.100ms'
channels = [trades_chan, book_chan]
send_chann, recv_chann = trio.open_memory_channel(0)
async def sub_hook(msg):
chan = msg.params['channel']
data = msg.params['data']
if chan == trades_chan:
await send_chann.send((
'trade', {
'symbol': instrument,
'last': data['price'],
'brokerd_ts': time.time(),
'ticks': [{
'type': 'trade',
'price': data['price'],
'size': data['amount'],
'broker_ts': data['timestamp']
}]
}
))
return True
elif chan == book_chan:
bid, bsize = data['bids'][0]
ask, asize = data['asks'][0]
await send_chann.send((
'l1', {
'symbol': instrument,
'ticks': [
{'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
{'type': 'ask', 'price': ask, 'size': asize},
{'type': 'asize', 'price': ask, 'size': asize}
]}
))
return True
return False
async with open_cached_client('deribit') as client:
client.append_hooks({
'request': [sub_hook]
})
client.update_types({
'request': JSONRPCSubRequest
})
resp = await client.json_rpc(
'private/subscribe', {'channels': channels})
assert not resp.error
log.info(f'Subscribed to {channels}')
yield recv_chann
resp = await client.json_rpc('private/unsubscribe', {'channels': channels})
assert not resp.error
@acm
async def maybe_open_price_feed(
@ -621,71 +554,67 @@ async def maybe_open_price_feed(
yield feed
# TODO: order broker support: this is all draft code from @guilledk B)
@acm
async def open_ticker_feed(
instrument: str
) -> trio.abc.ReceiveStream:
# async def aio_order_feed_relay(
# fh: FeedHandler,
# instrument: Symbol,
# from_trio: asyncio.Queue,
# to_trio: trio.abc.SendChannel,
instrument_db = sym_fmt_piker_to_deribit(instrument)
# ) -> None:
# async def _fill(data: dict, receipt_timestamp):
# breakpoint()
ticker_chan = f'incremental_ticker.{instrument_db}'
# async def _order_info(data: dict, receipt_timestamp):
# breakpoint()
channels = [ticker_chan]
# fh.add_feed(
# DERIBIT,
# channels=[FILLS, ORDER_INFO],
# symbols=[instrument.upper()],
# callbacks={
# FILLS: _fill,
# ORDER_INFO: _order_info,
# })
send_chann, recv_chann = trio.open_memory_channel(0)
async def sub_hook(msg):
chann = msg.params['channel']
if chann == ticker_chan:
data = msg.params['data']
await send_chann.send((
'ticker', {
'symbol': instrument,
'data': data
}
))
return True
# if not fh.running:
# fh.run(
# start_loop=False,
# install_signal_handlers=False)
return False
# # sync with trio
# to_trio.send_nowait(None)
async with open_cached_client('deribit') as client:
# await asyncio.sleep(float('inf'))
client.append_hooks({
'request': [sub_hook]
})
client.update_types({
'request': JSONRPCSubRequest
})
resp = await client.json_rpc(
'private/subscribe', {'channels': channels})
# @acm
# async def open_order_feed(
# instrument: list[str]
# ) -> trio.abc.ReceiveStream:
# async with maybe_open_feed_handler() as fh:
# async with to_asyncio.open_channel_from(
# partial(
# aio_order_feed_relay,
# fh,
# instrument
# )
# ) as (first, chan):
# yield chan
assert not resp.error
log.info(f'Subscribed to {channels}')
# @acm
# async def maybe_open_order_feed(
# instrument: str
# ) -> trio.abc.ReceiveStream:
yield recv_chann
# # TODO: add a predicate to maybe_open_context
# async with maybe_open_context(
# acm_func=open_order_feed,
# kwargs={
# 'instrument': instrument,
# 'fh': fh
# },
# key=f'{instrument}-order',
# ) as (cache_hit, feed):
# if cache_hit:
# yield broadcast_receiver(feed, 10)
# else:
# yield feed
resp = await client.json_rpc('private/unsubscribe', {'channels': channels})
assert not resp.error
@acm
async def maybe_open_ticker_feed(
instrument: str
) -> trio.abc.ReceiveStream:
async with maybe_open_context(
acm_func=open_ticker_feed,
kwargs={
'instrument': instrument
},
key=f'{instrument}-ticker',
) as (cache_hit, feed):
if cache_hit:
yield broadcast_receiver(feed, 10)
else:
yield feed

View File

@ -40,16 +40,9 @@ from piker.brokers._util import (
from .api import (
Client,
Trade,
piker_sym_to_cb_sym,
cb_sym_to_deribit_inst,
maybe_open_price_feed
)
_spawn_kwargs = {
'infect_asyncio': True,
}
log = get_logger(__name__)
@ -107,10 +100,7 @@ async def stream_quotes(
sym = symbols[0]
async with (
open_cached_client('deribit') as client,
send_chan as send_chan
):
async with open_cached_client('deribit') as client:
init_msgs = {
# pass back token, and bool, signalling if we're the writer
@ -118,22 +108,19 @@ async def stream_quotes(
sym: {
'symbol_info': {
'asset_type': 'option',
'price_tick_size': 0.0005
'price_tick_size': 0.0005,
'lot_tick_size': 0.1
},
'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
},
}
nsym = piker_sym_to_cb_sym(sym)
last_trades = (await client.last_trades(sym, count=1)).trades
async with maybe_open_price_feed(sym) as stream:
await client.cache_symbols()
last_trades = (await client.last_trades(
cb_sym_to_deribit_inst(nsym), count=1)).trades
if len(last_trades) == 0:
last_trade = None
async for typ, quote in stream:

View File

@ -187,7 +187,6 @@ over a NoBsWs.
'''
class JSONRPCResult(Struct):
id: int
jsonrpc: str = '2.0'
@ -202,9 +201,41 @@ async def open_jsonrpc_session(
response_type: type = JSONRPCResult,
request_type: Optional[type] = None,
request_hook: Optional[Callable] = None,
error_hook: Optional[Callable] = None,
error_hook: Optional[Callable] = None
) -> Callable[[str, dict], dict]:
# xor: this two params need to be passed together or not at all
if bool(request_type) ^ bool(request_hook):
raise ValueError(
'Need to path both a request_type and request_hook')
req_hooks = []
if request_hook:
req_hooks.append(request_hook)
err_hooks = []
if error_hook:
err_hooks.append(error_hook)
hook_table = {
'request': req_hooks,
'error': err_hooks
}
types_table = {
'response': response_type,
'request': request_type
}
def append_hooks(new_hooks: dict):
nonlocal hook_table
for htype, hooks in new_hooks.items():
hook_table[htype] += hooks
def update_types(new_types: dict):
nonlocal types_table
types_table.update(new_types)
async with (
trio.open_nursery() as n,
open_autorecon_ws(url) as ws
@ -212,7 +243,7 @@ async def open_jsonrpc_session(
rpc_id: Iterable = count(start_id)
rpc_results: dict[int, dict] = {}
async def json_rpc(method: str, params: dict) -> dict:
async def json_rpc(method: str, params: dict = {}) -> dict:
'''
perform a json rpc call and wait for the result, raise exception in
case of error field present on response
@ -257,8 +288,7 @@ async def open_jsonrpc_session(
'result': _,
'id': mid,
} if res_entry := rpc_results.get(mid):
res_entry['result'] = response_type(**msg)
res_entry['result'] = types_table['response'](**msg)
res_entry['event'].set()
case {
@ -269,24 +299,38 @@ async def open_jsonrpc_session(
f'Unexpected ws msg: {json.dumps(msg, indent=4)}'
)
case {
'error': error,
'id': mid
} if res_entry := rpc_results.get(mid):
res_entry['result'] = types_table['response'](**msg)
res_entry['event'].set()
case {
'method': _,
'params': _,
}:
log.debug(f'Recieved\n{msg}')
if request_hook:
await request_hook(request_type(**msg))
log.info(f'Recieved\n{msg}')
if len(hook_table['request']) > 0:
for hook in hook_table['request']:
result = await hook(types_table['request'](**msg))
if result:
break
case {
'error': error
'error': error,
}:
log.warning(f'Recieved\n{error}')
if error_hook:
await error_hook(response_type(**msg))
if len(hook_table['error']) > 0:
for hook in hook_table['error']:
result = await hook(types_table['response'](**msg))
if result:
break
case _:
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
n.start_soon(recv_task)
yield json_rpc
yield json_rpc, append_hooks, update_types
n.cancel_scope.cancel()

View File

@ -0,0 +1,46 @@
import trio
import pytest
import tractor
from piker import config
from piker.brokers.deribit import api as deribit
from piker.brokers.deribit.api import _testnet_ws_url
from piker._cacheables import open_cached_client
TESTNET_KEY_ID: str | None = None
TESTNET_KEY_SECRET: str | None = None
@pytest.mark.skipif(
not TESTNET_KEY_ID or not TESTNET_KEY_SECRET,
reason='configure a deribit testnet key pair before running this test'
)
def test_deribit_get_ticker(open_test_pikerd):
async def _test_main():
async with open_test_pikerd() as _:
async with open_cached_client('deribit') as client:
symbols = await client.symbol_info()
syms = list(symbols.keys())
sym = syms[int(len(syms) / 2)]
async with deribit.maybe_open_ticker_feed(sym) as tick_stream:
async for typ, msg in tick_stream:
assert typ == 'ticker'
assert 'open_interest' in msg['data']
break
config.write({
'deribit': {
'ws_url': _testnet_ws_url,
'key_id': TESTNET_KEY_ID,
'key_secret': TESTNET_KEY_SECRET
}
})
trio.run(_test_main)