Compare commits

39 commits: 6555ccfbba...11a883892d
	| Author | SHA1 | Date | 
|---|---|---|
|  | 11a883892d | |
|  | d25864af2d | |
|  | a2e2d4706a | |
|  | ae5bd1f74f | |
|  | 45d24c85bf | |
|  | f19fc01f6b | |
|  | f99d5f5a57 | |
|  | 52aee44838 | |
|  | 104afa80cf | |
|  | d8b5109e47 | |
|  | 781810b4e1 | |
|  | 1459ed9235 | |
|  | ac45e212aa | |
|  | c915c270d5 | |
|  | 063af21180 | |
|  | 98e6d2e436 | |
|  | 4e18b43346 | |
|  | a55bbf429d | |
|  | 3a57d803cd | |
|  | 90681e86e5 | |
|  | 501f269656 | |
|  | b7622d7ffe | |
|  | 857628e43c | |
|  | 5336c79546 | |
|  | 4c992e7569 | |
|  | 02a3fe046d | |
|  | ea919e7b25 | |
|  | d2c7aa5643 | |
|  | 67c9b0fb40 | |
|  | 86896f1b10 | |
|  | 599316cf09 | |
|  | 4a89569f86 | |
|  | 88d5cd1d38 | |
|  | 6613f37156 | |
|  | cb125b793f | |
|  | f6fef07df8 | |
|  | 6c19cc5d55 | |
|  | 1ed40ebbf7 | |
|  | fce1ded121 | |
|  | @ -51,6 +51,7 @@ __brokers__: list[str] = [ | |||
|     'ib', | ||||
|     'kraken', | ||||
|     'kucoin', | ||||
|     'deribit', | ||||
| 
 | ||||
|     # broken but used to work | ||||
|     # 'questrade', | ||||
|  | @ -61,7 +62,6 @@ __brokers__: list[str] = [ | |||
|     # wstrade | ||||
|     # iex | ||||
| 
 | ||||
|     # deribit | ||||
|     # bitso | ||||
| ] | ||||
| 
 | ||||
|  |  | |||
|  | @ -181,7 +181,6 @@ class FutesPair(Pair): | |||
|     quoteAsset: str  # 'USDT', | ||||
|     quotePrecision: int  # 8, | ||||
|     requiredMarginPercent: float  # '5.0000', | ||||
|     settlePlan: int  # 0, | ||||
|     timeInForce: list[str]  # ['GTC', 'IOC', 'FOK', 'GTX'], | ||||
|     triggerProtect: float  # '0.0500', | ||||
|     underlyingSubType: list[str]  # ['PoW'], | ||||
|  |  | |||
|  | @ -25,6 +25,7 @@ from .api import ( | |||
|     get_client, | ||||
| ) | ||||
| from .feed import ( | ||||
|     get_mkt_info, | ||||
|     open_history_client, | ||||
|     open_symbol_search, | ||||
|     stream_quotes, | ||||
|  | @ -34,15 +35,20 @@ from .feed import ( | |||
|     # open_trade_dialog, | ||||
|     # norm_trade_records, | ||||
| # ) | ||||
| from .venues import ( | ||||
|     OptionPair, | ||||
| ) | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
| __all__ = [ | ||||
|     'get_client', | ||||
| #    'trades_dialogue', | ||||
|     'get_mkt_info', | ||||
|     'open_history_client', | ||||
|     'open_symbol_search', | ||||
|     'stream_quotes', | ||||
|     'OptionPair', | ||||
| #    'norm_trade_records', | ||||
| ] | ||||
| 
 | ||||
|  |  | |||
|  | @ -19,10 +19,14 @@ Deribit backend. | |||
| 
 | ||||
| ''' | ||||
| import asyncio | ||||
| from collections import ChainMap | ||||
| from contextlib import ( | ||||
|     asynccontextmanager as acm, | ||||
| ) | ||||
| from datetime import datetime | ||||
| from decimal import ( | ||||
|     Decimal, | ||||
| ) | ||||
| from functools import partial | ||||
| import time | ||||
| from typing import ( | ||||
|  | @ -31,7 +35,7 @@ from typing import ( | |||
|     Callable, | ||||
| ) | ||||
| 
 | ||||
| import pendulum | ||||
| from pendulum import now | ||||
| import trio | ||||
| from trio_typing import TaskStatus | ||||
| from rapidfuzz import process as fuzzy | ||||
|  | @ -51,7 +55,25 @@ from cryptofeed.defines import ( | |||
|     OPTION, CALL, PUT | ||||
| ) | ||||
| from cryptofeed.symbols import Symbol | ||||
| 
 | ||||
| # types for managing the cb callbacks. | ||||
| # from cryptofeed.types import L1Book | ||||
| from .venues import ( | ||||
|     _ws_url, | ||||
|     MarketType, | ||||
|     PAIRTYPES, | ||||
|     Pair, | ||||
|     OptionPair, | ||||
|     JSONRPCResult, | ||||
|     JSONRPCChannel, | ||||
|     KLinesResult, | ||||
|     Trade, | ||||
|     LastTradesResult, | ||||
| ) | ||||
| from piker.accounting import ( | ||||
|     Asset, | ||||
|     digits_to_dec, | ||||
|     MktPair, | ||||
| ) | ||||
| from piker.data import ( | ||||
|     def_iohlcv_fields, | ||||
|     match_from_pairs, | ||||
|  | @ -74,57 +96,6 @@ _spawn_kwargs = { | |||
| } | ||||
| 
 | ||||
| 
 | ||||
| _url = 'https://www.deribit.com' | ||||
| _ws_url = 'wss://www.deribit.com/ws/api/v2' | ||||
| _testnet_ws_url = 'wss://test.deribit.com/ws/api/v2' | ||||
| 
 | ||||
| 
 | ||||
| class JSONRPCResult(Struct): | ||||
|     jsonrpc: str = '2.0' | ||||
|     id: int | ||||
|     result: Optional[list[dict]] = None | ||||
|     error: Optional[dict] = None | ||||
|     usIn: int | ||||
|     usOut: int | ||||
|     usDiff: int | ||||
|     testnet: bool | ||||
| 
 | ||||
| class JSONRPCChannel(Struct): | ||||
|     jsonrpc: str = '2.0' | ||||
|     method: str | ||||
|     params: dict | ||||
| 
 | ||||
| 
 | ||||
| class KLinesResult(Struct): | ||||
|     close: list[float] | ||||
|     cost: list[float] | ||||
|     high: list[float] | ||||
|     low: list[float] | ||||
|     open: list[float] | ||||
|     status: str | ||||
|     ticks: list[int] | ||||
|     volume: list[float] | ||||
| 
 | ||||
| class Trade(Struct): | ||||
|     trade_seq: int | ||||
|     trade_id: str | ||||
|     timestamp: int | ||||
|     tick_direction: int | ||||
|     price: float | ||||
|     mark_price: float | ||||
|     iv: float | ||||
|     instrument_name: str | ||||
|     index_price: float | ||||
|     direction: str | ||||
|     combo_trade_id: Optional[int] = 0, | ||||
|     combo_id: Optional[str] = '', | ||||
|     amount: float | ||||
| 
 | ||||
| class LastTradesResult(Struct): | ||||
|     trades: list[Trade] | ||||
|     has_more: bool | ||||
| 
 | ||||
| 
 | ||||
| # convert datetime obj timestamp to unixtime in milliseconds | ||||
| def deribit_timestamp(when): | ||||
|     return int((when.timestamp() * 1000) + (when.microsecond / 1000)) | ||||
|  | @ -142,13 +113,15 @@ def str_to_cb_sym(name: str) -> Symbol: | |||
|     else: | ||||
|         raise Exception("Couldn\'t parse option type") | ||||
| 
 | ||||
|     new_expiry_date = get_values_from_cb_normalized_date(expiry_date) | ||||
| 
 | ||||
|     return Symbol( | ||||
|         base, quote, | ||||
|         base=base, | ||||
|         quote=quote, | ||||
|         type=OPTION, | ||||
|         strike_price=strike_price, | ||||
|         option_type=option_type, | ||||
|         expiry_date=expiry_date, | ||||
|         expiry_normalize=False) | ||||
|         expiry_date=new_expiry_date) | ||||
| 
 | ||||
| 
 | ||||
| def piker_sym_to_cb_sym(name: str) -> Symbol: | ||||
|  | @ -159,83 +132,138 @@ def piker_sym_to_cb_sym(name: str) -> Symbol: | |||
| 
 | ||||
|     if option_type == 'P': | ||||
|         option_type = PUT  | ||||
|     elif option_type  == 'C': | ||||
|     elif option_type == 'C': | ||||
|         option_type = CALL | ||||
|     else: | ||||
|         raise Exception("Couldn\'t parse option type") | ||||
| 
 | ||||
|     return Symbol( | ||||
|         base, quote, | ||||
|         base=base, | ||||
|         quote=quote, | ||||
|         type=OPTION, | ||||
|         strike_price=strike_price, | ||||
|         option_type=option_type, | ||||
|         expiry_date=expiry_date.upper()) | ||||
|         expiry_date=expiry_date) | ||||
| 
 | ||||
| 
 | ||||
| def cb_sym_to_deribit_inst(sym: Symbol): | ||||
|     # cryptofeed normalized | ||||
|     cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z'] | ||||
| 
 | ||||
|     # deribit specific  | ||||
|     months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC'] | ||||
| 
 | ||||
|     exp = sym.expiry_date | ||||
| 
 | ||||
|     # YYMDD | ||||
|     # 01234 | ||||
|     year, month, day = ( | ||||
|         exp[:2], months[cb_norm.index(exp[2:3])], exp[3:]) | ||||
| 
 | ||||
|     new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date) | ||||
|     otype = 'C' if sym.option_type == CALL else 'P' | ||||
| 
 | ||||
|     return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}' | ||||
|     return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}' | ||||
| 
 | ||||
| 
 | ||||
| def get_values_from_cb_normalized_date(expiry_date: str) -> str: | ||||
|     # deribit specific | ||||
|     cb_norm = [ | ||||
|         'F', 'G', 'H', 'J', | ||||
|         'K', 'M', 'N', 'Q', | ||||
|         'U', 'V', 'X', 'Z' | ||||
|     ] | ||||
|     months = [ | ||||
|         'JAN', 'FEB', 'MAR', 'APR', | ||||
|         'MAY', 'JUN', 'JUL', 'AUG', | ||||
|         'SEP', 'OCT', 'NOV', 'DEC' | ||||
|     ] | ||||
|     # YYMDD | ||||
|     # 01234 | ||||
|     day, month, year = ( | ||||
|         expiry_date[3:], | ||||
|         months[cb_norm.index(expiry_date[2:3])], | ||||
|         expiry_date[:2] | ||||
|     ) | ||||
|     return f'{day}{month}{year}' | ||||
| 
 | ||||
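For readers unfamiliar with the futures month-code scheme used above, here is a small standalone sketch (not part of the diff) of the conversion `get_values_from_cb_normalized_date()` performs; the month tables mirror the ones in the patch and the sample date is made up:

```python
# cryptofeed-normalized expiry 'YYMDD' (month as a futures month
# code) -> Deribit-style 'DDMMMYY'.
MONTH_CODES = 'FGHJKMNQUVXZ'   # JAN..DEC
MONTHS = [
    'JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN',
    'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC',
]

def cb_expiry_to_deribit(expiry: str) -> str:
    year, code, day = expiry[:2], expiry[2], expiry[3:]
    return f'{day}{MONTHS[MONTH_CODES.index(code)]}{year}'

assert cb_expiry_to_deribit('24Z27') == '27DEC24'
```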
| 
 | ||||
| def get_config() -> dict[str, Any]: | ||||
| 
 | ||||
|     conf, path = config.load() | ||||
|     conf: dict | ||||
|     path: Path | ||||
| 
 | ||||
|     conf, path = config.load( | ||||
|         conf_name='brokers', | ||||
|         touch_if_dne=True, | ||||
|     ) | ||||
|     section: dict = {} | ||||
|     section = conf.get('deribit') | ||||
| 
 | ||||
|     # TODO: document why we send this, basically because logging params for cryptofeed | ||||
|     conf['log'] = {} | ||||
|     conf['log']['disabled'] = True | ||||
| 
 | ||||
|     if section is None: | ||||
|         log.warning(f'No config section found for deribit in {path}') | ||||
|         return {} | ||||
| 
 | ||||
|     return conf  | ||||
|     conf_option = section.get('option', {}) | ||||
|     section.clear()  # clear the dict to reuse it | ||||
|     section['deribit'] = {} | ||||
|     section['deribit']['key_id'] = conf_option.get('api_key') | ||||
|     section['deribit']['key_secret'] = conf_option.get('api_secret') | ||||
| 
 | ||||
|     section['log'] = {} | ||||
|     section['log']['filename'] = 'feedhandler.log' | ||||
|     section['log']['level'] = 'DEBUG' | ||||
| 
 | ||||
|     return section | ||||
| 
 | ||||
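As a side note, the rewritten `get_config()` above implies a broker config layout roughly like the following once parsed (presumably from the user's `brokers.toml`); the key names follow the lookups in the code and the values are placeholders:

```python
# expected (parsed) config structure consumed by get_config()
conf = {
    'deribit': {
        'option': {
            'api_key': '<client-id>',         # -> section['deribit']['key_id']
            'api_secret': '<client-secret>',  # -> section['deribit']['key_secret']
        },
    },
}
```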
| 
 | ||||
| class Client: | ||||
| 
 | ||||
|     def __init__(self, json_rpc: Callable) -> None: | ||||
|         self._pairs: dict[str, Any] = None | ||||
|     def __init__( | ||||
|         self, | ||||
| 
 | ||||
|         json_rpc: Callable | ||||
| 
 | ||||
|     ) -> None: | ||||
|         self._pairs: ChainMap[str, Pair] = ChainMap() | ||||
| 
 | ||||
|         config = get_config().get('deribit', {}) | ||||
| 
 | ||||
|         if ('key_id' in config) and ('key_secret' in config): | ||||
|             self._key_id = config['key_id'] | ||||
|             self._key_secret = config['key_secret'] | ||||
| 
 | ||||
|         else: | ||||
|             self._key_id = None | ||||
|             self._key_secret = None | ||||
|         self._key_id = config.get('key_id') | ||||
|         self._key_secret = config.get('key_secret') | ||||
| 
 | ||||
|         self.json_rpc = json_rpc | ||||
| 
 | ||||
|     @property | ||||
|     def currencies(self): | ||||
|         return ['btc', 'eth', 'sol', 'usd'] | ||||
|         self._auth_ts = None | ||||
|         self._auth_renew_ts = 5 # seconds to renew auth | ||||
| 
 | ||||
|     async def get_balances(self, kind: str = 'option') -> dict[str, float]: | ||||
|     async def _json_rpc_auth_wrapper(self, *args, **kwargs) -> JSONRPCResult: | ||||
|         """Acquire (and re-acquire when close to expiry) an access | ||||
|         token before forwarding the wrapped JSON-RPC request. | ||||
| 
 | ||||
|         https://docs.deribit.com/?python#authentication-2 | ||||
|         """ | ||||
|         access_scope = 'trade:read_write' | ||||
|         current_ts = time.time() | ||||
| 
 | ||||
|         if not self._auth_ts or self._auth_ts - current_ts < self._auth_renew_ts: | ||||
|             # (re)authenticate if we have no token or are close to expiry time | ||||
| 
 | ||||
|             params = { | ||||
|                 'grant_type': 'client_credentials', | ||||
|                 'client_id': self._key_id, | ||||
|                 'client_secret': self._key_secret, | ||||
|                 'scope': access_scope | ||||
|             } | ||||
| 
 | ||||
|             resp = await self.json_rpc('public/auth', params) | ||||
|             result = resp.result | ||||
| 
 | ||||
|             self._auth_ts = time.time() + result['expires_in'] | ||||
| 
 | ||||
|         return await self.json_rpc(*args, **kwargs) | ||||
| 
 | ||||
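For context, the `public/auth` response consumed by the wrapper above looks roughly like the dict below; the field names are taken from the (removed) auth loop later in this diff and the linked Deribit docs, while the values are made up:

```python
# rough shape of resp.result for 'public/auth'
auth_result = {
    'access_token': '<token>',
    'refresh_token': '<token>',
    'expires_in': 31536000,        # seconds until the token lapses
    'scope': 'trade:read_write',
}
```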
| 
 | ||||
| 
 | ||||
| 
 | ||||
|     async def get_balances( | ||||
|         self, | ||||
|         kind: str = 'option' | ||||
|     ) -> dict[str, float]: | ||||
|         """Return the set of positions for this account | ||||
|         by symbol. | ||||
|         """ | ||||
|         balances = {} | ||||
| 
 | ||||
|         for currency in self.currencies: | ||||
|             resp = await self.json_rpc( | ||||
|             resp = await self._json_rpc_auth_wrapper( | ||||
|                 'private/get_positions', params={ | ||||
|                     'currency': currency.upper(), | ||||
|                     'kind': kind}) | ||||
|  | @ -244,20 +272,46 @@ class Client: | |||
| 
 | ||||
|         return balances | ||||
| 
 | ||||
|     async def get_assets(self) -> dict[str, float]: | ||||
|     async def get_assets( | ||||
|         self, | ||||
|         venue: str | None = None, | ||||
| 
 | ||||
|     ) -> dict[str, Asset]: | ||||
|         """Return the set of asset balances for this account | ||||
|         by symbol. | ||||
|         """ | ||||
|         balances = {} | ||||
|         assets = {} | ||||
|         resp = await self._json_rpc_auth_wrapper( | ||||
|             'public/get_currencies', | ||||
|             params={} | ||||
|         ) | ||||
|         currencies = resp.result | ||||
|         for currency in currencies: | ||||
|             name = currency['currency'] | ||||
|             tx_tick = digits_to_dec(currency['fee_precision'])  | ||||
|             atype='crypto_currency' | ||||
|             assets[name] = Asset( | ||||
|                 name=name, | ||||
|                 atype=atype, | ||||
|                 tx_tick=tx_tick) | ||||
| 
 | ||||
|         for currency in self.currencies: | ||||
|             resp = await self.json_rpc( | ||||
|                 'private/get_account_summary', params={ | ||||
|                     'currency': currency.upper()}) | ||||
|             instruments = await self.symbol_info(currency=name) | ||||
|             for instrument in instruments: | ||||
|                 pair = instruments[instrument] | ||||
|                 assets[pair.symbol] = Asset( | ||||
|                     name=pair.symbol, | ||||
|                     atype=pair.venue, | ||||
|                     tx_tick=pair.size_tick) | ||||
| 
 | ||||
|             balances[currency] = resp.result['balance'] | ||||
|         return assets  | ||||
| 
 | ||||
|         return balances | ||||
|     async def get_mkt_pairs(self) -> dict[str, Pair]: | ||||
|         flat: dict[str, Pair] = {} | ||||
|         for key in self._pairs: | ||||
|             item = self._pairs.get(key) | ||||
|             flat[item.bs_fqme] = item | ||||
| 
 | ||||
|         return flat | ||||
| 
 | ||||
|     async def submit_limit( | ||||
|         self, | ||||
|  | @ -274,7 +328,7 @@ class Client: | |||
|             'type': 'limit', | ||||
|             'price': price, | ||||
|         } | ||||
|         resp = await self.json_rpc( | ||||
|         resp = await self._json_rpc_auth_wrapper( | ||||
|             f'private/{action}', params) | ||||
| 
 | ||||
|         return resp.result | ||||
|  | @ -282,10 +336,32 @@ class Client: | |||
|     async def submit_cancel(self, oid: str): | ||||
|         """Send cancel request for order id | ||||
|         """ | ||||
|         resp = await self.json_rpc( | ||||
|         resp = await self._json_rpc_auth_wrapper( | ||||
|             'private/cancel', {'order_id': oid}) | ||||
|         return resp.result | ||||
| 
 | ||||
|     async def exch_info( | ||||
|         self, | ||||
|         sym: str | None = None, | ||||
| 
 | ||||
|         venue: MarketType = 'option', | ||||
|         expiry: str | None = None, | ||||
| 
 | ||||
|     ) -> dict[str, Pair] | Pair: | ||||
| 
 | ||||
|         pair_table: dict[str, Pair] = self._pairs | ||||
| 
 | ||||
|         if ( | ||||
|             sym | ||||
|             and (cached_pair := pair_table.get(sym)) | ||||
|         ): | ||||
|             return cached_pair | ||||
| 
 | ||||
|         if sym: | ||||
|             return pair_table[sym] | ||||
|         else: | ||||
|             return self._pairs | ||||
| 
 | ||||
|     async def symbol_info( | ||||
|         self, | ||||
|         instrument: Optional[str] = None, | ||||
|  | @ -293,7 +369,7 @@ class Client: | |||
|         kind: str = 'option', | ||||
|         expired: bool = False | ||||
| 
 | ||||
|     ) -> dict[str, dict]: | ||||
|     ) -> dict[str, Pair] | Pair: | ||||
|         ''' | ||||
|         Get symbol infos. | ||||
| 
 | ||||
|  | @ -308,28 +384,65 @@ class Client: | |||
|             'expired': str(expired).lower() | ||||
|         } | ||||
| 
 | ||||
|         resp: JSONRPCResult = await self.json_rpc( | ||||
|         resp: JSONRPCResult = await self._json_rpc_auth_wrapper( | ||||
|             'public/get_instruments', | ||||
|             params, | ||||
|         ) | ||||
|         # convert to symbol-keyed table | ||||
|         pair_type: Type = PAIRTYPES[kind] | ||||
|         results: list[dict] | None = resp.result | ||||
|         instruments: dict[str, dict] = { | ||||
|             item['instrument_name'].lower(): item | ||||
|             for item in results | ||||
|         } | ||||
|          | ||||
|         instruments: dict[str, Pair] = {} | ||||
|         for item in results: | ||||
|             symbol=item['instrument_name'].lower() | ||||
|             try: | ||||
|                 pair: Pair = pair_type( | ||||
|                     symbol=symbol, | ||||
|                     **item | ||||
|                 ) | ||||
|             except Exception as e: | ||||
|                 e.add_note( | ||||
|                     "\nDon't panic, prolly stupid deribit changed their symbology schema again..\n" | ||||
|                     'Check out their API docs here:\n\n' | ||||
|                     'https://docs.deribit.com/?python#deribit-api-v2-1-1' | ||||
|                 ) | ||||
|                 raise | ||||
| 
 | ||||
|             instruments[symbol] = pair | ||||
| 
 | ||||
|         if instrument is not None: | ||||
|             return instruments[instrument] | ||||
|             return instruments[instrument.lower()] | ||||
|         else: | ||||
|             return instruments | ||||
| 
 | ||||
|     async def cache_symbols( | ||||
|         self, | ||||
|     ) -> dict: | ||||
|         venue: MarketType = 'option', | ||||
| 
 | ||||
|         if not self._pairs: | ||||
|             self._pairs = await self.symbol_info() | ||||
|     ) -> None: | ||||
|         # lookup internal mkt-specific pair table to update | ||||
|         pair_table: dict[str, Pair] = self._pairs | ||||
| 
 | ||||
|         # make API request(s) | ||||
|         mkt_pairs = await self.symbol_info() | ||||
| 
 | ||||
|         if not mkt_pairs: | ||||
|             raise SymbolNotFound(f'No market pairs found!?:\n{mkt_pairs}') | ||||
| 
 | ||||
|         pairs_view_subtable: dict[str, Pair] = {} | ||||
| 
 | ||||
|         for instrument in mkt_pairs: | ||||
|             pair_type: Type = PAIRTYPES[venue] | ||||
| 
 | ||||
|             pair: Pair = pair_type(**mkt_pairs[instrument].to_dict()) | ||||
| 
 | ||||
|             pair_table[pair.symbol.upper()] = pair | ||||
| 
 | ||||
|             # update an additional top-level-cross-venue-table | ||||
|             # `._pairs: ChainMap` for search B0 | ||||
|             pairs_view_subtable[pair.bs_fqme] = pair | ||||
| 
 | ||||
|         self._pairs.maps.append(pairs_view_subtable) | ||||
| 
 | ||||
|         return self._pairs | ||||
| 
 | ||||
|  | @ -337,37 +450,35 @@ class Client: | |||
|         self, | ||||
|         pattern: str, | ||||
|         limit: int = 30, | ||||
|     ) -> dict[str, Any]: | ||||
|     ) -> dict[str, Pair]: | ||||
|         ''' | ||||
|         Fuzzy search symbology set for pairs matching `pattern`. | ||||
| 
 | ||||
|         ''' | ||||
|         pairs: dict[str, Any] = await self.symbol_info() | ||||
|         matches: dict[str, Pair] = match_from_pairs( | ||||
|         pairs: dict[str, Pair] = await self.exch_info() | ||||
| 
 | ||||
|         return match_from_pairs( | ||||
|             pairs=pairs, | ||||
|             query=pattern.upper(), | ||||
|             score_cutoff=35, | ||||
|             limit=limit | ||||
|         ) | ||||
| 
 | ||||
|        # repack in name-keyed table | ||||
|         return { | ||||
|             pair['instrument_name'].lower(): pair | ||||
|             for pair in matches.values() | ||||
|         } | ||||
| 
 | ||||
|     async def bars( | ||||
|         self, | ||||
|         symbol: str, | ||||
|         mkt: MktPair, | ||||
| 
 | ||||
|         start_dt: Optional[datetime] = None, | ||||
|         end_dt: Optional[datetime] = None, | ||||
| 
 | ||||
|         limit: int = 1000, | ||||
|         as_np: bool = True, | ||||
|     ) -> dict: | ||||
|         instrument = symbol | ||||
| 
 | ||||
|     ) -> list[tuple] | np.ndarray: | ||||
|         instrument: str = mkt.bs_fqme.split('.')[0] | ||||
| 
 | ||||
|         if end_dt is None: | ||||
|             end_dt = pendulum.now('UTC') | ||||
|             end_dt = now('UTC') | ||||
| 
 | ||||
|         if start_dt is None: | ||||
|             start_dt = end_dt.start_of( | ||||
|  | @ -377,7 +488,7 @@ class Client: | |||
|         end_time = deribit_timestamp(end_dt) | ||||
| 
 | ||||
|         # https://docs.deribit.com/#public-get_tradingview_chart_data | ||||
|         resp = await self.json_rpc( | ||||
|         resp = await self._json_rpc_auth_wrapper( | ||||
|             'public/get_tradingview_chart_data', | ||||
|             params={ | ||||
|                 'instrument_name': instrument.upper(), | ||||
|  | @ -387,36 +498,34 @@ class Client: | |||
|             }) | ||||
| 
 | ||||
|         result = KLinesResult(**resp.result) | ||||
|         new_bars = [] | ||||
|         new_bars: list[tuple] = [] | ||||
|         for i in range(len(result.close)): | ||||
| 
 | ||||
|             _open = result.open[i] | ||||
|             high = result.high[i] | ||||
|             low = result.low[i] | ||||
|             close = result.close[i] | ||||
|             volume = result.volume[i] | ||||
| 
 | ||||
|             row = [ | ||||
|             row = [  | ||||
|                 (start_time + (i * (60 * 1000))) / 1000.0,  # time | ||||
|                 result.open[i], | ||||
|                 result.high[i], | ||||
|                 result.low[i], | ||||
|                 result.close[i], | ||||
|                 result.volume[i], | ||||
|                 0 | ||||
|                 result.volume[i] | ||||
|             ] | ||||
| 
 | ||||
|             new_bars.append((i,) + tuple(row)) | ||||
| 
 | ||||
|         array = np.array(new_bars, dtype=def_iohlcv_fields) if as_np else klines | ||||
|         return array | ||||
|         if not as_np: | ||||
|             return result | ||||
| 
 | ||||
|         return np.array( | ||||
|             new_bars, | ||||
|             dtype=def_iohlcv_fields | ||||
|         ) | ||||
| 
 | ||||
|     async def last_trades( | ||||
|         self, | ||||
|         instrument: str, | ||||
|         count: int = 10 | ||||
|     ): | ||||
|         resp = await self.json_rpc( | ||||
|         resp = await self._json_rpc_auth_wrapper( | ||||
|             'public/get_last_trades_by_instrument', | ||||
|             params={ | ||||
|                 'instrument_name': instrument, | ||||
|  | @ -428,78 +537,17 @@ class Client: | |||
| 
 | ||||
| @acm | ||||
| async def get_client( | ||||
|     is_brokercheck: bool = False | ||||
|     is_brokercheck: bool = False, | ||||
|     venue: MarketType = 'option', | ||||
| ) -> Client: | ||||
| 
 | ||||
|     async with ( | ||||
|         trio.open_nursery() as n, | ||||
|         open_jsonrpc_session( | ||||
|             _testnet_ws_url, dtype=JSONRPCResult) as json_rpc | ||||
|             _ws_url, response_type=JSONRPCResult | ||||
|         ) as json_rpc | ||||
|     ): | ||||
|         client = Client(json_rpc) | ||||
| 
 | ||||
|         _refresh_token: Optional[str] = None | ||||
|         _access_token: Optional[str] = None | ||||
| 
 | ||||
|         async def _auth_loop( | ||||
|             task_status: TaskStatus = trio.TASK_STATUS_IGNORED | ||||
|         ): | ||||
|             """Background task that adquires a first access token and then will | ||||
|             refresh the access token while the nursery isn't cancelled. | ||||
| 
 | ||||
|             https://docs.deribit.com/?python#authentication-2 | ||||
|             """ | ||||
|             renew_time = 10 | ||||
|             access_scope = 'trade:read_write' | ||||
|             _expiry_time = time.time() | ||||
|             got_access = False | ||||
|             nonlocal _refresh_token | ||||
|             nonlocal _access_token | ||||
| 
 | ||||
|             while True: | ||||
|                 if time.time() - _expiry_time < renew_time: | ||||
|                     # if we are close to token expiry time | ||||
| 
 | ||||
|                     if _refresh_token != None: | ||||
|                         # if we have a refresh token already dont need to send | ||||
|                         # secret | ||||
|                         params = { | ||||
|                             'grant_type': 'refresh_token', | ||||
|                             'refresh_token': _refresh_token, | ||||
|                             'scope': access_scope | ||||
|                         } | ||||
| 
 | ||||
|                     else: | ||||
|                         # we don't have refresh token, send secret to initialize | ||||
|                         params = { | ||||
|                             'grant_type': 'client_credentials', | ||||
|                             'client_id': client._key_id, | ||||
|                             'client_secret': client._key_secret, | ||||
|                             'scope': access_scope | ||||
|                         } | ||||
| 
 | ||||
|                     resp = await json_rpc('public/auth', params) | ||||
|                     result = resp.result | ||||
| 
 | ||||
|                     _expiry_time = time.time() + result['expires_in'] | ||||
|                     _refresh_token = result['refresh_token'] | ||||
| 
 | ||||
|                     if 'access_token' in result: | ||||
|                         _access_token = result['access_token'] | ||||
| 
 | ||||
|                     if not got_access: | ||||
|                         # first time this loop runs we must indicate task is | ||||
|                         # started, we have auth | ||||
|                         got_access = True | ||||
|                         task_status.started() | ||||
| 
 | ||||
|                 else: | ||||
|                     await trio.sleep(renew_time / 2) | ||||
| 
 | ||||
|         # if we have client creds launch auth loop | ||||
|         if client._key_id is not None: | ||||
|             await n.start(_auth_loop) | ||||
| 
 | ||||
|         await client.cache_symbols() | ||||
|         yield client | ||||
|         n.cancel_scope.cancel() | ||||
|  | @ -523,7 +571,7 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream: | |||
| 
 | ||||
| async def aio_price_feed_relay( | ||||
|     fh: FeedHandler, | ||||
|     instrument: Symbol, | ||||
|     instrument: str, | ||||
|     from_trio: asyncio.Queue, | ||||
|     to_trio: trio.abc.SendChannel, | ||||
| ) -> None: | ||||
|  | @ -542,21 +590,33 @@ async def aio_price_feed_relay( | |||
|             'symbol': cb_sym_to_deribit_inst( | ||||
|                 str_to_cb_sym(data.symbol)).lower(), | ||||
|             'ticks': [ | ||||
|                 {'type': 'bid', | ||||
|                     'price': float(data.bid_price), 'size': float(data.bid_size)}, | ||||
|                 {'type': 'bsize', | ||||
|                     'price': float(data.bid_price), 'size': float(data.bid_size)}, | ||||
|                 {'type': 'ask', | ||||
|                     'price': float(data.ask_price), 'size': float(data.ask_size)}, | ||||
|                 {'type': 'asize', | ||||
|                     'price': float(data.ask_price), 'size': float(data.ask_size)} | ||||
|                 { | ||||
|                     'type': 'bid', | ||||
|                     'price': float(data.bid_price), | ||||
|                     'size': float(data.bid_size) | ||||
|                 }, | ||||
|                 { | ||||
|                     'type': 'bsize', | ||||
|                     'price': float(data.bid_price), | ||||
|                     'size': float(data.bid_size) | ||||
|                 }, | ||||
|                 { | ||||
|                     'type': 'ask', | ||||
|                     'price': float(data.ask_price), | ||||
|                     'size': float(data.ask_size) | ||||
|                 }, | ||||
|                 { | ||||
|                     'type': 'asize', | ||||
|                     'price': float(data.ask_price), | ||||
|                     'size': float(data.ask_size) | ||||
|                 } | ||||
|             ] | ||||
|         })) | ||||
| 
 | ||||
|     sym: Symbol = piker_sym_to_cb_sym(instrument) | ||||
|     fh.add_feed( | ||||
|         DERIBIT, | ||||
|         channels=[TRADES, L1_BOOK], | ||||
|         symbols=[piker_sym_to_cb_sym(instrument)], | ||||
|         symbols=[sym], | ||||
|         callbacks={ | ||||
|             TRADES: _trade, | ||||
|             L1_BOOK: _l1 | ||||
|  | @ -597,9 +657,9 @@ async def maybe_open_price_feed( | |||
|     async with maybe_open_context( | ||||
|         acm_func=open_price_feed, | ||||
|         kwargs={ | ||||
|             'instrument': instrument | ||||
|             'instrument': instrument.split('.')[0] | ||||
|         }, | ||||
|         key=f'{instrument}-price', | ||||
|         key=f'{instrument.split(".")[0]}-price', | ||||
|     ) as (cache_hit, feed): | ||||
|         if cache_hit: | ||||
|             yield broadcast_receiver(feed, 10) | ||||
|  | @ -664,10 +724,10 @@ async def maybe_open_order_feed( | |||
|     async with maybe_open_context( | ||||
|         acm_func=open_order_feed, | ||||
|         kwargs={ | ||||
|             'instrument': instrument, | ||||
|             'instrument': instrument.split('.')[0], | ||||
|             'fh': fh | ||||
|         }, | ||||
|         key=f'{instrument}-order', | ||||
|         key=f'{instrument.split(".")[0]}-order', | ||||
|     ) as (cache_hit, feed): | ||||
|         if cache_hit: | ||||
|             yield broadcast_receiver(feed, 10) | ||||
|  |  | |||
|  | @ -21,18 +21,33 @@ Deribit backend. | |||
| from contextlib import asynccontextmanager as acm | ||||
| from datetime import datetime | ||||
| from typing import Any, Optional, Callable | ||||
| from pprint import pformat | ||||
| import time | ||||
| 
 | ||||
| import trio | ||||
| from trio_typing import TaskStatus | ||||
| import pendulum | ||||
| from pendulum import ( | ||||
|     from_timestamp, | ||||
|     now, | ||||
| ) | ||||
| from rapidfuzz import process as fuzzy | ||||
| import numpy as np | ||||
| import tractor | ||||
| 
 | ||||
| from piker.brokers import open_cached_client | ||||
| from piker.accounting import ( | ||||
|     MktPair, | ||||
|     unpack_fqme, | ||||
| ) | ||||
| from piker.brokers import ( | ||||
|     open_cached_client, | ||||
|     NoData, | ||||
| ) | ||||
| from piker._cacheables import ( | ||||
|     async_lifo_cache, | ||||
| ) | ||||
| from piker.log import get_logger, get_console_log | ||||
| from piker.data import ShmArray | ||||
| from piker.data.validate import FeedInit | ||||
| from piker.brokers._util import ( | ||||
|     BrokerError, | ||||
|     DataUnavailable, | ||||
|  | @ -47,9 +62,13 @@ from cryptofeed.symbols import Symbol | |||
| from .api import ( | ||||
|     Client, Trade, | ||||
|     get_config, | ||||
|     str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst, | ||||
|     piker_sym_to_cb_sym, cb_sym_to_deribit_inst, | ||||
|     maybe_open_price_feed | ||||
| ) | ||||
| from .venues import ( | ||||
|     Pair, | ||||
|     OptionPair, | ||||
| ) | ||||
| 
 | ||||
| _spawn_kwargs = { | ||||
|     'infect_asyncio': True, | ||||
|  | @ -64,36 +83,107 @@ async def open_history_client( | |||
|     mkt: MktPair, | ||||
| ) -> tuple[Callable, int]: | ||||
| 
 | ||||
|     instrument: str = mkt.bs_fqme | ||||
|     # TODO implement history getter for the new storage layer. | ||||
|     async with open_cached_client('deribit') as client: | ||||
| 
 | ||||
|         async def get_ohlc( | ||||
|             end_dt: Optional[datetime] = None, | ||||
|             start_dt: Optional[datetime] = None, | ||||
|             timeframe: float, | ||||
|             end_dt: datetime | None = None, | ||||
|             start_dt: datetime | None = None, | ||||
| 
 | ||||
|         ) -> tuple[ | ||||
|             np.ndarray, | ||||
|             datetime,  # start | ||||
|             datetime,  # end | ||||
|         ]: | ||||
|             if timeframe != 60: | ||||
|                 raise DataUnavailable('Only 1m bars are supported') | ||||
| 
 | ||||
|             array = await client.bars( | ||||
|                 instrument, | ||||
|             array: np.ndarray = await client.bars( | ||||
|                 mkt, | ||||
|                 start_dt=start_dt, | ||||
|                 end_dt=end_dt, | ||||
|             ) | ||||
|             if len(array) == 0: | ||||
|                 raise DataUnavailable | ||||
|                 raise NoData( | ||||
|                     f'No frame for {start_dt} -> {end_dt}\n' | ||||
|                 ) | ||||
| 
 | ||||
|             start_dt = pendulum.from_timestamp(array[0]['time']) | ||||
|             end_dt = pendulum.from_timestamp(array[-1]['time']) | ||||
|             start_dt = from_timestamp(array[0]['time']) | ||||
|             end_dt = from_timestamp(array[-1]['time']) | ||||
| 
 | ||||
|             times = array['time'] | ||||
|             if not times.any(): | ||||
|                 raise ValueError( | ||||
|                     'Bad frame with null-times?\n\n' | ||||
|                     f'{times}' | ||||
|                 ) | ||||
| 
 | ||||
|             if end_dt is None: | ||||
|                 inow: int = round(time.time()) | ||||
|                 if (inow - times[-1]) > 60: | ||||
|                     await tractor.pause() | ||||
| 
 | ||||
|             return array, start_dt, end_dt | ||||
| 
 | ||||
|         yield get_ohlc, {'erlangs': 3, 'rate': 3} | ||||
| 
 | ||||
| 
 | ||||
| @async_lifo_cache() | ||||
| async def get_mkt_info( | ||||
|     fqme: str, | ||||
| 
 | ||||
| ) -> tuple[MktPair, Pair] | None: | ||||
| 
 | ||||
|     # uppercase since kraken bs_mktid is always upper | ||||
|     if 'deribit' not in fqme.lower(): | ||||
|         fqme += '.deribit' | ||||
| 
 | ||||
|     mkt_mode: str = '' | ||||
|     broker, mkt_ep, venue, expiry = unpack_fqme(fqme) | ||||
| 
 | ||||
|     # NOTE: we always upper case all tokens to be consistent with | ||||
|     # binance's symbology style for pairs, like `BTCUSDT`, but in | ||||
|     # theory we could also just keep things lower case; as long as | ||||
|     # we're consistent and the symcache matches whatever this func | ||||
|     # returns, always! | ||||
|     expiry: str = expiry.upper() | ||||
|     venue: str = venue.upper() | ||||
|     venue_lower: str = venue.lower() | ||||
| 
 | ||||
|     mkt_mode: str = 'option' | ||||
| 
 | ||||
|     async with open_cached_client( | ||||
|         'deribit', | ||||
|     ) as client: | ||||
| 
 | ||||
|         assets: dict[str, Asset] = await client.get_assets() | ||||
|         pair_str: str = mkt_ep.lower() | ||||
| 
 | ||||
|         pair: Pair = await client.exch_info( | ||||
|             sym=pair_str, | ||||
|         ) | ||||
|         mkt_mode = pair.venue | ||||
|         client.mkt_mode = mkt_mode | ||||
| 
 | ||||
|         dst: Asset | None = assets.get(pair.bs_dst_asset) | ||||
|         src: Asset | None = assets.get(pair.bs_src_asset) | ||||
| 
 | ||||
|         mkt = MktPair( | ||||
|             dst=dst, | ||||
|             src=src, | ||||
|             price_tick=pair.price_tick, | ||||
|             size_tick=pair.size_tick, | ||||
|             bs_mktid=pair.symbol, | ||||
|             expiry=pair.expiry, | ||||
|             venue=mkt_mode, | ||||
|             broker='deribit', | ||||
|             _atype=mkt_mode, | ||||
|             _fqme_without_src=True, | ||||
|         ) | ||||
|         return mkt, pair | ||||
| 
 | ||||
| 
 | ||||
| async def stream_quotes( | ||||
| 
 | ||||
|     send_chan: trio.abc.SendChannel, | ||||
|  | @ -108,31 +198,26 @@ async def stream_quotes( | |||
|     # XXX: required to propagate ``tractor`` loglevel to piker logging | ||||
|     get_console_log(loglevel or tractor.current_actor().loglevel) | ||||
| 
 | ||||
|     sym = symbols[0] | ||||
|     sym = symbols[0].split('.')[0] | ||||
| 
 | ||||
|     init_msgs: list[FeedInit] = [] | ||||
| 
 | ||||
|     async with ( | ||||
|         open_cached_client('deribit') as client, | ||||
|         send_chan as send_chan | ||||
|     ): | ||||
| 
 | ||||
|         init_msgs = { | ||||
|             # pass back token, and bool, signalling if we're the writer | ||||
|             # and that history has been written | ||||
|             sym: { | ||||
|                 'symbol_info': { | ||||
|                     'asset_type': 'option', | ||||
|                     'price_tick_size': 0.0005 | ||||
|                 }, | ||||
|                 'shm_write_opts': {'sum_tick_vml': False}, | ||||
|                 'fqsn': sym, | ||||
|             }, | ||||
|         } | ||||
|         mkt, pair = await get_mkt_info(sym) | ||||
| 
 | ||||
|         # build out init msgs according to latest spec | ||||
|         init_msgs.append( | ||||
|             FeedInit(mkt_info=mkt) | ||||
|         ) | ||||
|         nsym = piker_sym_to_cb_sym(sym) | ||||
| 
 | ||||
|         async with maybe_open_price_feed(sym) as stream: | ||||
| 
 | ||||
|             cache = await client.cache_symbols() | ||||
|             cache = client._pairs | ||||
| 
 | ||||
|             last_trades = (await client.last_trades( | ||||
|                 cb_sym_to_deribit_inst(nsym), count=1)).trades | ||||
|  | @ -174,12 +259,21 @@ async def open_symbol_search( | |||
|     async with open_cached_client('deribit') as client: | ||||
| 
 | ||||
|         # load all symbols locally for fast search | ||||
|         cache = await client.cache_symbols() | ||||
|         cache = client._pairs | ||||
|         await ctx.started() | ||||
| 
 | ||||
|         async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|             pattern: str | ||||
|             async for pattern in stream: | ||||
|                 # repack in dict form | ||||
|                 await stream.send( | ||||
|                     await client.search_symbols(pattern)) | ||||
|                 # NOTE: pattern fuzzy-matching is done within | ||||
|                 # the method impl. | ||||
|                 pairs: dict[str, Pair] = await client.search_symbols( | ||||
|                     pattern, | ||||
|                 ) | ||||
|                 # repack in fqme-keyed table | ||||
|                 byfqme: dict[str, Pair] = {} | ||||
|                 for pair in pairs.values(): | ||||
|                     byfqme[pair.bs_fqme] = pair | ||||
| 
 | ||||
|                 await stream.send(byfqme) | ||||
|  |  | |||
|  | @ -0,0 +1,191 @@ | |||
| # piker: trading gear for hackers | ||||
| # Copyright (C) Tyler Goodlet (in stewardship for pikers) | ||||
| 
 | ||||
| # This program is free software: you can redistribute it and/or modify | ||||
| # it under the terms of the GNU Affero General Public License as published by | ||||
| # the Free Software Foundation, either version 3 of the License, or | ||||
| # (at your option) any later version. | ||||
| 
 | ||||
| # This program is distributed in the hope that it will be useful, | ||||
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
| # GNU Affero General Public License for more details. | ||||
| 
 | ||||
| # You should have received a copy of the GNU Affero General Public License | ||||
| # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||
| 
 | ||||
| """ | ||||
| Per market data-type definitions and schemas types. | ||||
| 
 | ||||
| """ | ||||
| from __future__ import annotations | ||||
| import pendulum | ||||
| from typing import ( | ||||
|     Literal, | ||||
|     Optional, | ||||
| ) | ||||
| from decimal import Decimal | ||||
| 
 | ||||
| from msgspec import field | ||||
| 
 | ||||
| from piker.types import Struct | ||||
| 
 | ||||
| 
 | ||||
| # API endpoint paths by venue / sub-API | ||||
| _domain: str = 'deribit.com' | ||||
| _url = f'https://www.{_domain}' | ||||
| 
 | ||||
| # WEBsocketz | ||||
| _ws_url: str = f'wss://www.{_domain}/ws/api/v2' | ||||
| 
 | ||||
| # test nets | ||||
| _testnet_ws_url: str = f'wss://test.{_domain}/ws/api/v2' | ||||
| 
 | ||||
| MarketType = Literal[ | ||||
|     'option' | ||||
| ] | ||||
| 
 | ||||
| 
 | ||||
| def get_api_eps(venue: MarketType) -> tuple[str, str]: | ||||
|     ''' | ||||
|     Return API ep root paths per venue. | ||||
| 
 | ||||
|     ''' | ||||
|     return { | ||||
|         'option': ( | ||||
|             _ws_url, | ||||
|         ), | ||||
|     }[venue] | ||||
| 
 | ||||
| 
 | ||||
| class Pair(Struct, frozen=True, kw_only=True): | ||||
| 
 | ||||
|     symbol: str | ||||
| 
 | ||||
|     # src | ||||
|     quote_currency: str # 'BTC' | ||||
| 
 | ||||
|     # dst | ||||
|     base_currency: str # "BTC", | ||||
| 
 | ||||
|     tick_size: float  # 0.0001 | ||||
|     # [{'above_price': 0.005, 'tick_size': 0.0005}] | ||||
|     tick_size_steps: list[dict[str, float]] | ||||
| 
 | ||||
|     @property | ||||
|     def price_tick(self) -> Decimal: | ||||
|         return Decimal(str(self.tick_size_steps[0]['above_price'])) | ||||
| 
 | ||||
|     @property | ||||
|     def size_tick(self) -> Decimal: | ||||
|         return Decimal(str(self.tick_size)) | ||||
| 
 | ||||
|     @property | ||||
|     def bs_fqme(self) -> str: | ||||
|         return f'{self.symbol}' | ||||
| 
 | ||||
|     @property | ||||
|     def bs_mktid(self) -> str: | ||||
|         return f'{self.symbol}.{self.venue}' | ||||
| 
 | ||||
| 
 | ||||
| class OptionPair(Pair, frozen=True): | ||||
| 
 | ||||
|     taker_commission: float # 0.0003 | ||||
|     strike: float # 5000.0 | ||||
|     settlement_period: str # 'day' | ||||
|     settlement_currency: str # "BTC", | ||||
|     rfq: bool # false | ||||
|     price_index: str # 'btc_usd' | ||||
|     option_type: str # 'call' | ||||
|     min_trade_amount: float # 0.1 | ||||
|     maker_commission: float # 0.0003 | ||||
|     kind: str # 'option' | ||||
|     is_active: bool # true | ||||
|     instrument_type: str # 'reversed' | ||||
|     instrument_name: str # 'BTC-1SEP24-55000-C' | ||||
|     instrument_id: int # 364671 | ||||
|     expiration_timestamp: int # 1725177600000 | ||||
|     creation_timestamp: int # 1724918461000 | ||||
|     counter_currency: str # 'USD'  | ||||
|     contract_size: float # '1.0' | ||||
|     block_trade_tick_size: float # '0.0001' | ||||
|     block_trade_min_trade_amount: int # '25' | ||||
|     block_trade_commission: float # '0.003' | ||||
| 
 | ||||
| 
 | ||||
|     # NOTE: see `.data._symcache.SymbologyCache.load()` for why | ||||
|     ns_path: str = 'piker.brokers.deribit:OptionPair' | ||||
| 
 | ||||
|     @property | ||||
|     def expiry(self) -> str: | ||||
|         iso_date = pendulum.from_timestamp( | ||||
|             self.expiration_timestamp / 1000 | ||||
|         ).isoformat() | ||||
|         return iso_date | ||||
| 
 | ||||
|     @property | ||||
|     def venue(self) -> str: | ||||
|         return 'option' | ||||
| 
 | ||||
|     @property | ||||
|     def bs_fqme(self) -> str: | ||||
|         return f'{self.symbol}' | ||||
| 
 | ||||
|     @property | ||||
|     def bs_src_asset(self) -> str: | ||||
|         return f'{self.quote_currency}' | ||||
| 
 | ||||
|     @property | ||||
|     def bs_dst_asset(self) -> str: | ||||
|         return f'{self.symbol}' | ||||
| 
 | ||||
| 
 | ||||
| PAIRTYPES: dict[MarketType, Pair] = { | ||||
|     'option': OptionPair, | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| class JSONRPCResult(Struct): | ||||
|     id: int | ||||
|     usIn: int | ||||
|     usOut: int | ||||
|     usDiff: int | ||||
|     testnet: bool | ||||
|     jsonrpc: str = '2.0' | ||||
|     error: Optional[dict] = None | ||||
|     result: Optional[list[dict]] = None | ||||
| 
 | ||||
| class JSONRPCChannel(Struct): | ||||
|     method: str | ||||
|     params: dict | ||||
|     jsonrpc: str = '2.0' | ||||
| 
 | ||||
| 
 | ||||
| class KLinesResult(Struct): | ||||
|     low: list[float] | ||||
|     cost: list[float] | ||||
|     high: list[float] | ||||
|     open: list[float] | ||||
|     close: list[float] | ||||
|     ticks: list[int] | ||||
|     status: str | ||||
|     volume: list[float] | ||||
| 
 | ||||
| class Trade(Struct): | ||||
|     iv: float | ||||
|     price: float | ||||
|     amount: float | ||||
|     trade_id: str | ||||
|     contracts: float | ||||
|     direction: str | ||||
|     trade_seq: int | ||||
|     timestamp: int | ||||
|     mark_price: float | ||||
|     index_price: float | ||||
|     tick_direction: int | ||||
|     instrument_name: str | ||||
|     combo_id: Optional[str] = '' | ||||
|     combo_trade_id: Optional[int] = 0 | ||||
|     block_trade_id: Optional[str] = '' | ||||
|     block_trade_leg_count: Optional[int] = 0 | ||||
| 
 | ||||
| class LastTradesResult(Struct): | ||||
|     trades: list[Trade] | ||||
|     has_more: bool | ||||
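A quick sanity check of the `OptionPair.expiry` conversion defined above, using the sample `expiration_timestamp` quoted in the field comments (Deribit reports milliseconds; the property renders an ISO-8601 string via pendulum):

```python
import pendulum

expiration_timestamp = 1725177600000   # sample from 'BTC-1SEP24-55000-C'
iso = pendulum.from_timestamp(expiration_timestamp / 1000).isoformat()
print(iso)   # e.g. '2024-09-01T08:00:00+00:00'
```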
|  | @ -111,6 +111,10 @@ class KucoinMktPair(Struct, frozen=True): | |||
|     quoteMaxSize: float | ||||
|     quoteMinSize: float | ||||
|     symbol: str  # our bs_mktid, kucoin's internal id | ||||
|     feeCategory: int | ||||
|     makerFeeCoefficient: float | ||||
|     takerFeeCoefficient: float | ||||
|     st: bool | ||||
| 
 | ||||
| 
 | ||||
| class AccountTrade(Struct, frozen=True): | ||||
|  | @ -593,7 +597,7 @@ async def get_client() -> AsyncGenerator[Client, None]: | |||
|     ''' | ||||
|     async with ( | ||||
|         httpx.AsyncClient( | ||||
|             base_url=f'https://api.kucoin.com/api', | ||||
|             base_url='https://api.kucoin.com/api', | ||||
|         ) as trio_client, | ||||
|     ): | ||||
|         client = Client(httpx_client=trio_client) | ||||
|  | @ -637,7 +641,7 @@ async def open_ping_task( | |||
|                 await trio.sleep((ping_interval - 1000) / 1000) | ||||
|                 await ws.send_msg({'id': connect_id, 'type': 'ping'}) | ||||
| 
 | ||||
|         log.info('Starting ping task for kucoin ws connection') | ||||
|         log.warning('Starting ping task for kucoin ws connection') | ||||
|         n.start_soon(ping_server) | ||||
| 
 | ||||
|         yield | ||||
|  | @ -649,9 +653,14 @@ async def open_ping_task( | |||
| async def get_mkt_info( | ||||
|     fqme: str, | ||||
| 
 | ||||
| ) -> tuple[MktPair, KucoinMktPair]: | ||||
| ) -> tuple[ | ||||
|     MktPair, | ||||
|     KucoinMktPair, | ||||
| ]: | ||||
|     ''' | ||||
|     Query for and return a `MktPair` and `KucoinMktPair`. | ||||
|     Query for and return both a `piker.accounting.MktPair` and | ||||
|     `KucoinMktPair` from provided `fqme: str` | ||||
|     (fully-qualified-market-endpoint). | ||||
| 
 | ||||
|     ''' | ||||
|     async with open_cached_client('kucoin') as client: | ||||
|  | @ -726,6 +735,8 @@ async def stream_quotes( | |||
| 
 | ||||
|         log.info(f'Starting up quote stream(s) for {symbols}') | ||||
|         for sym_str in symbols: | ||||
|             mkt: MktPair | ||||
|             pair: KucoinMktPair | ||||
|             mkt, pair = await get_mkt_info(sym_str) | ||||
|             init_msgs.append( | ||||
|                 FeedInit(mkt_info=mkt) | ||||
|  | @ -733,7 +744,11 @@ async def stream_quotes( | |||
| 
 | ||||
|         ws: NoBsWs | ||||
|         token, ping_interval = await client._get_ws_token() | ||||
|         connect_id = str(uuid4()) | ||||
|         log.info(f'API reported ping_interval: {ping_interval}\n') | ||||
| 
 | ||||
|         connect_id: str = str(uuid4()) | ||||
|         typ: str | ||||
|         quote: dict | ||||
|         async with ( | ||||
|             open_autorecon_ws( | ||||
|                 ( | ||||
|  | @ -747,20 +762,37 @@ async def stream_quotes( | |||
|                 ), | ||||
|             ) as ws, | ||||
|             open_ping_task(ws, ping_interval, connect_id), | ||||
|             aclosing(stream_messages(ws, sym_str)) as msg_gen, | ||||
|             aclosing( | ||||
|                 iter_normed_quotes( | ||||
|                     ws, sym_str | ||||
|                 ) | ||||
|             ) as iter_quotes, | ||||
|         ): | ||||
|             typ, quote = await anext(msg_gen) | ||||
|             typ, quote = await anext(iter_quotes) | ||||
| 
 | ||||
|             while typ != 'trade': | ||||
|                 # take care to not unblock here until we get a real | ||||
|                 # trade quote | ||||
|                 typ, quote = await anext(msg_gen) | ||||
|             # take care to not unblock here until we get a real | ||||
|             # trade quote? | ||||
|             # ^TODO, remove this right? | ||||
|             # -[ ] what often blocks chart boot/new-feed switching | ||||
|             #   since we're waiting for a live quote instead of just | ||||
|             #   loading history afap.. | ||||
|             #  |_ XXX, not sure if we require a bit of rework to core | ||||
|             #    feed init logic or if backends just gotta be | ||||
|             #    changed up.. feel like there was some causality | ||||
|             #    dilemma prolly only seen with IB too.. | ||||
|             # while typ != 'trade': | ||||
|             #     typ, quote = await anext(iter_quotes) | ||||
| 
 | ||||
|             task_status.started((init_msgs, quote)) | ||||
|             feed_is_live.set() | ||||
| 
 | ||||
|             async for typ, msg in msg_gen: | ||||
|                 await send_chan.send({sym_str: msg}) | ||||
|             # XXX NOTE, DO NOT include the `.<backend>` suffix! | ||||
|             # OW the sampling loop will not broadcast correctly.. | ||||
|             # since `bus._subscribers.setdefault(bs_fqme, set())` | ||||
|             # is used inside `.data.open_feed_bus()` !!! | ||||
|             topic: str = mkt.bs_fqme | ||||
|             async for typ, quote in iter_quotes: | ||||
|                 await send_chan.send({topic: quote}) | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
|  | @ -815,7 +847,7 @@ async def subscribe( | |||
|             ) | ||||
| 
 | ||||
| 
 | ||||
| async def stream_messages( | ||||
| async def iter_normed_quotes( | ||||
|     ws: NoBsWs, | ||||
|     sym: str, | ||||
| 
 | ||||
|  | @ -846,6 +878,9 @@ async def stream_messages( | |||
| 
 | ||||
|                 yield 'trade', { | ||||
|                     'symbol': sym, | ||||
|                     # TODO, is 'last' even used elsewhere/a-good | ||||
|                     # semantic? can't we just read the ticks with our | ||||
|                     # `.data.ticktools.frame_ticks()`? | ||||
|                     'last': trade_data.price, | ||||
|                     'brokerd_ts': last_trade_ts, | ||||
|                     'ticks': [ | ||||
|  | @ -938,7 +973,7 @@ async def open_history_client( | |||
|             if end_dt is None: | ||||
|                 inow = round(time.time()) | ||||
| 
 | ||||
|                 print( | ||||
|                 log.debug( | ||||
|                     f'difference in time between load and processing: ' | ||||
|                     f'{inow - times[-1]}' | ||||
|                 ) | ||||
|  |  | |||
|  | @ -653,7 +653,11 @@ class Router(Struct): | |||
|             flume = feed.flumes[fqme] | ||||
|             first_quote: dict = flume.first_quote | ||||
|             book: DarkBook = self.get_dark_book(broker) | ||||
|             book.lasts[fqme]: float = float(first_quote['last']) | ||||
| 
 | ||||
|             if not (last := first_quote.get('last')): | ||||
|                 last: float = flume.rt_shm.array[-1]['close'] | ||||
| 
 | ||||
|             book.lasts[fqme]: float = float(last) | ||||
| 
 | ||||
|             async with self.maybe_open_brokerd_dialog( | ||||
|                 brokermod=brokermod, | ||||
|  | @ -716,7 +720,7 @@ class Router(Struct): | |||
|             subs = self.subscribers[sub_key] | ||||
| 
 | ||||
|         sent_some: bool = False | ||||
|         for client_stream in subs: | ||||
|         for client_stream in subs.copy(): | ||||
|             try: | ||||
|                 await client_stream.send(msg) | ||||
|                 sent_some = True | ||||
|  | @ -1010,10 +1014,14 @@ async def translate_and_relay_brokerd_events( | |||
|                 status_msg.brokerd_msg = msg | ||||
|                 status_msg.src = msg.broker_details['name'] | ||||
| 
 | ||||
|                 await router.client_broadcast( | ||||
|                     status_msg.req.symbol, | ||||
|                     status_msg, | ||||
|                 ) | ||||
|                 if not status_msg.req: | ||||
|                     # likely some order change state? | ||||
|                     await tractor.pause() | ||||
|                 else: | ||||
|                     await router.client_broadcast( | ||||
|                         status_msg.req.symbol, | ||||
|                         status_msg, | ||||
|                     ) | ||||
| 
 | ||||
|                 if status == 'closed': | ||||
|                     log.info(f'Execution for {oid} is complete!') | ||||
|  |  | |||
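Presumably the switch to iterating `subs.copy()` in `Router.client_broadcast()` above guards against subscriber streams being torn down (and removed from the set) while a broadcast is still in flight; a minimal illustration of the underlying Python behaviour, unrelated to piker itself:

```python
subs = {'stream-a', 'stream-b', 'stream-c'}

# for s in subs: subs.discard(s)   # RuntimeError: set changed size during iteration
for s in subs.copy():              # iterate a snapshot instead
    subs.discard(s)                # the live set may shrink underneath us

assert not subs
```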
|  | @ -273,7 +273,7 @@ async def _reconnect_forever( | |||
|                 nobsws._connected.set() | ||||
|                 await trio.sleep_forever() | ||||
|         except HandshakeError: | ||||
|             log.exception(f'Retrying connection') | ||||
|             log.exception('Retrying connection') | ||||
| 
 | ||||
|         # ws & nursery block ends | ||||
| 
 | ||||
|  | @ -359,8 +359,8 @@ async def open_autorecon_ws( | |||
| 
 | ||||
| 
 | ||||
| ''' | ||||
| JSONRPC response-request style machinery for transparent multiplexing of msgs | ||||
| over a NoBsWs. | ||||
| JSONRPC response-request style machinery for transparent multiplexing | ||||
| of msgs over a NoBsWs. | ||||
| 
 | ||||
| ''' | ||||
| 
 | ||||
|  | @ -377,16 +377,20 @@ async def open_jsonrpc_session( | |||
|     url: str, | ||||
|     start_id: int = 0, | ||||
|     response_type: type = JSONRPCResult, | ||||
|     request_type: Optional[type] = None, | ||||
|     request_hook: Optional[Callable] = None, | ||||
|     error_hook: Optional[Callable] = None, | ||||
|     # request_type: Optional[type] = None, | ||||
|     # request_hook: Optional[Callable] = None, | ||||
|     # error_hook: Optional[Callable] = None, | ||||
| ) -> Callable[[str, dict], dict]: | ||||
| 
 | ||||
|     # NOTE, store all request msgs so we can raise errors on the | ||||
|     # caller side! | ||||
|     req_msgs: dict[int, dict] = {} | ||||
| 
 | ||||
|     async with ( | ||||
|         trio.open_nursery() as n, | ||||
|         open_autorecon_ws(url) as ws | ||||
|     ): | ||||
|         rpc_id: Iterable = count(start_id) | ||||
|         rpc_id: Iterable[int] = count(start_id) | ||||
|         rpc_results: dict[int, dict] = {} | ||||
| 
 | ||||
|         async def json_rpc(method: str, params: dict) -> dict: | ||||
|  | @ -394,26 +398,40 @@ async def open_jsonrpc_session( | |||
|             perform a json rpc call and wait for the result, raise exception in | ||||
|             case of error field present on response | ||||
|             ''' | ||||
|             nonlocal req_msgs | ||||
| 
 | ||||
|             req_id: int = next(rpc_id) | ||||
|             msg = { | ||||
|                 'jsonrpc': '2.0', | ||||
|                 'id': next(rpc_id), | ||||
|                 'id': req_id, | ||||
|                 'method': method, | ||||
|                 'params': params | ||||
|             } | ||||
|             _id = msg['id'] | ||||
| 
 | ||||
|             rpc_results[_id] = { | ||||
|             result = rpc_results[_id] = { | ||||
|                 'result': None, | ||||
|                 'event': trio.Event() | ||||
|                 'error': None, | ||||
|                 'event': trio.Event(),  # signal caller resp arrived | ||||
|             } | ||||
|             req_msgs[_id] = msg | ||||
| 
 | ||||
|             await ws.send_msg(msg) | ||||
| 
 | ||||
|             # wait for response before unblocking requester code | ||||
|             await rpc_results[_id]['event'].wait() | ||||
| 
 | ||||
|             ret = rpc_results[_id]['result'] | ||||
|             if (maybe_result := result['result']): | ||||
|                 ret = maybe_result | ||||
|                 del rpc_results[_id] | ||||
| 
 | ||||
|             del rpc_results[_id] | ||||
|             else: | ||||
|                 err = result['error'] | ||||
|                 raise Exception( | ||||
|                     f'JSONRPC request failed\n' | ||||
|                     f'req: {msg}\n' | ||||
|                     f'resp: {err}\n' | ||||
|                 ) | ||||
| 
 | ||||
|             if ret.error is not None: | ||||
|                 raise Exception(json.dumps(ret.error, indent=4)) | ||||
|  | @ -428,6 +446,7 @@ async def open_jsonrpc_session( | |||
|             the server side. | ||||
| 
 | ||||
|             ''' | ||||
|             nonlocal req_msgs | ||||
|             async for msg in ws: | ||||
|                 match msg: | ||||
|                     case { | ||||
|  | @ -451,15 +470,29 @@ async def open_jsonrpc_session( | |||
|                         'params': _, | ||||
|                     }: | ||||
|                         log.debug(f'Received\n{msg}') | ||||
|                         if request_hook: | ||||
|                             await request_hook(request_type(**msg)) | ||||
|                         # if request_hook: | ||||
|                         #     await request_hook(request_type(**msg)) | ||||
| 
 | ||||
|                     case { | ||||
|                         'error': error | ||||
|                     }: | ||||
|                         log.warning(f'Received\n{error}') | ||||
|                         if error_hook: | ||||
|                             await error_hook(response_type(**msg)) | ||||
|                         # if error_hook: | ||||
|                         #     await error_hook(response_type(**msg)) | ||||
| 
 | ||||
|                         # retrieve orig request msg, set error | ||||
|                         # response in original "result" msg, | ||||
|                         # THEN FINALLY set the event to signal caller | ||||
|                         # to raise the error in the parent task. | ||||
|                         req_id: int = msg['id'] | ||||
|                         req_msg: dict = req_msgs[req_id] | ||||
|                         result: dict = rpc_results[req_id] | ||||
|                         result['error'] = error | ||||
|                         result['event'].set() | ||||
|                         log.error( | ||||
|                             f'JSONRPC request failed\n' | ||||
|                             f'req: {req_msg}\n' | ||||
|                             f'resp: {error}\n' | ||||
|                         ) | ||||
| 
 | ||||
|                     case _: | ||||
|                         log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') | ||||
|  |  | |||
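To tie the `open_jsonrpc_session()` changes above back to the deribit backend, here is a rough usage sketch adapted from the `get_client()`/`symbol_info()` hunks earlier in this diff; the URL and request params are illustrative and the import path assumes piker's `.data._web_bs` module:

```python
import trio
from piker.data._web_bs import open_jsonrpc_session

async def main():
    # multiplex JSON-RPC requests over one (auto-reconnecting) ws
    async with open_jsonrpc_session(
        'wss://www.deribit.com/ws/api/v2',
    ) as json_rpc:
        resp = await json_rpc(
            'public/get_instruments',
            params={'currency': 'BTC', 'kind': 'option', 'expired': 'false'},
        )
        print(len(resp.result), 'instruments')

trio.run(main)
```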
										
											
File diff suppressed because it is too large.
							|  | @ -71,6 +71,8 @@ pdbp = "^1.5.0" | |||
| trio = "^0.24" | ||||
| pendulum = "^3.0.0" | ||||
| httpx = "^0.27.0" | ||||
| cryptofeed = "^2.4.0" | ||||
| pyarrow = "^17.0.0" | ||||
| 
 | ||||
| [tool.poetry.dependencies.tractor] | ||||
| develop = true | ||||
|  |  | |||