diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index 3b19ff59..68f76d4d 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -29,6 +29,7 @@ from decimal import ( ) from functools import partial import time +from pathlib import Path from typing import ( Any, Optional, @@ -37,8 +38,6 @@ from typing import ( from pendulum import now import trio -from trio_typing import TaskStatus -from rapidfuzz import process as fuzzy import numpy as np from tractor.trionics import ( broadcast_receiver, @@ -55,8 +54,10 @@ from cryptofeed.defines import ( OPTION, CALL, PUT ) from cryptofeed.symbols import Symbol + # types for managing the cb callbacks. # from cryptofeed.types import L1Book +from piker.brokers import SymbolNotFound from .venues import ( _ws_url, MarketType, @@ -64,9 +65,9 @@ from .venues import ( Pair, OptionPair, JSONRPCResult, - JSONRPCChannel, + # JSONRPCChannel, KLinesResult, - Trade, + # Trade, LastTradesResult, ) from piker.accounting import ( @@ -77,7 +78,7 @@ from piker.accounting import ( from piker.data import ( def_iohlcv_fields, match_from_pairs, - Struct, + # Struct, ) from piker.data._web_bs import ( open_jsonrpc_session @@ -97,7 +98,7 @@ _spawn_kwargs = { # convert datetime obj timestamp to unixtime in milliseconds -def deribit_timestamp(when): +def deribit_timestamp(when) -> int: return int((when.timestamp() * 1000) + (when.microsecond / 1000)) @@ -179,16 +180,18 @@ def get_config() -> dict[str, Any]: conf: dict path: Path - conf, path = config.load( conf_name='brokers', touch_if_dne=True, ) - section: dict = {} - section = conf.get('deribit') + section: dict|None = conf.get('deribit') if section is None: - log.warning(f'No config section found for deribit in {path}') - return {} + raise ValueError( + f'No `[deribit]` section found in\n' + f'{path!r}\n\n' + f'See the template config from the core repo for samples..\n' + # f'' + ) conf_option = section.get('option', {}) section.clear # clear the dict to reuse it @@ -223,8 +226,12 @@ class Client: self._auth_ts = None self._auth_renew_ts = 5 # seconds to renew auth - async def _json_rpc_auth_wrapper(self, *args, **kwargs) -> JSONRPCResult: - + async def _json_rpc_auth_wrapper( + self, + *args, + **kwargs, + ) -> JSONRPCResult: + """Background task that adquires a first access token and then will refresh the access token. @@ -250,9 +257,6 @@ class Client: return await self.json_rpc(*args, **kwargs) - - - async def get_balances( self, kind: str = 'option' @@ -277,23 +281,29 @@ class Client: venue: str | None = None, ) -> dict[str, Asset]: - """Return the set of asset balances for this account - by symbol. - """ + ''' + Return the set of asset balances for this account + by (deribit's) symbol. + + + ''' assets = {} resp = await self._json_rpc_auth_wrapper( 'public/get_currencies', params={} ) - currencies = resp.result + currencies: list[dict] = resp.result for currency in currencies: - name = currency['currency'] - tx_tick = digits_to_dec(currency['fee_precision']) - atype='crypto_currency' + name: str = currency['currency'] + tx_tick: Decimal = digits_to_dec(currency['fee_precision']) + + # TODO, handling of options, futures, perps etc. more + # specifically with diff `.atype`s? assets[name] = Asset( name=name, - atype=atype, - tx_tick=tx_tick) + atype='crypto_currency', + tx_tick=tx_tick, + ) instruments = await self.symbol_info(currency=name) for instrument in instruments: @@ -301,9 +311,10 @@ class Client: assets[pair.symbol] = Asset( name=pair.symbol, atype=pair.venue, - tx_tick=pair.size_tick) + tx_tick=pair.size_tick, + ) - return assets + return assets async def get_mkt_pairs(self) -> dict[str, Pair]: flat: dict[str, Pair] = {} @@ -381,7 +392,7 @@ class Client: params: dict[str, str] = { 'currency': currency.upper(), 'kind': kind, - 'expired': str(expired).lower() + 'expired': expired, } resp: JSONRPCResult = await self._json_rpc_auth_wrapper( @@ -389,9 +400,9 @@ class Client: params, ) # convert to symbol-keyed table - pair_type: Type = PAIRTYPES[kind] + pair_type: Pair = PAIRTYPES[kind] results: list[dict] | None = resp.result - + instruments: dict[str, Pair] = {} for item in results: symbol=item['instrument_name'].lower() @@ -427,12 +438,15 @@ class Client: mkt_pairs = await self.symbol_info() if not mkt_pairs: - raise SymbolNotFound(f'No market pairs found!?:\n{resp}') + raise SymbolNotFound( + f'No market pairs found!?:\n' + f'{mkt_pairs}' + ) pairs_view_subtable: dict[str, Pair] = {} for instrument in mkt_pairs: - pair_type: Type = PAIRTYPES[venue] + pair_type: Pair|OptionPair = PAIRTYPES[venue] pair: Pair = pair_type(**mkt_pairs[instrument].to_dict()) @@ -480,12 +494,14 @@ class Client: if end_dt is None: end_dt = now('UTC') + _orig_start_dt = start_dt if start_dt is None: start_dt = end_dt.start_of( - 'minute').subtract(minutes=limit) + 'minute' + ).subtract(minutes=limit) - start_time = deribit_timestamp(start_dt) - end_time = deribit_timestamp(end_dt) + start_time: int = deribit_timestamp(start_dt) + end_time: int = deribit_timestamp(end_dt) # https://docs.deribit.com/#public-get_tradingview_chart_data resp = await self._json_rpc_auth_wrapper( @@ -499,9 +515,13 @@ class Client: result = KLinesResult(**resp.result) new_bars: list[tuple] = [] - for i in range(len(result.close)): + # if _orig_start_dt is None: + # if not new_bars: + # import tractor + # await tractor.pause() - row = [ + for i in range(len(result.close)): + row = [ (start_time + (i * (60 * 1000))) / 1000.0, # time result.open[i], result.high[i], @@ -668,68 +688,69 @@ async def maybe_open_price_feed( -async def aio_order_feed_relay( - fh: FeedHandler, - instrument: Symbol, - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, -) -> None: - async def _fill(data: dict, receipt_timestamp): - breakpoint() +# TODO, move all to `.broker` submod! +# async def aio_order_feed_relay( +# fh: FeedHandler, +# instrument: Symbol, +# from_trio: asyncio.Queue, +# to_trio: trio.abc.SendChannel, +# ) -> None: +# async def _fill(data: dict, receipt_timestamp): +# breakpoint() - async def _order_info(data: dict, receipt_timestamp): - breakpoint() +# async def _order_info(data: dict, receipt_timestamp): +# breakpoint() - fh.add_feed( - DERIBIT, - channels=[FILLS, ORDER_INFO], - symbols=[instrument.upper()], - callbacks={ - FILLS: _fill, - ORDER_INFO: _order_info, - }) +# fh.add_feed( +# DERIBIT, +# channels=[FILLS, ORDER_INFO], +# symbols=[instrument.upper()], +# callbacks={ +# FILLS: _fill, +# ORDER_INFO: _order_info, +# }) - if not fh.running: - fh.run( - start_loop=False, - install_signal_handlers=False) +# if not fh.running: +# fh.run( +# start_loop=False, +# install_signal_handlers=False) - # sync with trio - to_trio.send_nowait(None) +# # sync with trio +# to_trio.send_nowait(None) - await asyncio.sleep(float('inf')) +# await asyncio.sleep(float('inf')) -@acm -async def open_order_feed( - instrument: list[str] -) -> trio.abc.ReceiveStream: - async with maybe_open_feed_handler() as fh: - async with to_asyncio.open_channel_from( - partial( - aio_order_feed_relay, - fh, - instrument - ) - ) as (first, chan): - yield chan +# @acm +# async def open_order_feed( +# instrument: list[str] +# ) -> trio.abc.ReceiveStream: +# async with maybe_open_feed_handler() as fh: +# async with to_asyncio.open_channel_from( +# partial( +# aio_order_feed_relay, +# fh, +# instrument +# ) +# ) as (first, chan): +# yield chan -@acm -async def maybe_open_order_feed( - instrument: str -) -> trio.abc.ReceiveStream: +# @acm +# async def maybe_open_order_feed( +# instrument: str +# ) -> trio.abc.ReceiveStream: - # TODO: add a predicate to maybe_open_context - async with maybe_open_context( - acm_func=open_order_feed, - kwargs={ - 'instrument': instrument.split('.')[0], - 'fh': fh - }, - key=f'{instrument.split('.')[0]}-order', - ) as (cache_hit, feed): - if cache_hit: - yield broadcast_receiver(feed, 10) - else: - yield feed +# # TODO: add a predicate to maybe_open_context +# async with maybe_open_context( +# acm_func=open_order_feed, +# kwargs={ +# 'instrument': instrument.split('.')[0], +# 'fh': fh +# }, +# key=f'{instrument.split('.')[0]}-order', +# ) as (cache_hit, feed): +# if cache_hit: +# yield broadcast_receiver(feed, 10) +# else: +# yield feed