Compare commits

...

11 Commits

Author SHA1 Message Date
Guillermo Rodriguez 926ab1dfa6
Add stream ticker test 2023-03-10 17:16:50 -03:00
Guillermo Rodriguez 77fbc7eb86
Fruther generalize json_rpc hook mechanic to allow for multi hook, Add new maybe_open_ticker_feed to stream greeks, iv, open interest of an instrument 2023-03-10 13:25:40 -03:00
Guillermo Rodriguez fef8073113
Add new documented api get_book_summary_by_currency 2023-03-09 13:46:19 -03:00
Guillermo Rodriguez 1c833e7175
Remove cryptofeeds/asyncio from deribit backend
Add hook management to open_jsonrpc_session helper
2023-03-08 13:32:47 -03:00
Tyler Goodlet 4c838474be `flake8` linter cleanup and comment out order ctl draft code 2023-03-03 18:32:24 -05:00
Tyler Goodlet 1bd421a0f3 Block hist queries for non-60s 2023-03-03 18:17:02 -05:00
Tyler Goodlet 2ea850eed0 `deribit`: add new `Trade.block_trade_id` field.. 2023-03-03 18:17:02 -05:00
Tyler Goodlet e6fd2adb69 Include `deribit` backend in default brokers scan set 2023-03-03 18:17:02 -05:00
Tyler Goodlet 3bfe541259 `deribit`: fix history query routine sig to take `timeframe: float` 2023-03-03 18:17:02 -05:00
Tyler Goodlet 18d70447cd `deribit`: various lib API compat fixes
- port to new `msgspec` "default fields must come after non-default
  ones" shite they changed.
- adjust to  `open_jsonrpc_session()` kwarg remap: `dtype` ->
  `response_type=JSONRPCResult`.
2023-03-03 18:17:02 -05:00
Tyler Goodlet c85324f142 `deribit`: drop removed (now deprecated and removed) `.backfill_bars()` endpoint 2023-03-03 18:17:02 -05:00
6 changed files with 330 additions and 318 deletions

View File

@ -24,6 +24,7 @@ __brokers__ = [
'binance', 'binance',
'ib', 'ib',
'kraken', 'kraken',
'deribit',
# broken but used to work # broken but used to work
# 'questrade', # 'questrade',

View File

@ -30,7 +30,6 @@ from .feed import (
open_history_client, open_history_client,
open_symbol_search, open_symbol_search,
stream_quotes, stream_quotes,
backfill_bars
) )
# from .broker import ( # from .broker import (
# trades_dialogue, # trades_dialogue,
@ -53,13 +52,3 @@ __enable_modules__: list[str] = [
'feed', 'feed',
# 'broker', # 'broker',
] ]
# passed to ``tractor.ActorNursery.start_actor()``
_spawn_kwargs = {
'infect_asyncio': True,
}
# annotation to let backend agnostic code
# know if ``brokerd`` should be spawned with
# ``tractor``'s aio mode.
_infect_asyncio: bool = True

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0) # Copyright (C) Guillermo Rodriguez (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -18,58 +18,42 @@
Deribit backend. Deribit backend.
''' '''
import json from __future__ import annotations
import time import time
import asyncio
from contextlib import asynccontextmanager as acm, AsyncExitStack from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
from datetime import datetime from datetime import datetime
from typing import Any, Optional, Iterable, Callable from typing import (
Any,
Optional,
Callable,
)
import pendulum import pendulum
import asks import asks
import trio import trio
from trio_typing import Nursery, TaskStatus from trio_typing import TaskStatus
from fuzzywuzzy import process as fuzzy from fuzzywuzzy import process as fuzzy
import numpy as np import numpy as np
from tractor.trionics import (
broadcast_receiver,
maybe_open_context
)
from piker.data.types import Struct from piker.data.types import Struct
from piker.data._web_bs import ( from piker.data._web_bs import (
NoBsWs,
open_autorecon_ws,
open_jsonrpc_session open_jsonrpc_session
) )
from .._util import resproc
from piker import config from piker import config
from piker.log import get_logger from piker.log import get_logger
from piker._cacheables import open_cached_client
from tractor.trionics import (
broadcast_receiver,
BroadcastReceiver,
maybe_open_context
)
from tractor import to_asyncio
from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT,
L1_BOOK, TRADES,
OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol
log = get_logger(__name__) log = get_logger(__name__)
_spawn_kwargs = {
'infect_asyncio': True,
}
_url = 'https://www.deribit.com' _url = 'https://www.deribit.com'
_ws_url = 'wss://www.deribit.com/ws/api/v2' _ws_url = 'wss://www.deribit.com/ws/api/v2'
_testnet_ws_url = 'wss://test.deribit.com/ws/api/v2' _testnet_ws_url = 'wss://test.deribit.com/ws/api/v2'
@ -89,19 +73,20 @@ _ohlc_dtype = [
class JSONRPCResult(Struct): class JSONRPCResult(Struct):
jsonrpc: str = '2.0'
id: int id: int
result: Optional[dict] = None usIn: int
error: Optional[dict] = None usOut: int
usIn: int
usOut: int
usDiff: int usDiff: int
testnet: bool testnet: bool
jsonrpc: str = '2.0'
result: Optional[dict] = None
error: Optional[dict] = None
class JSONRPCChannel(Struct): class JSONRPCChannel(Struct):
jsonrpc: str = '2.0'
method: str method: str
params: dict params: dict
jsonrpc: str = '2.0'
class KLinesResult(Struct): class KLinesResult(Struct):
@ -114,6 +99,7 @@ class KLinesResult(Struct):
ticks: list[int] ticks: list[int]
volume: list[float] volume: list[float]
class Trade(Struct): class Trade(Struct):
trade_seq: int trade_seq: int
trade_id: str trade_id: str
@ -125,9 +111,11 @@ class Trade(Struct):
instrument_name: str instrument_name: str
index_price: float index_price: float
direction: str direction: str
combo_trade_id: Optional[int] = 0,
combo_id: Optional[str] = '',
amount: float amount: float
combo_trade_id: Optional[int] = 0
combo_id: Optional[str] = ''
block_trade_id: str | None = ''
class LastTradesResult(Struct): class LastTradesResult(Struct):
trades: list[Trade] trades: list[Trade]
@ -139,65 +127,12 @@ def deribit_timestamp(when):
return int((when.timestamp() * 1000) + (when.microsecond / 1000)) return int((when.timestamp() * 1000) + (when.microsecond / 1000))
def str_to_cb_sym(name: str) -> Symbol: def sym_fmt_piker_to_deribit(sym: str) -> str:
base, strike_price, expiry_date, option_type = name.split('-') return sym.upper()
quote = base
if option_type == 'put':
option_type = PUT
elif option_type == 'call':
option_type = CALL
else:
raise Exception("Couldn\'t parse option type")
return Symbol(
base, quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date,
expiry_normalize=False)
def piker_sym_to_cb_sym(name: str) -> Symbol: def sym_fmt_deribit_to_piker(sym: str):
base, expiry_date, strike_price, option_type = tuple( return sym.lower()
name.upper().split('-'))
quote = base
if option_type == 'P':
option_type = PUT
elif option_type == 'C':
option_type = CALL
else:
raise Exception("Couldn\'t parse option type")
return Symbol(
base, quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date.upper())
def cb_sym_to_deribit_inst(sym: Symbol):
# cryptofeed normalized
cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
# deribit specific
months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
exp = sym.expiry_date
# YYMDD
# 01234
year, month, day = (
exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])
otype = 'C' if sym.option_type == CALL else 'P'
return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}'
def get_config() -> dict[str, Any]: def get_config() -> dict[str, Any]:
@ -206,32 +141,30 @@ def get_config() -> dict[str, Any]:
section = conf.get('deribit') section = conf.get('deribit')
# TODO: document why we send this, basically because logging params for cryptofeed
conf['log'] = {}
conf['log']['disabled'] = True
if section is None: if section is None:
log.warning(f'No config section found for deribit in {path}') log.warning(f'No config section found for deribit in {path}')
return conf return conf
class Client: class Client:
def __init__(self, json_rpc: Callable) -> None: def __init__(
self,
json_rpc: Callable,
append_hooks: Callable,
update_types: Callable,
key_id: str | None = None,
key_secret: str | None = None
) -> None:
self._pairs: dict[str, Any] = None self._pairs: dict[str, Any] = None
self._key_id = key_id
config = get_config().get('deribit', {}) self._key_secret = key_secret
if ('key_id' in config) and ('key_secret' in config):
self._key_id = config['key_id']
self._key_secret = config['key_secret']
else:
self._key_id = None
self._key_secret = None
self.json_rpc = json_rpc self.json_rpc = json_rpc
self.append_hooks = append_hooks
self.update_types = update_types
@property @property
def currencies(self): def currencies(self):
@ -278,7 +211,7 @@ class Client:
"""Place an order """Place an order
""" """
params = { params = {
'instrument_name': symbol.upper(), 'instrument_name': sym_fmt_piker_to_deribit(symbol),
'amount': size, 'amount': size,
'type': 'limit', 'type': 'limit',
'price': price, 'price': price,
@ -319,7 +252,7 @@ class Client:
results = resp.result results = resp.result
instruments = { instruments = {
item['instrument_name'].lower(): item sym_fmt_deribit_to_piker(item['instrument_name']): item
for item in results for item in results
} }
@ -350,8 +283,10 @@ class Client:
limit=limit limit=limit
) )
# repack in dict form # repack in dict form
return {item[0]['instrument_name'].lower(): item[0] return {
for item in matches} sym_fmt_deribit_to_piker(item[0]['instrument_name']): item[0]
for item in matches
}
async def bars( async def bars(
self, self,
@ -360,6 +295,7 @@ class Client:
end_dt: Optional[datetime] = None, end_dt: Optional[datetime] = None,
limit: int = 1000, limit: int = 1000,
as_np: bool = True, as_np: bool = True,
) -> dict: ) -> dict:
instrument = symbol instrument = symbol
@ -377,7 +313,7 @@ class Client:
resp = await self.json_rpc( resp = await self.json_rpc(
'public/get_tradingview_chart_data', 'public/get_tradingview_chart_data',
params={ params={
'instrument_name': instrument.upper(), 'instrument_name': sym_fmt_piker_to_deribit(instrument),
'start_timestamp': start_time, 'start_timestamp': start_time,
'end_timestamp': end_time, 'end_timestamp': end_time,
'resolution': '1' 'resolution': '1'
@ -385,14 +321,8 @@ class Client:
result = KLinesResult(**resp.result) result = KLinesResult(**resp.result)
new_bars = [] new_bars = []
for i in range(len(result.close)): for i in range(len(result.close)):
_open = result.open[i]
high = result.high[i]
low = result.low[i]
close = result.close[i]
volume = result.volume[i]
row = [ row = [
(start_time + (i * (60 * 1000))) / 1000.0, # time (start_time + (i * (60 * 1000))) / 1000.0, # time
result.open[i], result.open[i],
@ -405,7 +335,7 @@ class Client:
new_bars.append((i,) + tuple(row)) new_bars.append((i,) + tuple(row))
array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else klines array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else new_bars
return array return array
async def last_trades( async def last_trades(
@ -416,24 +346,55 @@ class Client:
resp = await self.json_rpc( resp = await self.json_rpc(
'public/get_last_trades_by_instrument', 'public/get_last_trades_by_instrument',
params={ params={
'instrument_name': instrument, 'instrument_name': sym_fmt_piker_to_deribit(instrument),
'count': count 'count': count
}) })
return LastTradesResult(**resp.result) return LastTradesResult(**resp.result)
async def get_book_summary(
self,
currency: str,
kind: str = 'option'
):
return await self.json_rpc(
'public/get_book_summary_by_currency',
params={
'currency': currency,
'kind': kind
})
class JSONRPCSubRequest(Struct):
method: str
params: dict
jsonrpc: str = '2.0'
@acm @acm
async def get_client( async def get_client(
is_brokercheck: bool = False is_brokercheck: bool = False
) -> Client: ) -> Client:
config = get_config().get('deribit', {})
ws_url = config.get('ws_url', _ws_url)
key_id = config.get('key_id', None)
key_secret = config.get('key_secret', None)
async with ( async with (
trio.open_nursery() as n, trio.open_nursery() as n,
open_jsonrpc_session( open_jsonrpc_session(
_testnet_ws_url, dtype=JSONRPCResult) as json_rpc ws_url,
response_type=JSONRPCResult
) as control_functions
): ):
client = Client(json_rpc) client = Client(
*control_functions,
key_id=key_id,
key_secret=key_secret
)
_refresh_token: Optional[str] = None _refresh_token: Optional[str] = None
_access_token: Optional[str] = None _access_token: Optional[str] = None
@ -446,7 +407,7 @@ async def get_client(
https://docs.deribit.com/?python#authentication-2 https://docs.deribit.com/?python#authentication-2
""" """
renew_time = 10 renew_time = 240
access_scope = 'trade:read_write' access_scope = 'trade:read_write'
_expiry_time = time.time() _expiry_time = time.time()
got_access = False got_access = False
@ -457,7 +418,7 @@ async def get_client(
if time.time() - _expiry_time < renew_time: if time.time() - _expiry_time < renew_time:
# if we are close to token expiry time # if we are close to token expiry time
if _refresh_token != None: if _refresh_token is not None:
# if we have a refresh token already dont need to send # if we have a refresh token already dont need to send
# secret # secret
params = { params = {
@ -467,7 +428,8 @@ async def get_client(
} }
else: else:
# we don't have refresh token, send secret to initialize # we don't have refresh token, send secret to
# initialize
params = { params = {
'grant_type': 'client_credentials', 'grant_type': 'client_credentials',
'client_id': client._key_id, 'client_id': client._key_id,
@ -475,7 +437,7 @@ async def get_client(
'scope': access_scope 'scope': access_scope
} }
resp = await json_rpc('public/auth', params) resp = await client.json_rpc('public/auth', params)
result = resp.result result = resp.result
_expiry_time = time.time() + result['expires_in'] _expiry_time = time.time() + result['expires_in']
@ -502,88 +464,76 @@ async def get_client(
n.cancel_scope.cancel() n.cancel_scope.cancel()
@acm
async def open_feed_handler():
fh = FeedHandler(config=get_config())
yield fh
await to_asyncio.run_task(fh.stop_async)
@acm
async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
async with maybe_open_context(
acm_func=open_feed_handler,
key='feedhandler',
) as (cache_hit, fh):
yield fh
async def aio_price_feed_relay(
fh: FeedHandler,
instrument: Symbol,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _trade(data: dict, receipt_timestamp):
to_trio.send_nowait(('trade', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'last': data,
'broker_ts': time.time(),
'data': data.to_dict(),
'receipt': receipt_timestamp
}))
async def _l1(data: dict, receipt_timestamp):
to_trio.send_nowait(('l1', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'ticks': [
{'type': 'bid',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'bsize',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'ask',
'price': float(data.ask_price), 'size': float(data.ask_size)},
{'type': 'asize',
'price': float(data.ask_price), 'size': float(data.ask_size)}
]
}))
fh.add_feed(
DERIBIT,
channels=[TRADES, L1_BOOK],
symbols=[piker_sym_to_cb_sym(instrument)],
callbacks={
TRADES: _trade,
L1_BOOK: _l1
})
if not fh.running:
fh.run(
start_loop=False,
install_signal_handlers=False)
# sync with trio
to_trio.send_nowait(None)
await asyncio.sleep(float('inf'))
@acm @acm
async def open_price_feed( async def open_price_feed(
instrument: str instrument: str
) -> trio.abc.ReceiveStream: ) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from(
partial(
aio_price_feed_relay,
fh,
instrument
)
) as (first, chan):
yield chan
instrument_db = sym_fmt_piker_to_deribit(instrument)
trades_chan = f'trades.{instrument_db}.raw'
book_chan = f'book.{instrument_db}.none.1.100ms'
channels = [trades_chan, book_chan]
send_chann, recv_chann = trio.open_memory_channel(0)
async def sub_hook(msg):
chan = msg.params['channel']
data = msg.params['data']
if chan == trades_chan:
await send_chann.send((
'trade', {
'symbol': instrument,
'last': data['price'],
'brokerd_ts': time.time(),
'ticks': [{
'type': 'trade',
'price': data['price'],
'size': data['amount'],
'broker_ts': data['timestamp']
}]
}
))
return True
elif chan == book_chan:
bid, bsize = data['bids'][0]
ask, asize = data['asks'][0]
await send_chann.send((
'l1', {
'symbol': instrument,
'ticks': [
{'type': 'bid', 'price': bid, 'size': bsize},
{'type': 'bsize', 'price': bid, 'size': bsize},
{'type': 'ask', 'price': ask, 'size': asize},
{'type': 'asize', 'price': ask, 'size': asize}
]}
))
return True
return False
async with open_cached_client('deribit') as client:
client.append_hooks({
'request': [sub_hook]
})
client.update_types({
'request': JSONRPCSubRequest
})
resp = await client.json_rpc(
'private/subscribe', {'channels': channels})
assert not resp.error
log.info(f'Subscribed to {channels}')
yield recv_chann
resp = await client.json_rpc('private/unsubscribe', {'channels': channels})
assert not resp.error
@acm @acm
async def maybe_open_price_feed( async def maybe_open_price_feed(
@ -604,67 +554,65 @@ async def maybe_open_price_feed(
yield feed yield feed
async def aio_order_feed_relay(
fh: FeedHandler,
instrument: Symbol,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _fill(data: dict, receipt_timestamp):
breakpoint()
async def _order_info(data: dict, receipt_timestamp):
breakpoint()
fh.add_feed(
DERIBIT,
channels=[FILLS, ORDER_INFO],
symbols=[instrument.upper()],
callbacks={
FILLS: _fill,
ORDER_INFO: _order_info,
})
if not fh.running:
fh.run(
start_loop=False,
install_signal_handlers=False)
# sync with trio
to_trio.send_nowait(None)
await asyncio.sleep(float('inf'))
@acm @acm
async def open_order_feed( async def open_ticker_feed(
instrument: list[str] instrument: str
) -> trio.abc.ReceiveStream: ) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from( instrument_db = sym_fmt_piker_to_deribit(instrument)
partial(
aio_order_feed_relay, ticker_chan = f'incremental_ticker.{instrument_db}'
fh,
instrument channels = [ticker_chan]
)
) as (first, chan): send_chann, recv_chann = trio.open_memory_channel(0)
yield chan async def sub_hook(msg):
chann = msg.params['channel']
if chann == ticker_chan:
@acm data = msg.params['data']
async def maybe_open_order_feed( await send_chann.send((
'ticker', {
'symbol': instrument,
'data': data
}
))
return True
return False
async with open_cached_client('deribit') as client:
client.append_hooks({
'request': [sub_hook]
})
client.update_types({
'request': JSONRPCSubRequest
})
resp = await client.json_rpc(
'private/subscribe', {'channels': channels})
assert not resp.error
log.info(f'Subscribed to {channels}')
yield recv_chann
resp = await client.json_rpc('private/unsubscribe', {'channels': channels})
assert not resp.error
@acm
async def maybe_open_ticker_feed(
instrument: str instrument: str
) -> trio.abc.ReceiveStream: ) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context
async with maybe_open_context( async with maybe_open_context(
acm_func=open_order_feed, acm_func=open_ticker_feed,
kwargs={ kwargs={
'instrument': instrument, 'instrument': instrument
'fh': fh
}, },
key=f'{instrument}-order', key=f'{instrument}-ticker',
) as (cache_hit, feed): ) as (cache_hit, feed):
if cache_hit: if cache_hit:
yield broadcast_receiver(feed, 10) yield broadcast_receiver(feed, 10)

View File

@ -20,43 +20,29 @@ Deribit backend.
''' '''
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from datetime import datetime from datetime import datetime
from typing import Any, Optional, Callable from typing import (
import time Callable,
)
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import pendulum import pendulum
from fuzzywuzzy import process as fuzzy
import numpy as np import numpy as np
import tractor import tractor
from piker._cacheables import open_cached_client from piker._cacheables import open_cached_client
from piker.log import get_logger, get_console_log from piker.log import get_logger, get_console_log
from piker.data import ShmArray
from piker.brokers._util import ( from piker.brokers._util import (
BrokerError,
DataUnavailable, DataUnavailable,
) )
from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol
from .api import ( from .api import (
Client, Trade, Client,
get_config, Trade,
str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
maybe_open_price_feed maybe_open_price_feed
) )
_spawn_kwargs = {
'infect_asyncio': True,
}
log = get_logger(__name__) log = get_logger(__name__)
@ -69,20 +55,24 @@ async def open_history_client(
async with open_cached_client('deribit') as client: async with open_cached_client('deribit') as client:
async def get_ohlc( async def get_ohlc(
end_dt: Optional[datetime] = None, timeframe: float,
start_dt: Optional[datetime] = None, end_dt: datetime | None = None,
start_dt: datetime | None = None,
) -> tuple[ ) -> tuple[
np.ndarray, np.ndarray,
datetime, # start datetime, # start
datetime, # end datetime, # end
]: ]:
if timeframe != 60:
raise DataUnavailable('Only 1m bars are supported')
array = await client.bars( array = await client.bars(
instrument, instrument,
start_dt=start_dt, start_dt=start_dt,
end_dt=end_dt, end_dt=end_dt,
) )
if len(array) == 0: if len(array) == 0:
raise DataUnavailable raise DataUnavailable
@ -110,10 +100,7 @@ async def stream_quotes(
sym = symbols[0] sym = symbols[0]
async with ( async with open_cached_client('deribit') as client:
open_cached_client('deribit') as client,
send_chan as send_chan
):
init_msgs = { init_msgs = {
# pass back token, and bool, signalling if we're the writer # pass back token, and bool, signalling if we're the writer
@ -121,22 +108,19 @@ async def stream_quotes(
sym: { sym: {
'symbol_info': { 'symbol_info': {
'asset_type': 'option', 'asset_type': 'option',
'price_tick_size': 0.0005 'price_tick_size': 0.0005,
'lot_tick_size': 0.1
}, },
'shm_write_opts': {'sum_tick_vml': False}, 'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym, 'fqsn': sym,
}, },
} }
nsym = piker_sym_to_cb_sym(sym)
last_trades = (await client.last_trades(sym, count=1)).trades
async with maybe_open_price_feed(sym) as stream: async with maybe_open_price_feed(sym) as stream:
cache = await client.cache_symbols()
last_trades = (await client.last_trades(
cb_sym_to_deribit_inst(nsym), count=1)).trades
if len(last_trades) == 0: if len(last_trades) == 0:
last_trade = None last_trade = None
async for typ, quote in stream: async for typ, quote in stream:
@ -174,7 +158,7 @@ async def open_symbol_search(
async with open_cached_client('deribit') as client: async with open_cached_client('deribit') as client:
# load all symbols locally for fast search # load all symbols locally for fast search
cache = await client.cache_symbols() await client.cache_symbols()
await ctx.started() await ctx.started()
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:

View File

@ -187,7 +187,6 @@ over a NoBsWs.
''' '''
class JSONRPCResult(Struct): class JSONRPCResult(Struct):
id: int id: int
jsonrpc: str = '2.0' jsonrpc: str = '2.0'
@ -202,9 +201,41 @@ async def open_jsonrpc_session(
response_type: type = JSONRPCResult, response_type: type = JSONRPCResult,
request_type: Optional[type] = None, request_type: Optional[type] = None,
request_hook: Optional[Callable] = None, request_hook: Optional[Callable] = None,
error_hook: Optional[Callable] = None, error_hook: Optional[Callable] = None
) -> Callable[[str, dict], dict]: ) -> Callable[[str, dict], dict]:
# xor: this two params need to be passed together or not at all
if bool(request_type) ^ bool(request_hook):
raise ValueError(
'Need to path both a request_type and request_hook')
req_hooks = []
if request_hook:
req_hooks.append(request_hook)
err_hooks = []
if error_hook:
err_hooks.append(error_hook)
hook_table = {
'request': req_hooks,
'error': err_hooks
}
types_table = {
'response': response_type,
'request': request_type
}
def append_hooks(new_hooks: dict):
nonlocal hook_table
for htype, hooks in new_hooks.items():
hook_table[htype] += hooks
def update_types(new_types: dict):
nonlocal types_table
types_table.update(new_types)
async with ( async with (
trio.open_nursery() as n, trio.open_nursery() as n,
open_autorecon_ws(url) as ws open_autorecon_ws(url) as ws
@ -212,7 +243,7 @@ async def open_jsonrpc_session(
rpc_id: Iterable = count(start_id) rpc_id: Iterable = count(start_id)
rpc_results: dict[int, dict] = {} rpc_results: dict[int, dict] = {}
async def json_rpc(method: str, params: dict) -> dict: async def json_rpc(method: str, params: dict = {}) -> dict:
''' '''
perform a json rpc call and wait for the result, raise exception in perform a json rpc call and wait for the result, raise exception in
case of error field present on response case of error field present on response
@ -257,8 +288,7 @@ async def open_jsonrpc_session(
'result': _, 'result': _,
'id': mid, 'id': mid,
} if res_entry := rpc_results.get(mid): } if res_entry := rpc_results.get(mid):
res_entry['result'] = types_table['response'](**msg)
res_entry['result'] = response_type(**msg)
res_entry['event'].set() res_entry['event'].set()
case { case {
@ -269,24 +299,38 @@ async def open_jsonrpc_session(
f'Unexpected ws msg: {json.dumps(msg, indent=4)}' f'Unexpected ws msg: {json.dumps(msg, indent=4)}'
) )
case {
'error': error,
'id': mid
} if res_entry := rpc_results.get(mid):
res_entry['result'] = types_table['response'](**msg)
res_entry['event'].set()
case { case {
'method': _, 'method': _,
'params': _, 'params': _,
}: }:
log.debug(f'Recieved\n{msg}') log.info(f'Recieved\n{msg}')
if request_hook: if len(hook_table['request']) > 0:
await request_hook(request_type(**msg)) for hook in hook_table['request']:
result = await hook(types_table['request'](**msg))
if result:
break
case { case {
'error': error 'error': error,
}: }:
log.warning(f'Recieved\n{error}') log.warning(f'Recieved\n{error}')
if error_hook: if len(hook_table['error']) > 0:
await error_hook(response_type(**msg)) for hook in hook_table['error']:
result = await hook(types_table['response'](**msg))
if result:
break
case _: case _:
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
n.start_soon(recv_task) n.start_soon(recv_task)
yield json_rpc yield json_rpc, append_hooks, update_types
n.cancel_scope.cancel() n.cancel_scope.cancel()

View File

@ -0,0 +1,46 @@
import trio
import pytest
import tractor
from piker import config
from piker.brokers.deribit import api as deribit
from piker.brokers.deribit.api import _testnet_ws_url
from piker._cacheables import open_cached_client
TESTNET_KEY_ID: str | None = None
TESTNET_KEY_SECRET: str | None = None
@pytest.mark.skipif(
not TESTNET_KEY_ID or not TESTNET_KEY_SECRET,
reason='configure a deribit testnet key pair before running this test'
)
def test_deribit_get_ticker(open_test_pikerd):
async def _test_main():
async with open_test_pikerd() as _:
async with open_cached_client('deribit') as client:
symbols = await client.symbol_info()
syms = list(symbols.keys())
sym = syms[int(len(syms) / 2)]
async with deribit.maybe_open_ticker_feed(sym) as tick_stream:
async for typ, msg in tick_stream:
assert typ == 'ticker'
assert 'open_interest' in msg['data']
break
config.write({
'deribit': {
'ws_url': _testnet_ws_url,
'key_id': TESTNET_KEY_ID,
'key_secret': TESTNET_KEY_SECRET
}
})
trio.run(_test_main)