Compare commits
No commits in common. "926ab1dfa623b85a097a9b0ebade82e00fc803f3" and "4c838474bebd2949bf73fa71e4109bbb3953c88c" have entirely different histories.
926ab1dfa6
...
4c838474be
|
@ -52,3 +52,13 @@ __enable_modules__: list[str] = [
|
||||||
'feed',
|
'feed',
|
||||||
# 'broker',
|
# 'broker',
|
||||||
]
|
]
|
||||||
|
|
||||||
|
# passed to ``tractor.ActorNursery.start_actor()``
|
||||||
|
_spawn_kwargs = {
|
||||||
|
'infect_asyncio': True,
|
||||||
|
}
|
||||||
|
|
||||||
|
# annotation to let backend agnostic code
|
||||||
|
# know if ``brokerd`` should be spawned with
|
||||||
|
# ``tractor``'s aio mode.
|
||||||
|
_infect_asyncio: bool = True
|
||||||
|
|
|
@ -20,6 +20,7 @@ Deribit backend.
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
import time
|
import time
|
||||||
|
import asyncio
|
||||||
|
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
from functools import partial
|
from functools import partial
|
||||||
|
@ -30,8 +31,16 @@ from typing import (
|
||||||
Callable,
|
Callable,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from cryptofeed import FeedHandler
|
||||||
|
from cryptofeed.defines import (
|
||||||
|
DERIBIT,
|
||||||
|
L1_BOOK,
|
||||||
|
TRADES,
|
||||||
|
OPTION,
|
||||||
|
CALL,
|
||||||
|
PUT,
|
||||||
|
)
|
||||||
import pendulum
|
import pendulum
|
||||||
import asks
|
|
||||||
import trio
|
import trio
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
from fuzzywuzzy import process as fuzzy
|
from fuzzywuzzy import process as fuzzy
|
||||||
|
@ -40,6 +49,8 @@ from tractor.trionics import (
|
||||||
broadcast_receiver,
|
broadcast_receiver,
|
||||||
maybe_open_context
|
maybe_open_context
|
||||||
)
|
)
|
||||||
|
from tractor import to_asyncio
|
||||||
|
from cryptofeed.symbols import Symbol
|
||||||
|
|
||||||
from piker.data.types import Struct
|
from piker.data.types import Struct
|
||||||
from piker.data._web_bs import (
|
from piker.data._web_bs import (
|
||||||
|
@ -48,12 +59,16 @@ from piker.data._web_bs import (
|
||||||
|
|
||||||
from piker import config
|
from piker import config
|
||||||
from piker.log import get_logger
|
from piker.log import get_logger
|
||||||
from piker._cacheables import open_cached_client
|
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
_spawn_kwargs = {
|
||||||
|
'infect_asyncio': True,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
_url = 'https://www.deribit.com'
|
_url = 'https://www.deribit.com'
|
||||||
_ws_url = 'wss://www.deribit.com/ws/api/v2'
|
_ws_url = 'wss://www.deribit.com/ws/api/v2'
|
||||||
_testnet_ws_url = 'wss://test.deribit.com/ws/api/v2'
|
_testnet_ws_url = 'wss://test.deribit.com/ws/api/v2'
|
||||||
|
@ -127,12 +142,70 @@ def deribit_timestamp(when):
|
||||||
return int((when.timestamp() * 1000) + (when.microsecond / 1000))
|
return int((when.timestamp() * 1000) + (when.microsecond / 1000))
|
||||||
|
|
||||||
|
|
||||||
def sym_fmt_piker_to_deribit(sym: str) -> str:
|
def str_to_cb_sym(name: str) -> Symbol:
|
||||||
return sym.upper()
|
base, strike_price, expiry_date, option_type = name.split('-')
|
||||||
|
|
||||||
|
quote = base
|
||||||
|
|
||||||
|
if option_type == 'put':
|
||||||
|
option_type = PUT
|
||||||
|
elif option_type == 'call':
|
||||||
|
option_type = CALL
|
||||||
|
else:
|
||||||
|
raise Exception("Couldn\'t parse option type")
|
||||||
|
|
||||||
|
return Symbol(
|
||||||
|
base, quote,
|
||||||
|
type=OPTION,
|
||||||
|
strike_price=strike_price,
|
||||||
|
option_type=option_type,
|
||||||
|
expiry_date=expiry_date,
|
||||||
|
expiry_normalize=False)
|
||||||
|
|
||||||
|
|
||||||
def sym_fmt_deribit_to_piker(sym: str):
|
def piker_sym_to_cb_sym(name: str) -> Symbol:
|
||||||
return sym.lower()
|
base, expiry_date, strike_price, option_type = tuple(
|
||||||
|
name.upper().split('-'))
|
||||||
|
|
||||||
|
quote = base
|
||||||
|
|
||||||
|
if option_type == 'P':
|
||||||
|
option_type = PUT
|
||||||
|
elif option_type == 'C':
|
||||||
|
option_type = CALL
|
||||||
|
else:
|
||||||
|
raise Exception("Couldn\'t parse option type")
|
||||||
|
|
||||||
|
return Symbol(
|
||||||
|
base, quote,
|
||||||
|
type=OPTION,
|
||||||
|
strike_price=strike_price,
|
||||||
|
option_type=option_type,
|
||||||
|
expiry_date=expiry_date.upper())
|
||||||
|
|
||||||
|
|
||||||
|
def cb_sym_to_deribit_inst(sym: Symbol):
|
||||||
|
# cryptofeed normalized
|
||||||
|
cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
|
||||||
|
|
||||||
|
# deribit specific
|
||||||
|
months = [
|
||||||
|
'JAN', 'FEB', 'MAR',
|
||||||
|
'APR', 'MAY', 'JUN',
|
||||||
|
'JUL', 'AUG', 'SEP',
|
||||||
|
'OCT', 'NOV', 'DEC',
|
||||||
|
]
|
||||||
|
|
||||||
|
exp = sym.expiry_date
|
||||||
|
|
||||||
|
# YYMDD
|
||||||
|
# 01234
|
||||||
|
year, month, day = (
|
||||||
|
exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])
|
||||||
|
|
||||||
|
otype = 'C' if sym.option_type == CALL else 'P'
|
||||||
|
|
||||||
|
return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}'
|
||||||
|
|
||||||
|
|
||||||
def get_config() -> dict[str, Any]:
|
def get_config() -> dict[str, Any]:
|
||||||
|
@ -141,6 +214,11 @@ def get_config() -> dict[str, Any]:
|
||||||
|
|
||||||
section = conf.get('deribit')
|
section = conf.get('deribit')
|
||||||
|
|
||||||
|
# TODO: document why we send this, basically because logging params
|
||||||
|
# for cryptofeed
|
||||||
|
conf['log'] = {}
|
||||||
|
conf['log']['disabled'] = True
|
||||||
|
|
||||||
if section is None:
|
if section is None:
|
||||||
log.warning(f'No config section found for deribit in {path}')
|
log.warning(f'No config section found for deribit in {path}')
|
||||||
|
|
||||||
|
@ -149,22 +227,20 @@ def get_config() -> dict[str, Any]:
|
||||||
|
|
||||||
class Client:
|
class Client:
|
||||||
|
|
||||||
def __init__(
|
def __init__(self, json_rpc: Callable) -> None:
|
||||||
self,
|
|
||||||
json_rpc: Callable,
|
|
||||||
append_hooks: Callable,
|
|
||||||
update_types: Callable,
|
|
||||||
key_id: str | None = None,
|
|
||||||
key_secret: str | None = None
|
|
||||||
) -> None:
|
|
||||||
|
|
||||||
self._pairs: dict[str, Any] = None
|
self._pairs: dict[str, Any] = None
|
||||||
self._key_id = key_id
|
|
||||||
self._key_secret = key_secret
|
config = get_config().get('deribit', {})
|
||||||
|
|
||||||
|
if ('key_id' in config) and ('key_secret' in config):
|
||||||
|
self._key_id = config['key_id']
|
||||||
|
self._key_secret = config['key_secret']
|
||||||
|
|
||||||
|
else:
|
||||||
|
self._key_id = None
|
||||||
|
self._key_secret = None
|
||||||
|
|
||||||
self.json_rpc = json_rpc
|
self.json_rpc = json_rpc
|
||||||
self.append_hooks = append_hooks
|
|
||||||
self.update_types = update_types
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def currencies(self):
|
def currencies(self):
|
||||||
|
@ -211,7 +287,7 @@ class Client:
|
||||||
"""Place an order
|
"""Place an order
|
||||||
"""
|
"""
|
||||||
params = {
|
params = {
|
||||||
'instrument_name': sym_fmt_piker_to_deribit(symbol),
|
'instrument_name': symbol.upper(),
|
||||||
'amount': size,
|
'amount': size,
|
||||||
'type': 'limit',
|
'type': 'limit',
|
||||||
'price': price,
|
'price': price,
|
||||||
|
@ -252,7 +328,7 @@ class Client:
|
||||||
results = resp.result
|
results = resp.result
|
||||||
|
|
||||||
instruments = {
|
instruments = {
|
||||||
sym_fmt_deribit_to_piker(item['instrument_name']): item
|
item['instrument_name'].lower(): item
|
||||||
for item in results
|
for item in results
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -283,10 +359,8 @@ class Client:
|
||||||
limit=limit
|
limit=limit
|
||||||
)
|
)
|
||||||
# repack in dict form
|
# repack in dict form
|
||||||
return {
|
return {item[0]['instrument_name'].lower(): item[0]
|
||||||
sym_fmt_deribit_to_piker(item[0]['instrument_name']): item[0]
|
for item in matches}
|
||||||
for item in matches
|
|
||||||
}
|
|
||||||
|
|
||||||
async def bars(
|
async def bars(
|
||||||
self,
|
self,
|
||||||
|
@ -313,7 +387,7 @@ class Client:
|
||||||
resp = await self.json_rpc(
|
resp = await self.json_rpc(
|
||||||
'public/get_tradingview_chart_data',
|
'public/get_tradingview_chart_data',
|
||||||
params={
|
params={
|
||||||
'instrument_name': sym_fmt_piker_to_deribit(instrument),
|
'instrument_name': instrument.upper(),
|
||||||
'start_timestamp': start_time,
|
'start_timestamp': start_time,
|
||||||
'end_timestamp': end_time,
|
'end_timestamp': end_time,
|
||||||
'resolution': '1'
|
'resolution': '1'
|
||||||
|
@ -346,55 +420,26 @@ class Client:
|
||||||
resp = await self.json_rpc(
|
resp = await self.json_rpc(
|
||||||
'public/get_last_trades_by_instrument',
|
'public/get_last_trades_by_instrument',
|
||||||
params={
|
params={
|
||||||
'instrument_name': sym_fmt_piker_to_deribit(instrument),
|
'instrument_name': instrument,
|
||||||
'count': count
|
'count': count
|
||||||
})
|
})
|
||||||
|
|
||||||
return LastTradesResult(**resp.result)
|
return LastTradesResult(**resp.result)
|
||||||
|
|
||||||
async def get_book_summary(
|
|
||||||
self,
|
|
||||||
currency: str,
|
|
||||||
kind: str = 'option'
|
|
||||||
):
|
|
||||||
return await self.json_rpc(
|
|
||||||
'public/get_book_summary_by_currency',
|
|
||||||
params={
|
|
||||||
'currency': currency,
|
|
||||||
'kind': kind
|
|
||||||
})
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class JSONRPCSubRequest(Struct):
|
|
||||||
method: str
|
|
||||||
params: dict
|
|
||||||
jsonrpc: str = '2.0'
|
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def get_client(
|
async def get_client(
|
||||||
is_brokercheck: bool = False
|
is_brokercheck: bool = False
|
||||||
) -> Client:
|
) -> Client:
|
||||||
|
|
||||||
config = get_config().get('deribit', {})
|
|
||||||
|
|
||||||
ws_url = config.get('ws_url', _ws_url)
|
|
||||||
key_id = config.get('key_id', None)
|
|
||||||
key_secret = config.get('key_secret', None)
|
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
trio.open_nursery() as n,
|
trio.open_nursery() as n,
|
||||||
open_jsonrpc_session(
|
open_jsonrpc_session(
|
||||||
ws_url,
|
_testnet_ws_url,
|
||||||
response_type=JSONRPCResult
|
response_type=JSONRPCResult
|
||||||
) as control_functions
|
) as json_rpc
|
||||||
):
|
):
|
||||||
client = Client(
|
client = Client(json_rpc)
|
||||||
*control_functions,
|
|
||||||
key_id=key_id,
|
|
||||||
key_secret=key_secret
|
|
||||||
)
|
|
||||||
|
|
||||||
_refresh_token: Optional[str] = None
|
_refresh_token: Optional[str] = None
|
||||||
_access_token: Optional[str] = None
|
_access_token: Optional[str] = None
|
||||||
|
@ -407,7 +452,7 @@ async def get_client(
|
||||||
|
|
||||||
https://docs.deribit.com/?python#authentication-2
|
https://docs.deribit.com/?python#authentication-2
|
||||||
"""
|
"""
|
||||||
renew_time = 240
|
renew_time = 10
|
||||||
access_scope = 'trade:read_write'
|
access_scope = 'trade:read_write'
|
||||||
_expiry_time = time.time()
|
_expiry_time = time.time()
|
||||||
got_access = False
|
got_access = False
|
||||||
|
@ -437,7 +482,7 @@ async def get_client(
|
||||||
'scope': access_scope
|
'scope': access_scope
|
||||||
}
|
}
|
||||||
|
|
||||||
resp = await client.json_rpc('public/auth', params)
|
resp = await json_rpc('public/auth', params)
|
||||||
result = resp.result
|
result = resp.result
|
||||||
|
|
||||||
_expiry_time = time.time() + result['expires_in']
|
_expiry_time = time.time() + result['expires_in']
|
||||||
|
@ -464,76 +509,98 @@ async def get_client(
|
||||||
n.cancel_scope.cancel()
|
n.cancel_scope.cancel()
|
||||||
|
|
||||||
|
|
||||||
|
@acm
|
||||||
|
async def open_feed_handler():
|
||||||
|
fh = FeedHandler(config=get_config())
|
||||||
|
yield fh
|
||||||
|
await to_asyncio.run_task(fh.stop_async)
|
||||||
|
|
||||||
|
|
||||||
|
@acm
|
||||||
|
async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
|
||||||
|
async with maybe_open_context(
|
||||||
|
acm_func=open_feed_handler,
|
||||||
|
key='feedhandler',
|
||||||
|
) as (cache_hit, fh):
|
||||||
|
yield fh
|
||||||
|
|
||||||
|
|
||||||
|
async def aio_price_feed_relay(
|
||||||
|
fh: FeedHandler,
|
||||||
|
instrument: Symbol,
|
||||||
|
from_trio: asyncio.Queue,
|
||||||
|
to_trio: trio.abc.SendChannel,
|
||||||
|
) -> None:
|
||||||
|
async def _trade(data: dict, receipt_timestamp):
|
||||||
|
to_trio.send_nowait(('trade', {
|
||||||
|
'symbol': cb_sym_to_deribit_inst(
|
||||||
|
str_to_cb_sym(data.symbol)).lower(),
|
||||||
|
'last': data,
|
||||||
|
'broker_ts': time.time(),
|
||||||
|
'data': data.to_dict(),
|
||||||
|
'receipt': receipt_timestamp
|
||||||
|
}))
|
||||||
|
|
||||||
|
async def _l1(data: dict, receipt_timestamp):
|
||||||
|
to_trio.send_nowait(
|
||||||
|
('l1', {
|
||||||
|
'symbol': cb_sym_to_deribit_inst(
|
||||||
|
str_to_cb_sym(data.symbol)).lower(),
|
||||||
|
'ticks': [
|
||||||
|
|
||||||
|
{'type': 'bid',
|
||||||
|
'price': float(data.bid_price),
|
||||||
|
'size': float(data.bid_size)},
|
||||||
|
|
||||||
|
{'type': 'bsize',
|
||||||
|
'price': float(data.bid_price),
|
||||||
|
'size': float(data.bid_size)},
|
||||||
|
|
||||||
|
{'type': 'ask',
|
||||||
|
'price': float(data.ask_price),
|
||||||
|
'size': float(data.ask_size)},
|
||||||
|
|
||||||
|
{'type': 'asize',
|
||||||
|
'price': float(data.ask_price),
|
||||||
|
'size': float(data.ask_size)}
|
||||||
|
]
|
||||||
|
})
|
||||||
|
)
|
||||||
|
|
||||||
|
fh.add_feed(
|
||||||
|
DERIBIT,
|
||||||
|
channels=[TRADES, L1_BOOK],
|
||||||
|
symbols=[piker_sym_to_cb_sym(instrument)],
|
||||||
|
callbacks={
|
||||||
|
TRADES: _trade,
|
||||||
|
L1_BOOK: _l1
|
||||||
|
})
|
||||||
|
|
||||||
|
if not fh.running:
|
||||||
|
fh.run(
|
||||||
|
start_loop=False,
|
||||||
|
install_signal_handlers=False)
|
||||||
|
|
||||||
|
# sync with trio
|
||||||
|
to_trio.send_nowait(None)
|
||||||
|
|
||||||
|
await asyncio.sleep(float('inf'))
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def open_price_feed(
|
async def open_price_feed(
|
||||||
instrument: str
|
instrument: str
|
||||||
) -> trio.abc.ReceiveStream:
|
) -> trio.abc.ReceiveStream:
|
||||||
|
async with maybe_open_feed_handler() as fh:
|
||||||
|
async with to_asyncio.open_channel_from(
|
||||||
|
partial(
|
||||||
|
aio_price_feed_relay,
|
||||||
|
fh,
|
||||||
|
instrument
|
||||||
|
)
|
||||||
|
) as (first, chan):
|
||||||
|
yield chan
|
||||||
|
|
||||||
instrument_db = sym_fmt_piker_to_deribit(instrument)
|
|
||||||
|
|
||||||
trades_chan = f'trades.{instrument_db}.raw'
|
|
||||||
book_chan = f'book.{instrument_db}.none.1.100ms'
|
|
||||||
|
|
||||||
channels = [trades_chan, book_chan]
|
|
||||||
|
|
||||||
send_chann, recv_chann = trio.open_memory_channel(0)
|
|
||||||
async def sub_hook(msg):
|
|
||||||
chan = msg.params['channel']
|
|
||||||
data = msg.params['data']
|
|
||||||
if chan == trades_chan:
|
|
||||||
await send_chann.send((
|
|
||||||
'trade', {
|
|
||||||
'symbol': instrument,
|
|
||||||
'last': data['price'],
|
|
||||||
'brokerd_ts': time.time(),
|
|
||||||
'ticks': [{
|
|
||||||
'type': 'trade',
|
|
||||||
'price': data['price'],
|
|
||||||
'size': data['amount'],
|
|
||||||
'broker_ts': data['timestamp']
|
|
||||||
}]
|
|
||||||
}
|
|
||||||
))
|
|
||||||
return True
|
|
||||||
|
|
||||||
elif chan == book_chan:
|
|
||||||
bid, bsize = data['bids'][0]
|
|
||||||
ask, asize = data['asks'][0]
|
|
||||||
await send_chann.send((
|
|
||||||
'l1', {
|
|
||||||
'symbol': instrument,
|
|
||||||
'ticks': [
|
|
||||||
{'type': 'bid', 'price': bid, 'size': bsize},
|
|
||||||
{'type': 'bsize', 'price': bid, 'size': bsize},
|
|
||||||
{'type': 'ask', 'price': ask, 'size': asize},
|
|
||||||
{'type': 'asize', 'price': ask, 'size': asize}
|
|
||||||
]}
|
|
||||||
))
|
|
||||||
return True
|
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
||||||
async with open_cached_client('deribit') as client:
|
|
||||||
|
|
||||||
client.append_hooks({
|
|
||||||
'request': [sub_hook]
|
|
||||||
})
|
|
||||||
client.update_types({
|
|
||||||
'request': JSONRPCSubRequest
|
|
||||||
})
|
|
||||||
|
|
||||||
resp = await client.json_rpc(
|
|
||||||
'private/subscribe', {'channels': channels})
|
|
||||||
|
|
||||||
assert not resp.error
|
|
||||||
|
|
||||||
log.info(f'Subscribed to {channels}')
|
|
||||||
|
|
||||||
yield recv_chann
|
|
||||||
|
|
||||||
resp = await client.json_rpc('private/unsubscribe', {'channels': channels})
|
|
||||||
|
|
||||||
assert not resp.error
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def maybe_open_price_feed(
|
async def maybe_open_price_feed(
|
||||||
|
@ -554,67 +621,71 @@ async def maybe_open_price_feed(
|
||||||
yield feed
|
yield feed
|
||||||
|
|
||||||
|
|
||||||
@acm
|
# TODO: order broker support: this is all draft code from @guilledk B)
|
||||||
async def open_ticker_feed(
|
|
||||||
instrument: str
|
|
||||||
) -> trio.abc.ReceiveStream:
|
|
||||||
|
|
||||||
instrument_db = sym_fmt_piker_to_deribit(instrument)
|
# async def aio_order_feed_relay(
|
||||||
|
# fh: FeedHandler,
|
||||||
|
# instrument: Symbol,
|
||||||
|
# from_trio: asyncio.Queue,
|
||||||
|
# to_trio: trio.abc.SendChannel,
|
||||||
|
|
||||||
ticker_chan = f'incremental_ticker.{instrument_db}'
|
# ) -> None:
|
||||||
|
# async def _fill(data: dict, receipt_timestamp):
|
||||||
|
# breakpoint()
|
||||||
|
|
||||||
channels = [ticker_chan]
|
# async def _order_info(data: dict, receipt_timestamp):
|
||||||
|
# breakpoint()
|
||||||
|
|
||||||
send_chann, recv_chann = trio.open_memory_channel(0)
|
# fh.add_feed(
|
||||||
async def sub_hook(msg):
|
# DERIBIT,
|
||||||
chann = msg.params['channel']
|
# channels=[FILLS, ORDER_INFO],
|
||||||
if chann == ticker_chan:
|
# symbols=[instrument.upper()],
|
||||||
data = msg.params['data']
|
# callbacks={
|
||||||
await send_chann.send((
|
# FILLS: _fill,
|
||||||
'ticker', {
|
# ORDER_INFO: _order_info,
|
||||||
'symbol': instrument,
|
# })
|
||||||
'data': data
|
|
||||||
}
|
|
||||||
))
|
|
||||||
return True
|
|
||||||
|
|
||||||
return False
|
# if not fh.running:
|
||||||
|
# fh.run(
|
||||||
|
# start_loop=False,
|
||||||
|
# install_signal_handlers=False)
|
||||||
|
|
||||||
async with open_cached_client('deribit') as client:
|
# # sync with trio
|
||||||
|
# to_trio.send_nowait(None)
|
||||||
|
|
||||||
client.append_hooks({
|
# await asyncio.sleep(float('inf'))
|
||||||
'request': [sub_hook]
|
|
||||||
})
|
|
||||||
client.update_types({
|
|
||||||
'request': JSONRPCSubRequest
|
|
||||||
})
|
|
||||||
|
|
||||||
resp = await client.json_rpc(
|
|
||||||
'private/subscribe', {'channels': channels})
|
|
||||||
|
|
||||||
assert not resp.error
|
# @acm
|
||||||
|
# async def open_order_feed(
|
||||||
|
# instrument: list[str]
|
||||||
|
# ) -> trio.abc.ReceiveStream:
|
||||||
|
# async with maybe_open_feed_handler() as fh:
|
||||||
|
# async with to_asyncio.open_channel_from(
|
||||||
|
# partial(
|
||||||
|
# aio_order_feed_relay,
|
||||||
|
# fh,
|
||||||
|
# instrument
|
||||||
|
# )
|
||||||
|
# ) as (first, chan):
|
||||||
|
# yield chan
|
||||||
|
|
||||||
log.info(f'Subscribed to {channels}')
|
|
||||||
|
|
||||||
yield recv_chann
|
# @acm
|
||||||
|
# async def maybe_open_order_feed(
|
||||||
|
# instrument: str
|
||||||
|
# ) -> trio.abc.ReceiveStream:
|
||||||
|
|
||||||
resp = await client.json_rpc('private/unsubscribe', {'channels': channels})
|
# # TODO: add a predicate to maybe_open_context
|
||||||
|
# async with maybe_open_context(
|
||||||
assert not resp.error
|
# acm_func=open_order_feed,
|
||||||
|
# kwargs={
|
||||||
@acm
|
# 'instrument': instrument,
|
||||||
async def maybe_open_ticker_feed(
|
# 'fh': fh
|
||||||
instrument: str
|
# },
|
||||||
) -> trio.abc.ReceiveStream:
|
# key=f'{instrument}-order',
|
||||||
|
# ) as (cache_hit, feed):
|
||||||
async with maybe_open_context(
|
# if cache_hit:
|
||||||
acm_func=open_ticker_feed,
|
# yield broadcast_receiver(feed, 10)
|
||||||
kwargs={
|
# else:
|
||||||
'instrument': instrument
|
# yield feed
|
||||||
},
|
|
||||||
key=f'{instrument}-ticker',
|
|
||||||
) as (cache_hit, feed):
|
|
||||||
if cache_hit:
|
|
||||||
yield broadcast_receiver(feed, 10)
|
|
||||||
else:
|
|
||||||
yield feed
|
|
||||||
|
|
|
@ -40,9 +40,16 @@ from piker.brokers._util import (
|
||||||
from .api import (
|
from .api import (
|
||||||
Client,
|
Client,
|
||||||
Trade,
|
Trade,
|
||||||
|
piker_sym_to_cb_sym,
|
||||||
|
cb_sym_to_deribit_inst,
|
||||||
maybe_open_price_feed
|
maybe_open_price_feed
|
||||||
)
|
)
|
||||||
|
|
||||||
|
_spawn_kwargs = {
|
||||||
|
'infect_asyncio': True,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -100,7 +107,10 @@ async def stream_quotes(
|
||||||
|
|
||||||
sym = symbols[0]
|
sym = symbols[0]
|
||||||
|
|
||||||
async with open_cached_client('deribit') as client:
|
async with (
|
||||||
|
open_cached_client('deribit') as client,
|
||||||
|
send_chan as send_chan
|
||||||
|
):
|
||||||
|
|
||||||
init_msgs = {
|
init_msgs = {
|
||||||
# pass back token, and bool, signalling if we're the writer
|
# pass back token, and bool, signalling if we're the writer
|
||||||
|
@ -108,19 +118,22 @@ async def stream_quotes(
|
||||||
sym: {
|
sym: {
|
||||||
'symbol_info': {
|
'symbol_info': {
|
||||||
'asset_type': 'option',
|
'asset_type': 'option',
|
||||||
'price_tick_size': 0.0005,
|
'price_tick_size': 0.0005
|
||||||
'lot_tick_size': 0.1
|
|
||||||
},
|
},
|
||||||
'shm_write_opts': {'sum_tick_vml': False},
|
'shm_write_opts': {'sum_tick_vml': False},
|
||||||
'fqsn': sym,
|
'fqsn': sym,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nsym = piker_sym_to_cb_sym(sym)
|
||||||
last_trades = (await client.last_trades(sym, count=1)).trades
|
|
||||||
|
|
||||||
async with maybe_open_price_feed(sym) as stream:
|
async with maybe_open_price_feed(sym) as stream:
|
||||||
|
|
||||||
|
await client.cache_symbols()
|
||||||
|
|
||||||
|
last_trades = (await client.last_trades(
|
||||||
|
cb_sym_to_deribit_inst(nsym), count=1)).trades
|
||||||
|
|
||||||
if len(last_trades) == 0:
|
if len(last_trades) == 0:
|
||||||
last_trade = None
|
last_trade = None
|
||||||
async for typ, quote in stream:
|
async for typ, quote in stream:
|
||||||
|
|
|
@ -187,6 +187,7 @@ over a NoBsWs.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
|
||||||
class JSONRPCResult(Struct):
|
class JSONRPCResult(Struct):
|
||||||
id: int
|
id: int
|
||||||
jsonrpc: str = '2.0'
|
jsonrpc: str = '2.0'
|
||||||
|
@ -201,41 +202,9 @@ async def open_jsonrpc_session(
|
||||||
response_type: type = JSONRPCResult,
|
response_type: type = JSONRPCResult,
|
||||||
request_type: Optional[type] = None,
|
request_type: Optional[type] = None,
|
||||||
request_hook: Optional[Callable] = None,
|
request_hook: Optional[Callable] = None,
|
||||||
error_hook: Optional[Callable] = None
|
error_hook: Optional[Callable] = None,
|
||||||
) -> Callable[[str, dict], dict]:
|
) -> Callable[[str, dict], dict]:
|
||||||
|
|
||||||
# xor: this two params need to be passed together or not at all
|
|
||||||
if bool(request_type) ^ bool(request_hook):
|
|
||||||
raise ValueError(
|
|
||||||
'Need to path both a request_type and request_hook')
|
|
||||||
|
|
||||||
req_hooks = []
|
|
||||||
if request_hook:
|
|
||||||
req_hooks.append(request_hook)
|
|
||||||
|
|
||||||
err_hooks = []
|
|
||||||
if error_hook:
|
|
||||||
err_hooks.append(error_hook)
|
|
||||||
|
|
||||||
hook_table = {
|
|
||||||
'request': req_hooks,
|
|
||||||
'error': err_hooks
|
|
||||||
}
|
|
||||||
|
|
||||||
types_table = {
|
|
||||||
'response': response_type,
|
|
||||||
'request': request_type
|
|
||||||
}
|
|
||||||
|
|
||||||
def append_hooks(new_hooks: dict):
|
|
||||||
nonlocal hook_table
|
|
||||||
for htype, hooks in new_hooks.items():
|
|
||||||
hook_table[htype] += hooks
|
|
||||||
|
|
||||||
def update_types(new_types: dict):
|
|
||||||
nonlocal types_table
|
|
||||||
types_table.update(new_types)
|
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
trio.open_nursery() as n,
|
trio.open_nursery() as n,
|
||||||
open_autorecon_ws(url) as ws
|
open_autorecon_ws(url) as ws
|
||||||
|
@ -243,7 +212,7 @@ async def open_jsonrpc_session(
|
||||||
rpc_id: Iterable = count(start_id)
|
rpc_id: Iterable = count(start_id)
|
||||||
rpc_results: dict[int, dict] = {}
|
rpc_results: dict[int, dict] = {}
|
||||||
|
|
||||||
async def json_rpc(method: str, params: dict = {}) -> dict:
|
async def json_rpc(method: str, params: dict) -> dict:
|
||||||
'''
|
'''
|
||||||
perform a json rpc call and wait for the result, raise exception in
|
perform a json rpc call and wait for the result, raise exception in
|
||||||
case of error field present on response
|
case of error field present on response
|
||||||
|
@ -288,7 +257,8 @@ async def open_jsonrpc_session(
|
||||||
'result': _,
|
'result': _,
|
||||||
'id': mid,
|
'id': mid,
|
||||||
} if res_entry := rpc_results.get(mid):
|
} if res_entry := rpc_results.get(mid):
|
||||||
res_entry['result'] = types_table['response'](**msg)
|
|
||||||
|
res_entry['result'] = response_type(**msg)
|
||||||
res_entry['event'].set()
|
res_entry['event'].set()
|
||||||
|
|
||||||
case {
|
case {
|
||||||
|
@ -299,38 +269,24 @@ async def open_jsonrpc_session(
|
||||||
f'Unexpected ws msg: {json.dumps(msg, indent=4)}'
|
f'Unexpected ws msg: {json.dumps(msg, indent=4)}'
|
||||||
)
|
)
|
||||||
|
|
||||||
case {
|
|
||||||
'error': error,
|
|
||||||
'id': mid
|
|
||||||
} if res_entry := rpc_results.get(mid):
|
|
||||||
|
|
||||||
res_entry['result'] = types_table['response'](**msg)
|
|
||||||
res_entry['event'].set()
|
|
||||||
|
|
||||||
case {
|
case {
|
||||||
'method': _,
|
'method': _,
|
||||||
'params': _,
|
'params': _,
|
||||||
}:
|
}:
|
||||||
log.info(f'Recieved\n{msg}')
|
log.debug(f'Recieved\n{msg}')
|
||||||
if len(hook_table['request']) > 0:
|
if request_hook:
|
||||||
for hook in hook_table['request']:
|
await request_hook(request_type(**msg))
|
||||||
result = await hook(types_table['request'](**msg))
|
|
||||||
if result:
|
|
||||||
break
|
|
||||||
|
|
||||||
case {
|
case {
|
||||||
'error': error,
|
'error': error
|
||||||
}:
|
}:
|
||||||
log.warning(f'Recieved\n{error}')
|
log.warning(f'Recieved\n{error}')
|
||||||
if len(hook_table['error']) > 0:
|
if error_hook:
|
||||||
for hook in hook_table['error']:
|
await error_hook(response_type(**msg))
|
||||||
result = await hook(types_table['response'](**msg))
|
|
||||||
if result:
|
|
||||||
break
|
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
|
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
|
||||||
|
|
||||||
n.start_soon(recv_task)
|
n.start_soon(recv_task)
|
||||||
yield json_rpc, append_hooks, update_types
|
yield json_rpc
|
||||||
n.cancel_scope.cancel()
|
n.cancel_scope.cancel()
|
||||||
|
|
|
@ -1,46 +0,0 @@
|
||||||
import trio
|
|
||||||
import pytest
|
|
||||||
import tractor
|
|
||||||
|
|
||||||
from piker import config
|
|
||||||
|
|
||||||
from piker.brokers.deribit import api as deribit
|
|
||||||
from piker.brokers.deribit.api import _testnet_ws_url
|
|
||||||
|
|
||||||
from piker._cacheables import open_cached_client
|
|
||||||
|
|
||||||
|
|
||||||
TESTNET_KEY_ID: str | None = None
|
|
||||||
TESTNET_KEY_SECRET: str | None = None
|
|
||||||
|
|
||||||
@pytest.mark.skipif(
|
|
||||||
not TESTNET_KEY_ID or not TESTNET_KEY_SECRET,
|
|
||||||
reason='configure a deribit testnet key pair before running this test'
|
|
||||||
)
|
|
||||||
def test_deribit_get_ticker(open_test_pikerd):
|
|
||||||
|
|
||||||
async def _test_main():
|
|
||||||
async with open_test_pikerd() as _:
|
|
||||||
async with open_cached_client('deribit') as client:
|
|
||||||
|
|
||||||
symbols = await client.symbol_info()
|
|
||||||
|
|
||||||
syms = list(symbols.keys())
|
|
||||||
sym = syms[int(len(syms) / 2)]
|
|
||||||
|
|
||||||
async with deribit.maybe_open_ticker_feed(sym) as tick_stream:
|
|
||||||
async for typ, msg in tick_stream:
|
|
||||||
assert typ == 'ticker'
|
|
||||||
assert 'open_interest' in msg['data']
|
|
||||||
break
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
config.write({
|
|
||||||
'deribit': {
|
|
||||||
'ws_url': _testnet_ws_url,
|
|
||||||
'key_id': TESTNET_KEY_ID,
|
|
||||||
'key_secret': TESTNET_KEY_SECRET
|
|
||||||
}
|
|
||||||
})
|
|
||||||
trio.run(_test_main)
|
|
Loading…
Reference in New Issue