`.deribit.api` bit of tidying/typing
There were some imports missing or unused as well as a variety of spots that had grokability issues due to missing type hints. Other tweaks as part some more thorough manual testing: - always raise when not `brokers.toml` section since the API can never work (no free data without keys). - inline the `Asset.atype='crypto_currency` field despite it maybe not being the best value for `OptionPair` instruments.. - tossed in a now-masked pause block for debugging history queries in `Client.bars()`. - commented out all the live order ctl (internal) endpoints for now since they're unused.fix_deribit_hist_queries_NEW
							parent
							
								
									d0a12069eb
								
							
						
					
					
						commit
						fc40612ca6
					
				|  | @ -29,6 +29,7 @@ from decimal import ( | |||
| ) | ||||
| from functools import partial | ||||
| import time | ||||
| from pathlib import Path | ||||
| from typing import ( | ||||
|     Any, | ||||
|     Optional, | ||||
|  | @ -37,8 +38,6 @@ from typing import ( | |||
| 
 | ||||
| from pendulum import now | ||||
| import trio | ||||
| from trio_typing import TaskStatus | ||||
| from rapidfuzz import process as fuzzy | ||||
| import numpy as np | ||||
| from tractor.trionics import ( | ||||
|     broadcast_receiver, | ||||
|  | @ -55,8 +54,10 @@ from cryptofeed.defines import ( | |||
|     OPTION, CALL, PUT | ||||
| ) | ||||
| from cryptofeed.symbols import Symbol | ||||
| 
 | ||||
| # types for managing the cb callbacks. | ||||
| # from cryptofeed.types import L1Book | ||||
| from piker.brokers import SymbolNotFound | ||||
| from .venues import ( | ||||
|     _ws_url, | ||||
|     MarketType, | ||||
|  | @ -64,9 +65,9 @@ from .venues import ( | |||
|     Pair, | ||||
|     OptionPair, | ||||
|     JSONRPCResult, | ||||
|     JSONRPCChannel, | ||||
|     # JSONRPCChannel, | ||||
|     KLinesResult, | ||||
|     Trade, | ||||
|     # Trade, | ||||
|     LastTradesResult, | ||||
| ) | ||||
| from piker.accounting import ( | ||||
|  | @ -77,7 +78,7 @@ from piker.accounting import ( | |||
| from piker.data import ( | ||||
|     def_iohlcv_fields, | ||||
|     match_from_pairs, | ||||
|     Struct, | ||||
|     # Struct, | ||||
| ) | ||||
| from piker.data._web_bs import ( | ||||
|     open_jsonrpc_session | ||||
|  | @ -97,7 +98,7 @@ _spawn_kwargs = { | |||
| 
 | ||||
| 
 | ||||
| # convert datetime obj timestamp to unixtime in milliseconds | ||||
| def deribit_timestamp(when): | ||||
| def deribit_timestamp(when) -> int: | ||||
|     return int((when.timestamp() * 1000) + (when.microsecond / 1000)) | ||||
| 
 | ||||
| 
 | ||||
|  | @ -179,16 +180,18 @@ def get_config() -> dict[str, Any]: | |||
| 
 | ||||
|     conf: dict | ||||
|     path: Path | ||||
| 
 | ||||
|     conf, path = config.load( | ||||
|         conf_name='brokers', | ||||
|         touch_if_dne=True, | ||||
|     ) | ||||
|     section: dict = {} | ||||
|     section = conf.get('deribit') | ||||
|     section: dict|None = conf.get('deribit') | ||||
|     if section is None: | ||||
|         log.warning(f'No config section found for deribit in {path}') | ||||
|         return {} | ||||
|         raise ValueError( | ||||
|             f'No `[deribit]` section found in\n' | ||||
|             f'{path!r}\n\n' | ||||
|             f'See the template config from the core repo for samples..\n' | ||||
|             # f'<TODO put repo link here??>' | ||||
|         ) | ||||
| 
 | ||||
|     conf_option = section.get('option', {}) | ||||
|     section.clear # clear the dict to reuse it | ||||
|  | @ -223,8 +226,12 @@ class Client: | |||
|         self._auth_ts = None | ||||
|         self._auth_renew_ts = 5 # seconds to renew auth | ||||
| 
 | ||||
|     async def _json_rpc_auth_wrapper(self, *args, **kwargs) -> JSONRPCResult: | ||||
|          | ||||
|     async def _json_rpc_auth_wrapper( | ||||
|         self, | ||||
|         *args, | ||||
|         **kwargs, | ||||
|     ) -> JSONRPCResult: | ||||
| 
 | ||||
|         """Background task that adquires a first access token and then will | ||||
|         refresh the access token. | ||||
| 
 | ||||
|  | @ -250,9 +257,6 @@ class Client: | |||
| 
 | ||||
|         return await self.json_rpc(*args, **kwargs) | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
|     async def get_balances( | ||||
|         self, | ||||
|         kind: str = 'option' | ||||
|  | @ -277,23 +281,29 @@ class Client: | |||
|         venue: str | None = None, | ||||
| 
 | ||||
|     ) -> dict[str, Asset]: | ||||
|         """Return the set of asset balances for this account | ||||
|         by symbol. | ||||
|         """ | ||||
|         ''' | ||||
|         Return the set of asset balances for this account | ||||
|         by (deribit's) symbol. | ||||
| 
 | ||||
| 
 | ||||
|         ''' | ||||
|         assets = {} | ||||
|         resp = await self._json_rpc_auth_wrapper( | ||||
|             'public/get_currencies', | ||||
|             params={} | ||||
|         ) | ||||
|         currencies = resp.result | ||||
|         currencies: list[dict] = resp.result | ||||
|         for currency in currencies: | ||||
|             name = currency['currency'] | ||||
|             tx_tick = digits_to_dec(currency['fee_precision'])  | ||||
|             atype='crypto_currency' | ||||
|             name: str = currency['currency'] | ||||
|             tx_tick: Decimal = digits_to_dec(currency['fee_precision']) | ||||
| 
 | ||||
|             # TODO, handling of options, futures, perps etc. more | ||||
|             # specifically with diff `.atype`s? | ||||
|             assets[name] = Asset( | ||||
|                 name=name, | ||||
|                 atype=atype, | ||||
|                 tx_tick=tx_tick) | ||||
|                 atype='crypto_currency', | ||||
|                 tx_tick=tx_tick, | ||||
|             ) | ||||
| 
 | ||||
|             instruments = await self.symbol_info(currency=name) | ||||
|             for instrument in instruments: | ||||
|  | @ -301,9 +311,10 @@ class Client: | |||
|                 assets[pair.symbol] = Asset( | ||||
|                     name=pair.symbol, | ||||
|                     atype=pair.venue, | ||||
|                     tx_tick=pair.size_tick) | ||||
|                     tx_tick=pair.size_tick, | ||||
|                 ) | ||||
| 
 | ||||
|         return assets  | ||||
|         return assets | ||||
| 
 | ||||
|     async def get_mkt_pairs(self) -> dict[str, Pair]: | ||||
|         flat: dict[str, Pair] = {} | ||||
|  | @ -381,7 +392,7 @@ class Client: | |||
|         params: dict[str, str] = { | ||||
|             'currency': currency.upper(), | ||||
|             'kind': kind, | ||||
|             'expired': str(expired).lower() | ||||
|             'expired': expired, | ||||
|         } | ||||
| 
 | ||||
|         resp: JSONRPCResult = await self._json_rpc_auth_wrapper( | ||||
|  | @ -389,9 +400,9 @@ class Client: | |||
|             params, | ||||
|         ) | ||||
|         # convert to symbol-keyed table | ||||
|         pair_type: Type = PAIRTYPES[kind] | ||||
|         pair_type: Pair = PAIRTYPES[kind] | ||||
|         results: list[dict] | None = resp.result | ||||
|          | ||||
| 
 | ||||
|         instruments: dict[str, Pair] = {} | ||||
|         for item in results: | ||||
|             symbol=item['instrument_name'].lower() | ||||
|  | @ -427,12 +438,15 @@ class Client: | |||
|         mkt_pairs = await self.symbol_info() | ||||
| 
 | ||||
|         if not mkt_pairs: | ||||
|             raise SymbolNotFound(f'No market pairs found!?:\n{resp}') | ||||
|             raise SymbolNotFound( | ||||
|                 f'No market pairs found!?:\n' | ||||
|                 f'{mkt_pairs}' | ||||
|             ) | ||||
| 
 | ||||
|         pairs_view_subtable: dict[str, Pair] = {} | ||||
| 
 | ||||
|         for instrument in mkt_pairs: | ||||
|             pair_type: Type = PAIRTYPES[venue] | ||||
|             pair_type: Pair|OptionPair = PAIRTYPES[venue] | ||||
| 
 | ||||
|             pair: Pair = pair_type(**mkt_pairs[instrument].to_dict()) | ||||
| 
 | ||||
|  | @ -480,12 +494,14 @@ class Client: | |||
|         if end_dt is None: | ||||
|             end_dt = now('UTC') | ||||
| 
 | ||||
|         _orig_start_dt = start_dt | ||||
|         if start_dt is None: | ||||
|             start_dt = end_dt.start_of( | ||||
|                 'minute').subtract(minutes=limit) | ||||
|                 'minute' | ||||
|             ).subtract(minutes=limit) | ||||
| 
 | ||||
|         start_time = deribit_timestamp(start_dt) | ||||
|         end_time = deribit_timestamp(end_dt) | ||||
|         start_time: int = deribit_timestamp(start_dt) | ||||
|         end_time: int = deribit_timestamp(end_dt) | ||||
| 
 | ||||
|         # https://docs.deribit.com/#public-get_tradingview_chart_data | ||||
|         resp = await self._json_rpc_auth_wrapper( | ||||
|  | @ -499,9 +515,13 @@ class Client: | |||
| 
 | ||||
|         result = KLinesResult(**resp.result) | ||||
|         new_bars: list[tuple] = [] | ||||
|         for i in range(len(result.close)): | ||||
|         # if _orig_start_dt is None: | ||||
|         # if not new_bars: | ||||
|         #     import tractor | ||||
|         #     await tractor.pause() | ||||
| 
 | ||||
|             row = [  | ||||
|         for i in range(len(result.close)): | ||||
|             row = [ | ||||
|                 (start_time + (i * (60 * 1000))) / 1000.0,  # time | ||||
|                 result.open[i], | ||||
|                 result.high[i], | ||||
|  | @ -668,68 +688,69 @@ async def maybe_open_price_feed( | |||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| async def aio_order_feed_relay( | ||||
|     fh: FeedHandler, | ||||
|     instrument: Symbol, | ||||
|     from_trio: asyncio.Queue, | ||||
|     to_trio: trio.abc.SendChannel, | ||||
| ) -> None: | ||||
|     async def _fill(data: dict, receipt_timestamp): | ||||
|         breakpoint() | ||||
| # TODO, move all to `.broker` submod! | ||||
| # async def aio_order_feed_relay( | ||||
| #     fh: FeedHandler, | ||||
| #     instrument: Symbol, | ||||
| #     from_trio: asyncio.Queue, | ||||
| #     to_trio: trio.abc.SendChannel, | ||||
| # ) -> None: | ||||
| #     async def _fill(data: dict, receipt_timestamp): | ||||
| #         breakpoint() | ||||
| 
 | ||||
|     async def _order_info(data: dict, receipt_timestamp): | ||||
|         breakpoint() | ||||
| #     async def _order_info(data: dict, receipt_timestamp): | ||||
| #         breakpoint() | ||||
| 
 | ||||
|     fh.add_feed( | ||||
|         DERIBIT, | ||||
|         channels=[FILLS, ORDER_INFO], | ||||
|         symbols=[instrument.upper()], | ||||
|         callbacks={ | ||||
|             FILLS: _fill, | ||||
|             ORDER_INFO: _order_info, | ||||
|         }) | ||||
| #     fh.add_feed( | ||||
| #         DERIBIT, | ||||
| #         channels=[FILLS, ORDER_INFO], | ||||
| #         symbols=[instrument.upper()], | ||||
| #         callbacks={ | ||||
| #             FILLS: _fill, | ||||
| #             ORDER_INFO: _order_info, | ||||
| #         }) | ||||
| 
 | ||||
|     if not fh.running: | ||||
|         fh.run( | ||||
|             start_loop=False, | ||||
|             install_signal_handlers=False) | ||||
| #     if not fh.running: | ||||
| #         fh.run( | ||||
| #             start_loop=False, | ||||
| #             install_signal_handlers=False) | ||||
| 
 | ||||
|     # sync with trio | ||||
|     to_trio.send_nowait(None) | ||||
| #     # sync with trio | ||||
| #     to_trio.send_nowait(None) | ||||
| 
 | ||||
|     await asyncio.sleep(float('inf')) | ||||
| #     await asyncio.sleep(float('inf')) | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def open_order_feed( | ||||
|     instrument: list[str] | ||||
| ) -> trio.abc.ReceiveStream: | ||||
|     async with maybe_open_feed_handler() as fh: | ||||
|         async with to_asyncio.open_channel_from( | ||||
|             partial( | ||||
|                 aio_order_feed_relay, | ||||
|                 fh, | ||||
|                 instrument | ||||
|             ) | ||||
|         ) as (first, chan): | ||||
|             yield chan | ||||
| # @acm | ||||
| # async def open_order_feed( | ||||
| #     instrument: list[str] | ||||
| # ) -> trio.abc.ReceiveStream: | ||||
| #     async with maybe_open_feed_handler() as fh: | ||||
| #         async with to_asyncio.open_channel_from( | ||||
| #             partial( | ||||
| #                 aio_order_feed_relay, | ||||
| #                 fh, | ||||
| #                 instrument | ||||
| #             ) | ||||
| #         ) as (first, chan): | ||||
| #             yield chan | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def maybe_open_order_feed( | ||||
|     instrument: str | ||||
| ) -> trio.abc.ReceiveStream: | ||||
| # @acm | ||||
| # async def maybe_open_order_feed( | ||||
| #     instrument: str | ||||
| # ) -> trio.abc.ReceiveStream: | ||||
| 
 | ||||
|     # TODO: add a predicate to maybe_open_context | ||||
|     async with maybe_open_context( | ||||
|         acm_func=open_order_feed, | ||||
|         kwargs={ | ||||
|             'instrument': instrument.split('.')[0], | ||||
|             'fh': fh | ||||
|         }, | ||||
|         key=f'{instrument.split('.')[0]}-order', | ||||
|     ) as (cache_hit, feed): | ||||
|         if cache_hit: | ||||
|             yield broadcast_receiver(feed, 10) | ||||
|         else: | ||||
|             yield feed | ||||
| #     # TODO: add a predicate to maybe_open_context | ||||
| #     async with maybe_open_context( | ||||
| #         acm_func=open_order_feed, | ||||
| #         kwargs={ | ||||
| #             'instrument': instrument.split('.')[0], | ||||
| #             'fh': fh | ||||
| #         }, | ||||
| #         key=f'{instrument.split('.')[0]}-order', | ||||
| #     ) as (cache_hit, feed): | ||||
| #         if cache_hit: | ||||
| #             yield broadcast_receiver(feed, 10) | ||||
| #         else: | ||||
| #             yield feed | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue