`.deribit.api` bit of tidying/typing

There were some imports missing or unused as well as a variety of spots
that had grokability issues due to missing type hints.

Other tweaks as part some more thorough manual testing:
- always raise when not `brokers.toml` section since the API can never
  work (no free data without keys).
- inline the `Asset.atype='crypto_currency` field despite it maybe not
  being the best value for `OptionPair` instruments..
- tossed in a now-masked pause block for debugging history queries in
  `Client.bars()`.
- commented out all the live order ctl (internal) endpoints for now
  since they're unused.
fix_deribit_hist_queries
Tyler Goodlet 2024-11-19 17:09:16 -05:00
parent be84d0dae1
commit dc2c379d86
1 changed files with 113 additions and 92 deletions

View File

@ -29,6 +29,7 @@ from decimal import (
) )
from functools import partial from functools import partial
import time import time
from pathlib import Path
from typing import ( from typing import (
Any, Any,
Optional, Optional,
@ -37,8 +38,6 @@ from typing import (
from pendulum import now from pendulum import now
import trio import trio
from trio_typing import TaskStatus
from rapidfuzz import process as fuzzy
import numpy as np import numpy as np
from tractor.trionics import ( from tractor.trionics import (
broadcast_receiver, broadcast_receiver,
@ -55,8 +54,10 @@ from cryptofeed.defines import (
OPTION, CALL, PUT OPTION, CALL, PUT
) )
from cryptofeed.symbols import Symbol from cryptofeed.symbols import Symbol
# types for managing the cb callbacks. # types for managing the cb callbacks.
# from cryptofeed.types import L1Book # from cryptofeed.types import L1Book
from piker.brokers import SymbolNotFound
from .venues import ( from .venues import (
_ws_url, _ws_url,
MarketType, MarketType,
@ -64,9 +65,9 @@ from .venues import (
Pair, Pair,
OptionPair, OptionPair,
JSONRPCResult, JSONRPCResult,
JSONRPCChannel, # JSONRPCChannel,
KLinesResult, KLinesResult,
Trade, # Trade,
LastTradesResult, LastTradesResult,
) )
from piker.accounting import ( from piker.accounting import (
@ -77,7 +78,7 @@ from piker.accounting import (
from piker.data import ( from piker.data import (
def_iohlcv_fields, def_iohlcv_fields,
match_from_pairs, match_from_pairs,
Struct, # Struct,
) )
from piker.data._web_bs import ( from piker.data._web_bs import (
open_jsonrpc_session open_jsonrpc_session
@ -97,7 +98,7 @@ _spawn_kwargs = {
# convert datetime obj timestamp to unixtime in milliseconds # convert datetime obj timestamp to unixtime in milliseconds
def deribit_timestamp(when): def deribit_timestamp(when) -> int:
return int((when.timestamp() * 1000) + (when.microsecond / 1000)) return int((when.timestamp() * 1000) + (when.microsecond / 1000))
@ -179,16 +180,18 @@ def get_config() -> dict[str, Any]:
conf: dict conf: dict
path: Path path: Path
conf, path = config.load( conf, path = config.load(
conf_name='brokers', conf_name='brokers',
touch_if_dne=True, touch_if_dne=True,
) )
section: dict = {} section: dict|None = conf.get('deribit')
section = conf.get('deribit')
if section is None: if section is None:
log.warning(f'No config section found for deribit in {path}') raise ValueError(
return {} f'No `[deribit]` section found in\n'
f'{path!r}\n\n'
f'See the template config from the core repo for samples..\n'
# f'<TODO put repo link here??>'
)
conf_option = section.get('option', {}) conf_option = section.get('option', {})
section.clear # clear the dict to reuse it section.clear # clear the dict to reuse it
@ -223,8 +226,12 @@ class Client:
self._auth_ts = None self._auth_ts = None
self._auth_renew_ts = 5 # seconds to renew auth self._auth_renew_ts = 5 # seconds to renew auth
async def _json_rpc_auth_wrapper(self, *args, **kwargs) -> JSONRPCResult: async def _json_rpc_auth_wrapper(
self,
*args,
**kwargs,
) -> JSONRPCResult:
"""Background task that adquires a first access token and then will """Background task that adquires a first access token and then will
refresh the access token. refresh the access token.
@ -250,9 +257,6 @@ class Client:
return await self.json_rpc(*args, **kwargs) return await self.json_rpc(*args, **kwargs)
async def get_balances( async def get_balances(
self, self,
kind: str = 'option' kind: str = 'option'
@ -277,23 +281,29 @@ class Client:
venue: str | None = None, venue: str | None = None,
) -> dict[str, Asset]: ) -> dict[str, Asset]:
"""Return the set of asset balances for this account '''
by symbol. Return the set of asset balances for this account
""" by (deribit's) symbol.
'''
assets = {} assets = {}
resp = await self._json_rpc_auth_wrapper( resp = await self._json_rpc_auth_wrapper(
'public/get_currencies', 'public/get_currencies',
params={} params={}
) )
currencies = resp.result currencies: list[dict] = resp.result
for currency in currencies: for currency in currencies:
name = currency['currency'] name: str = currency['currency']
tx_tick = digits_to_dec(currency['fee_precision']) tx_tick: Decimal = digits_to_dec(currency['fee_precision'])
atype='crypto_currency'
# TODO, handling of options, futures, perps etc. more
# specifically with diff `.atype`s?
assets[name] = Asset( assets[name] = Asset(
name=name, name=name,
atype=atype, atype='crypto_currency',
tx_tick=tx_tick) tx_tick=tx_tick,
)
instruments = await self.symbol_info(currency=name) instruments = await self.symbol_info(currency=name)
for instrument in instruments: for instrument in instruments:
@ -301,9 +311,10 @@ class Client:
assets[pair.symbol] = Asset( assets[pair.symbol] = Asset(
name=pair.symbol, name=pair.symbol,
atype=pair.venue, atype=pair.venue,
tx_tick=pair.size_tick) tx_tick=pair.size_tick,
)
return assets return assets
async def get_mkt_pairs(self) -> dict[str, Pair]: async def get_mkt_pairs(self) -> dict[str, Pair]:
flat: dict[str, Pair] = {} flat: dict[str, Pair] = {}
@ -381,7 +392,7 @@ class Client:
params: dict[str, str] = { params: dict[str, str] = {
'currency': currency.upper(), 'currency': currency.upper(),
'kind': kind, 'kind': kind,
'expired': str(expired).lower() 'expired': expired,
} }
resp: JSONRPCResult = await self._json_rpc_auth_wrapper( resp: JSONRPCResult = await self._json_rpc_auth_wrapper(
@ -389,9 +400,9 @@ class Client:
params, params,
) )
# convert to symbol-keyed table # convert to symbol-keyed table
pair_type: Type = PAIRTYPES[kind] pair_type: Pair = PAIRTYPES[kind]
results: list[dict] | None = resp.result results: list[dict] | None = resp.result
instruments: dict[str, Pair] = {} instruments: dict[str, Pair] = {}
for item in results: for item in results:
symbol=item['instrument_name'].lower() symbol=item['instrument_name'].lower()
@ -427,12 +438,15 @@ class Client:
mkt_pairs = await self.symbol_info() mkt_pairs = await self.symbol_info()
if not mkt_pairs: if not mkt_pairs:
raise SymbolNotFound(f'No market pairs found!?:\n{resp}') raise SymbolNotFound(
f'No market pairs found!?:\n'
f'{mkt_pairs}'
)
pairs_view_subtable: dict[str, Pair] = {} pairs_view_subtable: dict[str, Pair] = {}
for instrument in mkt_pairs: for instrument in mkt_pairs:
pair_type: Type = PAIRTYPES[venue] pair_type: Pair|OptionPair = PAIRTYPES[venue]
pair: Pair = pair_type(**mkt_pairs[instrument].to_dict()) pair: Pair = pair_type(**mkt_pairs[instrument].to_dict())
@ -480,12 +494,14 @@ class Client:
if end_dt is None: if end_dt is None:
end_dt = now('UTC') end_dt = now('UTC')
_orig_start_dt = start_dt
if start_dt is None: if start_dt is None:
start_dt = end_dt.start_of( start_dt = end_dt.start_of(
'minute').subtract(minutes=limit) 'minute'
).subtract(minutes=limit)
start_time = deribit_timestamp(start_dt) start_time: int = deribit_timestamp(start_dt)
end_time = deribit_timestamp(end_dt) end_time: int = deribit_timestamp(end_dt)
# https://docs.deribit.com/#public-get_tradingview_chart_data # https://docs.deribit.com/#public-get_tradingview_chart_data
resp = await self._json_rpc_auth_wrapper( resp = await self._json_rpc_auth_wrapper(
@ -499,9 +515,13 @@ class Client:
result = KLinesResult(**resp.result) result = KLinesResult(**resp.result)
new_bars: list[tuple] = [] new_bars: list[tuple] = []
for i in range(len(result.close)): # if _orig_start_dt is None:
# if not new_bars:
# import tractor
# await tractor.pause()
row = [ for i in range(len(result.close)):
row = [
(start_time + (i * (60 * 1000))) / 1000.0, # time (start_time + (i * (60 * 1000))) / 1000.0, # time
result.open[i], result.open[i],
result.high[i], result.high[i],
@ -668,68 +688,69 @@ async def maybe_open_price_feed(
async def aio_order_feed_relay( # TODO, move all to `.broker` submod!
fh: FeedHandler, # async def aio_order_feed_relay(
instrument: Symbol, # fh: FeedHandler,
from_trio: asyncio.Queue, # instrument: Symbol,
to_trio: trio.abc.SendChannel, # from_trio: asyncio.Queue,
) -> None: # to_trio: trio.abc.SendChannel,
async def _fill(data: dict, receipt_timestamp): # ) -> None:
breakpoint() # async def _fill(data: dict, receipt_timestamp):
# breakpoint()
async def _order_info(data: dict, receipt_timestamp): # async def _order_info(data: dict, receipt_timestamp):
breakpoint() # breakpoint()
fh.add_feed( # fh.add_feed(
DERIBIT, # DERIBIT,
channels=[FILLS, ORDER_INFO], # channels=[FILLS, ORDER_INFO],
symbols=[instrument.upper()], # symbols=[instrument.upper()],
callbacks={ # callbacks={
FILLS: _fill, # FILLS: _fill,
ORDER_INFO: _order_info, # ORDER_INFO: _order_info,
}) # })
if not fh.running: # if not fh.running:
fh.run( # fh.run(
start_loop=False, # start_loop=False,
install_signal_handlers=False) # install_signal_handlers=False)
# sync with trio # # sync with trio
to_trio.send_nowait(None) # to_trio.send_nowait(None)
await asyncio.sleep(float('inf')) # await asyncio.sleep(float('inf'))
@acm # @acm
async def open_order_feed( # async def open_order_feed(
instrument: list[str] # instrument: list[str]
) -> trio.abc.ReceiveStream: # ) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh: # async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from( # async with to_asyncio.open_channel_from(
partial( # partial(
aio_order_feed_relay, # aio_order_feed_relay,
fh, # fh,
instrument # instrument
) # )
) as (first, chan): # ) as (first, chan):
yield chan # yield chan
@acm # @acm
async def maybe_open_order_feed( # async def maybe_open_order_feed(
instrument: str # instrument: str
) -> trio.abc.ReceiveStream: # ) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context # # TODO: add a predicate to maybe_open_context
async with maybe_open_context( # async with maybe_open_context(
acm_func=open_order_feed, # acm_func=open_order_feed,
kwargs={ # kwargs={
'instrument': instrument.split('.')[0], # 'instrument': instrument.split('.')[0],
'fh': fh # 'fh': fh
}, # },
key=f'{instrument.split('.')[0]}-order', # key=f'{instrument.split('.')[0]}-order',
) as (cache_hit, feed): # ) as (cache_hit, feed):
if cache_hit: # if cache_hit:
yield broadcast_receiver(feed, 10) # yield broadcast_receiver(feed, 10)
else: # else:
yield feed # yield feed