Compare commits
	
		
			14 Commits 
		
	
	
		
			d475347e53
			...
			3d00d27a12
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 3d00d27a12 | |
|  | d545641d06 | |
|  | 54d4d9fd56 | |
|  | 5fc56cff98 | |
|  | f04d2d27fd | |
|  | 1d19f01b79 | |
|  | bceab54adb | |
|  | 84b1189e1a | |
|  | 1e16642c12 | |
|  | 7eb779a682 | |
|  | a82f2fa578 | |
|  | 193f5c1ae9 | |
|  | 8901360695 | |
|  | a14b599835 | 
|  | @ -28,6 +28,8 @@ from decimal import ( | |||
|     Decimal, | ||||
| ) | ||||
| from functools import partial | ||||
| from pathlib import Path | ||||
| from pprint import pformat | ||||
| import time | ||||
| from typing import ( | ||||
|     Any, | ||||
|  | @ -37,8 +39,6 @@ from typing import ( | |||
| 
 | ||||
| from pendulum import now | ||||
| import trio | ||||
| from trio_typing import TaskStatus | ||||
| from rapidfuzz import process as fuzzy | ||||
| import numpy as np | ||||
| from tractor.trionics import ( | ||||
|     broadcast_receiver, | ||||
|  | @ -52,11 +52,16 @@ from cryptofeed import FeedHandler | |||
| from cryptofeed.defines import ( | ||||
|     DERIBIT, | ||||
|     L1_BOOK, TRADES, | ||||
|     OPTION, CALL, PUT | ||||
|     OPTION, CALL, PUT, | ||||
|     OPEN_INTEREST, | ||||
| ) | ||||
| from cryptofeed.symbols import Symbol | ||||
| # types for managing the cb callbacks. | ||||
| # from cryptofeed.types import L1Book | ||||
| from cryptofeed.types import ( | ||||
|     L1Book, | ||||
|     Trade, | ||||
|     OpenInterest, | ||||
| ) | ||||
| from piker.brokers import SymbolNotFound | ||||
| from .venues import ( | ||||
|     _ws_url, | ||||
|     MarketType, | ||||
|  | @ -64,9 +69,7 @@ from .venues import ( | |||
|     Pair, | ||||
|     OptionPair, | ||||
|     JSONRPCResult, | ||||
|     JSONRPCChannel, | ||||
|     KLinesResult, | ||||
|     Trade, | ||||
|     LastTradesResult, | ||||
| ) | ||||
| from piker.accounting import ( | ||||
|  | @ -77,7 +80,7 @@ from piker.accounting import ( | |||
| from piker.data import ( | ||||
|     def_iohlcv_fields, | ||||
|     match_from_pairs, | ||||
|     Struct, | ||||
|     # Struct, | ||||
| ) | ||||
| from piker.data._web_bs import ( | ||||
|     open_jsonrpc_session | ||||
|  | @ -96,9 +99,21 @@ _spawn_kwargs = { | |||
| } | ||||
| 
 | ||||
| 
 | ||||
| # convert datetime obj timestamp to unixtime in milliseconds | ||||
| def deribit_timestamp(when): | ||||
|     return int((when.timestamp() * 1000) + (when.microsecond / 1000)) | ||||
| def deribit_timestamp(when: datetime) -> int: | ||||
|     ''' | ||||
|     Convert conventional epoch timestamp, in secs, to unixtime in | ||||
|     milliseconds. | ||||
| 
 | ||||
|     ''' | ||||
|     return int( | ||||
|         (when.timestamp() * 1000) | ||||
|         + | ||||
|         (when.microsecond / 1000) | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| def get_timestamp_int(expiry_date: str) -> int: | ||||
|     return int(time.mktime(time.strptime(expiry_date, '%d%b%y'))) | ||||
| 
 | ||||
| 
 | ||||
| def str_to_cb_sym(name: str) -> Symbol: | ||||
|  | @ -107,32 +122,40 @@ def str_to_cb_sym(name: str) -> Symbol: | |||
|     quote = base | ||||
| 
 | ||||
|     if option_type == 'put': | ||||
|         option_type = PUT  | ||||
|     elif option_type  == 'call': | ||||
|         option_type = PUT | ||||
|     elif option_type == 'call': | ||||
|         option_type = CALL | ||||
|     else: | ||||
|         raise Exception("Couldn\'t parse option type") | ||||
| 
 | ||||
|     new_expiry_date = get_values_from_cb_normalized_date(expiry_date) | ||||
| 
 | ||||
|     new_expiry_date: int = get_timestamp_int( | ||||
|         get_values_from_cb_normalized_date(expiry_date) | ||||
|     ) | ||||
|     return Symbol( | ||||
|         base=base, | ||||
|         quote=quote, | ||||
|         type=OPTION, | ||||
|         strike_price=strike_price, | ||||
|         option_type=option_type, | ||||
|         expiry_date=new_expiry_date) | ||||
|         expiry_date=new_expiry_date | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| def piker_sym_to_cb_sym(name: str) -> Symbol: | ||||
|     base, expiry_date, strike_price, option_type = tuple( | ||||
|     ( | ||||
|         base, | ||||
|         expiry_date, | ||||
|         strike_price, | ||||
|         option_type, | ||||
|     )= tuple( | ||||
|         name.upper().split('-')) | ||||
| 
 | ||||
|     quote = base | ||||
|     new_expiry_date = get_timestamp_int(expiry_date) | ||||
|     quote: str = base | ||||
| 
 | ||||
|     if option_type == 'P': | ||||
|         option_type = PUT  | ||||
|     elif option_type == 'C': | ||||
|     if option_type == 'P' or option_type == 'PUT': | ||||
|         option_type = PUT | ||||
|     elif option_type == 'C' or option_type == 'CALL': | ||||
|         option_type = CALL | ||||
|     else: | ||||
|         raise Exception("Couldn\'t parse option type") | ||||
|  | @ -143,14 +166,32 @@ def piker_sym_to_cb_sym(name: str) -> Symbol: | |||
|         type=OPTION, | ||||
|         strike_price=strike_price, | ||||
|         option_type=option_type, | ||||
|         expiry_date=expiry_date) | ||||
|         expiry_date=new_expiry_date | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| def cb_sym_to_deribit_inst(sym: Symbol): | ||||
| # TODO, instead can't we just lookup the `MktPair` directly | ||||
| # and pass it upward to `stream_quotes()`?? | ||||
| def cb_sym_to_deribit_inst(sym: Symbol) -> str: | ||||
|     ''' | ||||
|     Generate our own internal `str`-repr for a `cryptofeed.Symbol` | ||||
|     uniquely from its fields. | ||||
| 
 | ||||
|     This is the equiv of generating a `Pair.fmqe` from `cryptofeed` | ||||
|     for now i suppose..? | ||||
| 
 | ||||
|     ''' | ||||
|     new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date) | ||||
|     otype = 'C' if sym.option_type == CALL else 'P' | ||||
| 
 | ||||
|     return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}' | ||||
|     otype = ( | ||||
|         'C' if sym.option_type == CALL | ||||
|         else 'P' | ||||
|     ) | ||||
|     return ( | ||||
|         f'{sym.base}-' | ||||
|         f'{new_expiry_date}-' | ||||
|         f'{sym.strike_price}-' | ||||
|         f'{otype}' | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| def get_values_from_cb_normalized_date(expiry_date: str) -> str: | ||||
|  | @ -179,32 +220,39 @@ def get_config() -> dict[str, Any]: | |||
| 
 | ||||
|     conf: dict | ||||
|     path: Path | ||||
| 
 | ||||
|     conf, path = config.load( | ||||
|         conf_name='brokers', | ||||
|         touch_if_dne=True, | ||||
|     ) | ||||
|     section: dict = {} | ||||
|     section = conf.get('deribit') | ||||
|     section: dict|None = conf.get('deribit') | ||||
|     if section is None: | ||||
|         log.warning(f'No config section found for deribit in {path}') | ||||
|         return {} | ||||
|         raise ValueError( | ||||
|             f'No `[deribit]` section found in\n' | ||||
|             f'{path!r}\n\n' | ||||
|             f'See the template config from the core repo for samples..\n' | ||||
|             # f'<TODO put repo link here??>' | ||||
|         ) | ||||
| 
 | ||||
|     conf_option = section.get('option', {}) | ||||
|     section.clear # clear the dict to reuse it | ||||
|     section['deribit'] = {} | ||||
|     section['deribit']['key_id'] = conf_option.get('api_key') | ||||
|     section['deribit']['key_secret'] = conf_option.get('api_secret') | ||||
| 
 | ||||
|     section['log'] = {} | ||||
|     section['log']['filename'] = 'feedhandler.log' | ||||
|     section['log']['level'] = 'DEBUG' | ||||
| 
 | ||||
|     return section | ||||
|     conf_log = conf_option.get('log', {}) | ||||
|     return { | ||||
|         'deribit': { | ||||
|             'key_id': conf_option['key_id'], | ||||
|             'key_secret': conf_option['key_secret'], | ||||
|         }, | ||||
|         'log': { | ||||
|             'filename': conf_log['filename'], | ||||
|             'level': conf_log['level'], | ||||
|             'disabled': conf_log['disabled'], | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| class Client: | ||||
|     ''' | ||||
|     Hi-level interface for the jsron-RPC over websocket API. | ||||
| 
 | ||||
|     ''' | ||||
|     def __init__( | ||||
|         self, | ||||
| 
 | ||||
|  | @ -223,8 +271,12 @@ class Client: | |||
|         self._auth_ts = None | ||||
|         self._auth_renew_ts = 5 # seconds to renew auth | ||||
| 
 | ||||
|     async def _json_rpc_auth_wrapper(self, *args, **kwargs) -> JSONRPCResult: | ||||
|          | ||||
|     async def _json_rpc_auth_wrapper( | ||||
|         self, | ||||
|         *args, | ||||
|         **kwargs, | ||||
|     ) -> JSONRPCResult: | ||||
| 
 | ||||
|         """Background task that adquires a first access token and then will | ||||
|         refresh the access token. | ||||
| 
 | ||||
|  | @ -250,9 +302,6 @@ class Client: | |||
| 
 | ||||
|         return await self.json_rpc(*args, **kwargs) | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
|     async def get_balances( | ||||
|         self, | ||||
|         kind: str = 'option' | ||||
|  | @ -272,28 +321,44 @@ class Client: | |||
| 
 | ||||
|         return balances | ||||
| 
 | ||||
|     async def get_assets( | ||||
|     async def get_currencies( | ||||
|         self, | ||||
|         venue: str | None = None, | ||||
| 
 | ||||
|     ) -> dict[str, Asset]: | ||||
|         """Return the set of asset balances for this account | ||||
|         by symbol. | ||||
|         """ | ||||
|     ) -> list[dict]: | ||||
|         ''' | ||||
|         Return the set of currencies for deribit. | ||||
|         ''' | ||||
|         assets = {} | ||||
|         resp = await self._json_rpc_auth_wrapper( | ||||
|             'public/get_currencies', | ||||
|             params={} | ||||
|         ) | ||||
|         currencies = resp.result | ||||
|         return resp.result | ||||
| 
 | ||||
|     async def get_assets( | ||||
|         self, | ||||
|         venue: str | None = None, | ||||
| 
 | ||||
|     ) -> dict[str, Asset]: | ||||
|         ''' | ||||
|         Return the set of asset balances for this account | ||||
|         by (deribit's) symbol. | ||||
| 
 | ||||
| 
 | ||||
|         ''' | ||||
|         assets = {} | ||||
|         currencies = await self.get_currencies() | ||||
|         for currency in currencies: | ||||
|             name = currency['currency'] | ||||
|             tx_tick = digits_to_dec(currency['fee_precision'])  | ||||
|             atype='crypto_currency' | ||||
|             name: str = currency['currency'] | ||||
|             tx_tick: Decimal = digits_to_dec(currency['fee_precision']) | ||||
| 
 | ||||
|             # TODO, handling of options, futures, perps etc. more | ||||
|             # specifically with diff `.atype`s? | ||||
|             assets[name] = Asset( | ||||
|                 name=name, | ||||
|                 atype=atype, | ||||
|                 tx_tick=tx_tick) | ||||
|                 atype='crypto_currency', | ||||
|                 tx_tick=tx_tick, | ||||
|             ) | ||||
| 
 | ||||
|             instruments = await self.symbol_info(currency=name) | ||||
|             for instrument in instruments: | ||||
|  | @ -301,9 +366,10 @@ class Client: | |||
|                 assets[pair.symbol] = Asset( | ||||
|                     name=pair.symbol, | ||||
|                     atype=pair.venue, | ||||
|                     tx_tick=pair.size_tick) | ||||
|                     tx_tick=pair.size_tick, | ||||
|                 ) | ||||
| 
 | ||||
|         return assets  | ||||
|         return assets | ||||
| 
 | ||||
|     async def get_mkt_pairs(self) -> dict[str, Pair]: | ||||
|         flat: dict[str, Pair] = {} | ||||
|  | @ -358,6 +424,19 @@ class Client: | |||
|             return cached_pair | ||||
| 
 | ||||
|         if sym: | ||||
|             opt: OptionPair|None = pair_table.get(sym) | ||||
|             if not opt: | ||||
|                 closest_matches: dict[str, Pair] = match_from_pairs( | ||||
|                     pairs=pair_table, | ||||
|                     query=sym, | ||||
|                     score_cutoff=40, | ||||
|                 ) | ||||
|                 closest_syms: list[str] = list(closest_matches.keys()) | ||||
|                 raise ValueError( | ||||
|                     f'No contract found for {sym!r}\n\n' | ||||
|                     f'Closest {len(closest_syms)} available contracts:\n\n' | ||||
|                     f'{pformat(closest_syms)}\n' | ||||
|                 ) | ||||
|             return pair_table[sym] | ||||
|         else: | ||||
|             return self._pairs | ||||
|  | @ -381,7 +460,7 @@ class Client: | |||
|         params: dict[str, str] = { | ||||
|             'currency': currency.upper(), | ||||
|             'kind': kind, | ||||
|             'expired': str(expired).lower() | ||||
|             'expired': expired, | ||||
|         } | ||||
| 
 | ||||
|         resp: JSONRPCResult = await self._json_rpc_auth_wrapper( | ||||
|  | @ -389,9 +468,9 @@ class Client: | |||
|             params, | ||||
|         ) | ||||
|         # convert to symbol-keyed table | ||||
|         pair_type: Type = PAIRTYPES[kind] | ||||
|         pair_type: Pair = PAIRTYPES[kind] | ||||
|         results: list[dict] | None = resp.result | ||||
|          | ||||
| 
 | ||||
|         instruments: dict[str, Pair] = {} | ||||
|         for item in results: | ||||
|             symbol=item['instrument_name'].lower() | ||||
|  | @ -427,12 +506,15 @@ class Client: | |||
|         mkt_pairs = await self.symbol_info() | ||||
| 
 | ||||
|         if not mkt_pairs: | ||||
|             raise SymbolNotFound(f'No market pairs found!?:\n{resp}') | ||||
|             raise SymbolNotFound( | ||||
|                 f'No market pairs found!?:\n' | ||||
|                 f'{mkt_pairs}' | ||||
|             ) | ||||
| 
 | ||||
|         pairs_view_subtable: dict[str, Pair] = {} | ||||
| 
 | ||||
|         for instrument in mkt_pairs: | ||||
|             pair_type: Type = PAIRTYPES[venue] | ||||
|             pair_type: Pair|OptionPair = PAIRTYPES[venue] | ||||
| 
 | ||||
|             pair: Pair = pair_type(**mkt_pairs[instrument].to_dict()) | ||||
| 
 | ||||
|  | @ -480,12 +562,14 @@ class Client: | |||
|         if end_dt is None: | ||||
|             end_dt = now('UTC') | ||||
| 
 | ||||
|         _orig_start_dt = start_dt | ||||
|         if start_dt is None: | ||||
|             start_dt = end_dt.start_of( | ||||
|                 'minute').subtract(minutes=limit) | ||||
|                 'minute' | ||||
|             ).subtract(minutes=limit) | ||||
| 
 | ||||
|         start_time = deribit_timestamp(start_dt) | ||||
|         end_time = deribit_timestamp(end_dt) | ||||
|         start_time: int = deribit_timestamp(start_dt) | ||||
|         end_time: int = deribit_timestamp(end_dt) | ||||
| 
 | ||||
|         # https://docs.deribit.com/#public-get_tradingview_chart_data | ||||
|         resp = await self._json_rpc_auth_wrapper( | ||||
|  | @ -499,9 +583,13 @@ class Client: | |||
| 
 | ||||
|         result = KLinesResult(**resp.result) | ||||
|         new_bars: list[tuple] = [] | ||||
|         for i in range(len(result.close)): | ||||
|         # if _orig_start_dt is None: | ||||
|         # if not new_bars: | ||||
|         #     import tractor | ||||
|         #     await tractor.pause() | ||||
| 
 | ||||
|             row = [  | ||||
|         for i in range(len(result.close)): | ||||
|             row = [ | ||||
|                 (start_time + (i * (60 * 1000))) / 1000.0,  # time | ||||
|                 result.open[i], | ||||
|                 result.high[i], | ||||
|  | @ -554,7 +642,7 @@ async def get_client( | |||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def open_feed_handler(): | ||||
| async def open_feed_handler() -> FeedHandler: | ||||
|     fh = FeedHandler(config=get_config()) | ||||
|     yield fh | ||||
|     await to_asyncio.run_task(fh.stop_async) | ||||
|  | @ -575,43 +663,37 @@ async def aio_price_feed_relay( | |||
|     from_trio: asyncio.Queue, | ||||
|     to_trio: trio.abc.SendChannel, | ||||
| ) -> None: | ||||
|     async def _trade(data: dict, receipt_timestamp): | ||||
|         to_trio.send_nowait(('trade', { | ||||
|             'symbol': cb_sym_to_deribit_inst( | ||||
|                 str_to_cb_sym(data.symbol)).lower(), | ||||
|             'last': data, | ||||
|             'broker_ts': time.time(), | ||||
|             'data': data.to_dict(), | ||||
|             'receipt': receipt_timestamp | ||||
|         })) | ||||
|     ''' | ||||
|     Relay price feed quotes from the `cryptofeed.FeedHandler` to | ||||
|     the `piker`-side `trio.task` consumers for delivery to consumer | ||||
|     sub-actors for various subsystems. | ||||
| 
 | ||||
|     ''' | ||||
|     async def _trade( | ||||
|         trade: Trade,  # cryptofeed, NOT ours from `.venues`! | ||||
|         receipt_timestamp: int, | ||||
|     ) -> None: | ||||
|         ''' | ||||
|         Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side. | ||||
| 
 | ||||
|         ''' | ||||
|         to_trio.send_nowait(('trade', trade)) | ||||
| 
 | ||||
|     async def _l1( | ||||
|         book: L1Book, | ||||
|         receipt_timestamp: int, | ||||
|     ) -> None: | ||||
|         ''' | ||||
|         Relay-thru "l1 book" updates. | ||||
| 
 | ||||
|         ''' | ||||
| 
 | ||||
|         to_trio.send_nowait(('l1', book)) | ||||
| 
 | ||||
|         # TODO, make this work! | ||||
|         # -[ ] why isn't this working in `tractor.pause_from_sync()`?? | ||||
|         # breakpoint() | ||||
| 
 | ||||
|     async def _l1(data: dict, receipt_timestamp): | ||||
|         to_trio.send_nowait(('l1', { | ||||
|             'symbol': cb_sym_to_deribit_inst( | ||||
|                 str_to_cb_sym(data.symbol)).lower(), | ||||
|             'ticks': [ | ||||
|                 { | ||||
|                     'type': 'bid', | ||||
|                     'price': float(data.bid_price), | ||||
|                     'size': float(data.bid_size) | ||||
|                 }, | ||||
|                 { | ||||
|                     'type': 'bsize', | ||||
|                     'price': float(data.bid_price), | ||||
|                     'size': float(data.bid_size) | ||||
|                 }, | ||||
|                 { | ||||
|                     'type': 'ask', | ||||
|                     'price': float(data.ask_price), | ||||
|                     'size': float(data.ask_size) | ||||
|                 }, | ||||
|                 { | ||||
|                     'type': 'asize', | ||||
|                     'price': float(data.ask_price), | ||||
|                     'size': float(data.ask_size) | ||||
|                 } | ||||
|             ] | ||||
|         })) | ||||
|     sym: Symbol = piker_sym_to_cb_sym(instrument) | ||||
|     fh.add_feed( | ||||
|         DERIBIT, | ||||
|  | @ -625,27 +707,35 @@ async def aio_price_feed_relay( | |||
|     if not fh.running: | ||||
|         fh.run( | ||||
|             start_loop=False, | ||||
|             install_signal_handlers=False) | ||||
|             install_signal_handlers=False | ||||
|         ) | ||||
| 
 | ||||
|     # sync with trio | ||||
|     to_trio.send_nowait(None) | ||||
| 
 | ||||
|     # run until cancelled | ||||
|     await asyncio.sleep(float('inf')) | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def open_price_feed( | ||||
|     instrument: str | ||||
| ) -> trio.abc.ReceiveStream: | ||||
|     async with maybe_open_feed_handler() as fh: | ||||
|         async with to_asyncio.open_channel_from( | ||||
| ) -> to_asyncio.LinkedTaskChannel: | ||||
| 
 | ||||
|     fh: FeedHandler | ||||
|     first: None | ||||
|     chan: to_asyncio.LinkedTaskChannel | ||||
|     async with ( | ||||
|         maybe_open_feed_handler() as fh, | ||||
|         to_asyncio.open_channel_from( | ||||
|             partial( | ||||
|                 aio_price_feed_relay, | ||||
|                 fh, | ||||
|                 instrument | ||||
|             ) | ||||
|         ) as (first, chan): | ||||
|             yield chan | ||||
|         ) as (first, chan) | ||||
|     ): | ||||
|         yield chan | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
|  | @ -654,6 +744,7 @@ async def maybe_open_price_feed( | |||
| ) -> trio.abc.ReceiveStream: | ||||
| 
 | ||||
|     # TODO: add a predicate to maybe_open_context | ||||
|     feed: to_asyncio.LinkedTaskChannel | ||||
|     async with maybe_open_context( | ||||
|         acm_func=open_price_feed, | ||||
|         kwargs={ | ||||
|  | @ -668,68 +759,69 @@ async def maybe_open_price_feed( | |||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| async def aio_order_feed_relay( | ||||
|     fh: FeedHandler, | ||||
|     instrument: Symbol, | ||||
|     from_trio: asyncio.Queue, | ||||
|     to_trio: trio.abc.SendChannel, | ||||
| ) -> None: | ||||
|     async def _fill(data: dict, receipt_timestamp): | ||||
|         breakpoint() | ||||
| # TODO, move all to `.broker` submod! | ||||
| # async def aio_order_feed_relay( | ||||
| #     fh: FeedHandler, | ||||
| #     instrument: Symbol, | ||||
| #     from_trio: asyncio.Queue, | ||||
| #     to_trio: trio.abc.SendChannel, | ||||
| # ) -> None: | ||||
| #     async def _fill(data: dict, receipt_timestamp): | ||||
| #         breakpoint() | ||||
| 
 | ||||
|     async def _order_info(data: dict, receipt_timestamp): | ||||
|         breakpoint() | ||||
| #     async def _order_info(data: dict, receipt_timestamp): | ||||
| #         breakpoint() | ||||
| 
 | ||||
|     fh.add_feed( | ||||
|         DERIBIT, | ||||
|         channels=[FILLS, ORDER_INFO], | ||||
|         symbols=[instrument.upper()], | ||||
|         callbacks={ | ||||
|             FILLS: _fill, | ||||
|             ORDER_INFO: _order_info, | ||||
|         }) | ||||
| #     fh.add_feed( | ||||
| #         DERIBIT, | ||||
| #         channels=[FILLS, ORDER_INFO], | ||||
| #         symbols=[instrument.upper()], | ||||
| #         callbacks={ | ||||
| #             FILLS: _fill, | ||||
| #             ORDER_INFO: _order_info, | ||||
| #         }) | ||||
| 
 | ||||
|     if not fh.running: | ||||
|         fh.run( | ||||
|             start_loop=False, | ||||
|             install_signal_handlers=False) | ||||
| #     if not fh.running: | ||||
| #         fh.run( | ||||
| #             start_loop=False, | ||||
| #             install_signal_handlers=False) | ||||
| 
 | ||||
|     # sync with trio | ||||
|     to_trio.send_nowait(None) | ||||
| #     # sync with trio | ||||
| #     to_trio.send_nowait(None) | ||||
| 
 | ||||
|     await asyncio.sleep(float('inf')) | ||||
| #     await asyncio.sleep(float('inf')) | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def open_order_feed( | ||||
|     instrument: list[str] | ||||
| ) -> trio.abc.ReceiveStream: | ||||
|     async with maybe_open_feed_handler() as fh: | ||||
|         async with to_asyncio.open_channel_from( | ||||
|             partial( | ||||
|                 aio_order_feed_relay, | ||||
|                 fh, | ||||
|                 instrument | ||||
|             ) | ||||
|         ) as (first, chan): | ||||
|             yield chan | ||||
| # @acm | ||||
| # async def open_order_feed( | ||||
| #     instrument: list[str] | ||||
| # ) -> trio.abc.ReceiveStream: | ||||
| #     async with maybe_open_feed_handler() as fh: | ||||
| #         async with to_asyncio.open_channel_from( | ||||
| #             partial( | ||||
| #                 aio_order_feed_relay, | ||||
| #                 fh, | ||||
| #                 instrument | ||||
| #             ) | ||||
| #         ) as (first, chan): | ||||
| #             yield chan | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def maybe_open_order_feed( | ||||
|     instrument: str | ||||
| ) -> trio.abc.ReceiveStream: | ||||
| # @acm | ||||
| # async def maybe_open_order_feed( | ||||
| #     instrument: str | ||||
| # ) -> trio.abc.ReceiveStream: | ||||
| 
 | ||||
|     # TODO: add a predicate to maybe_open_context | ||||
|     async with maybe_open_context( | ||||
|         acm_func=open_order_feed, | ||||
|         kwargs={ | ||||
|             'instrument': instrument.split('.')[0], | ||||
|             'fh': fh | ||||
|         }, | ||||
|         key=f'{instrument.split('.')[0]}-order', | ||||
|     ) as (cache_hit, feed): | ||||
|         if cache_hit: | ||||
|             yield broadcast_receiver(feed, 10) | ||||
|         else: | ||||
|             yield feed | ||||
| #     # TODO: add a predicate to maybe_open_context | ||||
| #     async with maybe_open_context( | ||||
| #         acm_func=open_order_feed, | ||||
| #         kwargs={ | ||||
| #             'instrument': instrument.split('.')[0], | ||||
| #             'fh': fh | ||||
| #         }, | ||||
| #         key=f'{instrument.split('.')[0]}-order', | ||||
| #     ) as (cache_hit, feed): | ||||
| #         if cache_hit: | ||||
| #             yield broadcast_receiver(feed, 10) | ||||
| #         else: | ||||
| #             yield feed | ||||
|  |  | |||
|  | @ -18,56 +18,58 @@ | |||
| Deribit backend. | ||||
| 
 | ||||
| ''' | ||||
| from __future__ import annotations | ||||
| from contextlib import asynccontextmanager as acm | ||||
| from datetime import datetime | ||||
| from typing import Any, Optional, Callable | ||||
| from pprint import pformat | ||||
| from typing import ( | ||||
|     # Any, | ||||
|     # Optional, | ||||
|     Callable, | ||||
| ) | ||||
| # from pprint import pformat | ||||
| import time | ||||
| 
 | ||||
| import cryptofeed | ||||
| import trio | ||||
| from trio_typing import TaskStatus | ||||
| from pendulum import ( | ||||
|     from_timestamp, | ||||
|     now, | ||||
| ) | ||||
| from rapidfuzz import process as fuzzy | ||||
| import numpy as np | ||||
| import tractor | ||||
| 
 | ||||
| from piker.accounting import ( | ||||
|     Asset, | ||||
|     MktPair, | ||||
|     unpack_fqme, | ||||
| ) | ||||
| from piker.brokers import ( | ||||
|     open_cached_client, | ||||
|     NoData, | ||||
|     DataUnavailable, | ||||
| ) | ||||
| from piker._cacheables import ( | ||||
|     async_lifo_cache, | ||||
| ) | ||||
| from piker.log import get_logger, get_console_log | ||||
| from piker.data import ShmArray | ||||
| from piker.log import ( | ||||
|     get_logger, | ||||
|     mk_repr, | ||||
| ) | ||||
| from piker.data.validate import FeedInit | ||||
| from piker.brokers._util import ( | ||||
|     BrokerError, | ||||
|     DataUnavailable, | ||||
| ) | ||||
| 
 | ||||
| from cryptofeed import FeedHandler | ||||
| from cryptofeed.defines import ( | ||||
|     DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT | ||||
| ) | ||||
| from cryptofeed.symbols import Symbol | ||||
| 
 | ||||
| from .api import ( | ||||
|     Client, Trade, | ||||
|     get_config, | ||||
|     piker_sym_to_cb_sym, cb_sym_to_deribit_inst, | ||||
|     Client, | ||||
|     # get_config, | ||||
|     piker_sym_to_cb_sym, | ||||
|     cb_sym_to_deribit_inst, | ||||
|     str_to_cb_sym, | ||||
|     maybe_open_price_feed | ||||
| ) | ||||
| from .venues import ( | ||||
|     Pair, | ||||
|     OptionPair, | ||||
|     Trade, | ||||
| ) | ||||
| 
 | ||||
| _spawn_kwargs = { | ||||
|  | @ -86,6 +88,10 @@ async def open_history_client( | |||
|     # TODO implement history getter for the new storage layer. | ||||
|     async with open_cached_client('deribit') as client: | ||||
| 
 | ||||
|         pair: OptionPair = client._pairs[mkt.dst.name] | ||||
|         # XXX NOTE, the cuckers use ms !!! | ||||
|         creation_time_s: int = pair.creation_timestamp/1000 | ||||
| 
 | ||||
|         async def get_ohlc( | ||||
|             timeframe: float, | ||||
|             end_dt: datetime | None = None, | ||||
|  | @ -105,6 +111,31 @@ async def open_history_client( | |||
|                 end_dt=end_dt, | ||||
|             ) | ||||
|             if len(array) == 0: | ||||
|                 if ( | ||||
|                     end_dt is None | ||||
|                 ): | ||||
|                     raise DataUnavailable( | ||||
|                         'No history seems to exist yet?\n\n' | ||||
|                         f'{mkt}' | ||||
|                     ) | ||||
|                 elif ( | ||||
|                     end_dt | ||||
|                     and | ||||
|                     end_dt.timestamp() < creation_time_s | ||||
|                 ): | ||||
|                     # the contract can't have history | ||||
|                     # before it was created. | ||||
|                     pair_type_str: str = type(pair).__name__ | ||||
|                     create_dt: datetime = from_timestamp(creation_time_s) | ||||
|                     raise DataUnavailable( | ||||
|                         f'No history prior to\n' | ||||
|                         f'`{pair_type_str}.creation_timestamp: int = ' | ||||
|                         f'{pair.creation_timestamp}\n\n' | ||||
|                         f'------ deribit sux ------\n' | ||||
|                         f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n' | ||||
|                         f'creation_time_s: {creation_time_s}\n' | ||||
|                         f'create_dt: {create_dt}\n' | ||||
|                     ) | ||||
|                 raise NoData( | ||||
|                     f'No frame for {start_dt} -> {end_dt}\n' | ||||
|                 ) | ||||
|  | @ -126,14 +157,20 @@ async def open_history_client( | |||
| 
 | ||||
|             return array, start_dt, end_dt | ||||
| 
 | ||||
|         yield get_ohlc, {'erlangs': 3, 'rate': 3} | ||||
|         yield ( | ||||
|             get_ohlc, | ||||
|             {  # backfill config | ||||
|                 'erlangs': 3, | ||||
|                 'rate': 3, | ||||
|             } | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| @async_lifo_cache() | ||||
| async def get_mkt_info( | ||||
|     fqme: str, | ||||
| 
 | ||||
| ) -> tuple[MktPair, Pair] | None: | ||||
| ) -> tuple[MktPair, Pair|OptionPair] | None: | ||||
| 
 | ||||
|     # uppercase since kraken bs_mktid is always upper | ||||
|     if 'deribit' not in fqme.lower(): | ||||
|  | @ -149,7 +186,7 @@ async def get_mkt_info( | |||
|     # returns, always! | ||||
|     expiry: str = expiry.upper() | ||||
|     venue: str = venue.upper() | ||||
|     venue_lower: str = venue.lower() | ||||
|     # venue_lower: str = venue.lower() | ||||
| 
 | ||||
|     mkt_mode: str = 'option' | ||||
| 
 | ||||
|  | @ -175,64 +212,88 @@ async def get_mkt_info( | |||
|             price_tick=pair.price_tick, | ||||
|             size_tick=pair.size_tick, | ||||
|             bs_mktid=pair.symbol, | ||||
|             expiry=pair.expiry, | ||||
|             venue=mkt_mode, | ||||
|             broker='deribit', | ||||
|             _atype=mkt_mode, | ||||
|             _fqme_without_src=True, | ||||
| 
 | ||||
|             # expiry=pair.expiry, | ||||
|             # XXX TODO, currently we don't use it since it's | ||||
|             # already "described" in the `OptionPair.symbol: str` | ||||
|             # and if we slap in the ISO repr it's kinda hideous.. | ||||
|             # -[ ] figure out the best either std | ||||
|         ) | ||||
|         return mkt, pair | ||||
| 
 | ||||
| 
 | ||||
| async def stream_quotes( | ||||
| 
 | ||||
|     send_chan: trio.abc.SendChannel, | ||||
|     symbols: list[str], | ||||
|     feed_is_live: trio.Event, | ||||
|     loglevel: str = None, | ||||
| 
 | ||||
|     # startup sync | ||||
|     task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, | ||||
| 
 | ||||
| ) -> None: | ||||
|     # XXX: required to propagate ``tractor`` loglevel to piker logging | ||||
|     get_console_log(loglevel or tractor.current_actor().loglevel) | ||||
|     ''' | ||||
|     Open a live quote stream for the market set defined by `symbols`. | ||||
| 
 | ||||
|     Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side | ||||
|     task and relays through L1 and `Trade` msgs here to our `trio.Task`. | ||||
| 
 | ||||
|     ''' | ||||
|     sym = symbols[0].split('.')[0] | ||||
| 
 | ||||
|     init_msgs: list[FeedInit] = [] | ||||
| 
 | ||||
|     # multiline nested `dict` formatter (since rn quote-msgs are | ||||
|     # just that). | ||||
|     pfmt: Callable[[str], str] = mk_repr( | ||||
|         # so we can see `deribit`'s delightfully mega-long bs fields.. | ||||
|         maxstring=100, | ||||
|     ) | ||||
| 
 | ||||
|     async with ( | ||||
|         open_cached_client('deribit') as client, | ||||
|         send_chan as send_chan | ||||
|     ): | ||||
| 
 | ||||
|         mkt: MktPair | ||||
|         pair: Pair | ||||
|         mkt, pair = await get_mkt_info(sym) | ||||
| 
 | ||||
|         # build out init msgs according to latest spec | ||||
|         init_msgs.append( | ||||
|             FeedInit(mkt_info=mkt) | ||||
|             FeedInit( | ||||
|                 mkt_info=mkt, | ||||
|             ) | ||||
|         ) | ||||
|         nsym = piker_sym_to_cb_sym(sym) | ||||
|         # build `cryptofeed` feed-handle | ||||
|         cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym) | ||||
| 
 | ||||
|         async with maybe_open_price_feed(sym) as stream: | ||||
|         from_cf: tractor.to_asyncio.LinkedTaskChannel | ||||
|         async with maybe_open_price_feed(sym) as from_cf: | ||||
| 
 | ||||
|             cache = client._pairs | ||||
|             # load the "last trades" summary | ||||
|             last_trades_res: cryptofeed.LastTradesResult = await client.last_trades( | ||||
|                 cb_sym_to_deribit_inst(cf_sym), | ||||
|                 count=1, | ||||
|             ) | ||||
|             last_trades: list[Trade] = last_trades_res.trades | ||||
| 
 | ||||
|             last_trades = (await client.last_trades( | ||||
|                 cb_sym_to_deribit_inst(nsym), count=1)).trades | ||||
|             # TODO, do we even need this or will the above always | ||||
|             # work? | ||||
|             # if not last_trades: | ||||
|             #     await tractor.pause() | ||||
|             #     async for typ, quote in from_cf: | ||||
|             #         if typ == 'trade': | ||||
|             #             last_trade = Trade(**(quote['data'])) | ||||
|             #             break | ||||
| 
 | ||||
|             if len(last_trades) == 0: | ||||
|                 last_trade = None | ||||
|                 async for typ, quote in stream: | ||||
|                     if typ == 'trade': | ||||
|                         last_trade = Trade(**(quote['data'])) | ||||
|                         break | ||||
|             # else: | ||||
|             last_trade = Trade( | ||||
|                 **(last_trades[0]) | ||||
|             ) | ||||
| 
 | ||||
|             else: | ||||
|                 last_trade = Trade(**(last_trades[0])) | ||||
| 
 | ||||
|             first_quote = { | ||||
|             first_quote: dict = { | ||||
|                 'symbol': sym, | ||||
|                 'last': last_trade.price, | ||||
|                 'brokerd_ts': last_trade.timestamp, | ||||
|  | @ -243,13 +304,84 @@ async def stream_quotes( | |||
|                     'broker_ts': last_trade.timestamp | ||||
|                 }] | ||||
|             } | ||||
|             task_status.started((init_msgs,  first_quote)) | ||||
|             task_status.started(( | ||||
|                 init_msgs, | ||||
|                 first_quote, | ||||
|             )) | ||||
| 
 | ||||
|             feed_is_live.set() | ||||
| 
 | ||||
|             async for typ, quote in stream: | ||||
|                 topic = quote['symbol'] | ||||
|                 await send_chan.send({topic: quote}) | ||||
|             # NOTE XXX, static for now! | ||||
|             # => since this only handles ONE mkt feed at a time we | ||||
|             # don't need a lookup table to map interleaved quotes | ||||
|             # from multiple possible mkt-pairs | ||||
|             topic: str = mkt.bs_fqme | ||||
| 
 | ||||
|             # deliver until cancelled | ||||
|             async for typ, ref in from_cf: | ||||
|                 match typ: | ||||
|                     case 'trade': | ||||
|                         trade: cryptofeed.types.Trade = ref | ||||
| 
 | ||||
|                         # TODO, re-impl this according to teh ideal | ||||
|                         # fqme for opts that we choose!! | ||||
|                         bs_fqme: str = cb_sym_to_deribit_inst( | ||||
|                             str_to_cb_sym(trade.symbol) | ||||
|                         ).lower() | ||||
| 
 | ||||
|                         piker_quote: dict = { | ||||
|                             'symbol': bs_fqme, | ||||
|                             'last': trade.price, | ||||
|                             'broker_ts': time.time(), | ||||
|                             # ^TODO, name this `brokerd/datad_ts` and | ||||
|                             # use `time.time_ns()` ?? | ||||
|                             'ticks': [{ | ||||
|                                 'type': 'trade', | ||||
|                                 'price': float(trade.price), | ||||
|                                 'size': float(trade.amount), | ||||
|                                 'broker_ts': trade.timestamp, | ||||
|                             }], | ||||
|                         } | ||||
|                         log.info( | ||||
|                             f'deribit {typ!r} quote for {sym!r}\n\n' | ||||
|                             f'{trade}\n\n' | ||||
|                             f'{pfmt(piker_quote)}\n' | ||||
|                         ) | ||||
| 
 | ||||
|                     case 'l1': | ||||
|                         book: cryptofeed.types.L1Book = ref | ||||
| 
 | ||||
|                         # TODO, so this is where we can possibly change things | ||||
|                         # and instead lever the `MktPair.bs_fqme: str` output? | ||||
|                         bs_fqme: str = cb_sym_to_deribit_inst( | ||||
|                             str_to_cb_sym(book.symbol) | ||||
|                         ).lower() | ||||
| 
 | ||||
|                         piker_quote: dict = { | ||||
|                             'symbol': bs_fqme, | ||||
|                             'ticks': [ | ||||
| 
 | ||||
|                                 {'type': 'bid', | ||||
|                                  'price': float(book.bid_price), | ||||
|                                  'size': float(book.bid_size)}, | ||||
| 
 | ||||
|                                 {'type': 'bsize', | ||||
|                                  'price': float(book.bid_price), | ||||
|                                  'size': float(book.bid_size),}, | ||||
| 
 | ||||
|                                 {'type': 'ask', | ||||
|                                  'price': float(book.ask_price), | ||||
|                                  'size': float(book.ask_size),}, | ||||
| 
 | ||||
|                                 {'type': 'asize', | ||||
|                                  'price': float(book.ask_price), | ||||
|                                  'size': float(book.ask_size),} | ||||
|                             ] | ||||
|                         } | ||||
| 
 | ||||
|                 await send_chan.send({ | ||||
|                     topic: piker_quote, | ||||
|                 }) | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
|  | @ -259,13 +391,13 @@ async def open_symbol_search( | |||
|     async with open_cached_client('deribit') as client: | ||||
| 
 | ||||
|         # load all symbols locally for fast search | ||||
|         cache = client._pairs | ||||
|         # cache = client._pairs | ||||
|         await ctx.started() | ||||
| 
 | ||||
|         async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|             pattern: str | ||||
|             async for pattern in stream: | ||||
| 
 | ||||
|                 # NOTE: pattern fuzzy-matching is done within | ||||
|                 # the methd impl. | ||||
|                 pairs: dict[str, Pair] = await client.search_symbols( | ||||
|  |  | |||
|  | @ -22,11 +22,10 @@ from __future__ import annotations | |||
| import pendulum | ||||
| from typing import ( | ||||
|     Literal, | ||||
|     Optional, | ||||
| ) | ||||
| from decimal import Decimal | ||||
| 
 | ||||
| from msgspec import field | ||||
| 
 | ||||
| from piker.types import Struct | ||||
| 
 | ||||
| 
 | ||||
|  | @ -111,18 +110,21 @@ class OptionPair(Pair, frozen=True): | |||
|     block_trade_min_trade_amount: int # '25' | ||||
|     block_trade_commission: float # '0.003' | ||||
| 
 | ||||
| 
 | ||||
|     # NOTE: see `.data._symcache.SymbologyCache.load()` for why | ||||
|     ns_path: str = 'piker.brokers.deribit:OptionPair' | ||||
| 
 | ||||
|     # TODO, impl this without the MM:SS part of | ||||
|     # the `'THH:MM:SS..'` etc.. | ||||
|     @property | ||||
|     def expiry(self) -> str: | ||||
|         iso_date = pendulum.from_timestamp(self.expiration_timestamp / 1000).isoformat() | ||||
|         iso_date = pendulum.from_timestamp( | ||||
|             self.expiration_timestamp / 1000 | ||||
|         ).isoformat() | ||||
|         return iso_date  | ||||
| 
 | ||||
|     @property | ||||
|     def venue(self) -> str: | ||||
|         return 'option' | ||||
|         return f'{self.instrument_type}_option' | ||||
| 
 | ||||
|     @property | ||||
|     def bs_fqme(self) -> str: | ||||
|  | @ -152,6 +154,7 @@ class JSONRPCResult(Struct): | |||
|     error: Optional[dict] = None | ||||
|     result: Optional[list[dict]] = None | ||||
| 
 | ||||
| 
 | ||||
| class JSONRPCChannel(Struct): | ||||
|     method: str | ||||
|     params: dict | ||||
|  | @ -168,6 +171,7 @@ class KLinesResult(Struct): | |||
|     status: str | ||||
|     volume: list[float] | ||||
| 
 | ||||
| 
 | ||||
| class Trade(Struct): | ||||
|     iv: float | ||||
|     price: float | ||||
|  | @ -186,6 +190,7 @@ class Trade(Struct): | |||
|     block_trade_id: Optional[str] = '', | ||||
|     block_trade_leg_count: Optional[int] = 0, | ||||
| 
 | ||||
| 
 | ||||
| class LastTradesResult(Struct): | ||||
|     trades: list[Trade] | ||||
|     has_more: bool | ||||
|  |  | |||
|  | @ -29,6 +29,7 @@ import time | |||
| from typing import ( | ||||
|     Any, | ||||
|     AsyncIterator, | ||||
|     Callable, | ||||
|     TYPE_CHECKING, | ||||
| ) | ||||
| 
 | ||||
|  | @ -53,6 +54,9 @@ from ._util import ( | |||
|     get_console_log, | ||||
| ) | ||||
| from ..service import maybe_spawn_daemon | ||||
| from piker.log import ( | ||||
|     mk_repr, | ||||
| ) | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from ._sharedmem import ( | ||||
|  | @ -561,7 +565,6 @@ async def open_sample_stream( | |||
| 
 | ||||
| 
 | ||||
| async def sample_and_broadcast( | ||||
| 
 | ||||
|     bus: _FeedsBus,  # noqa | ||||
|     rt_shm: ShmArray, | ||||
|     hist_shm: ShmArray, | ||||
|  | @ -582,11 +585,22 @@ async def sample_and_broadcast( | |||
| 
 | ||||
|     overruns = Counter() | ||||
| 
 | ||||
|     # multiline nested `dict` formatter (since rn quote-msgs are | ||||
|     # just that). | ||||
|     pfmt: Callable[[str], str] = mk_repr() | ||||
| 
 | ||||
|     # iterate stream delivered by broker | ||||
|     async for quotes in quote_stream: | ||||
|         # print(quotes) | ||||
| 
 | ||||
|         # TODO: ``numba`` this! | ||||
|         # XXX WARNING XXX only enable for debugging bc ow can cost | ||||
|         # ALOT of perf with HF-feedz!!! | ||||
|         # | ||||
|         # log.info( | ||||
|         #     'Rx live quotes:\n' | ||||
|         #     f'{pfmt(quotes)}' | ||||
|         # ) | ||||
| 
 | ||||
|         # TODO: `numba` this! | ||||
|         for broker_symbol, quote in quotes.items(): | ||||
|             # TODO: in theory you can send the IPC msg *before* writing | ||||
|             # to the sharedmem array to decrease latency, however, that | ||||
|  | @ -659,6 +673,18 @@ async def sample_and_broadcast( | |||
|             sub_key: str = broker_symbol.lower() | ||||
|             subs: set[Sub] = bus.get_subs(sub_key) | ||||
| 
 | ||||
|             if not subs: | ||||
|                 all_bs_fqmes: list[str] = list( | ||||
|                     bus._subscribers.keys() | ||||
|                 ) | ||||
|                 log.warning( | ||||
|                     f'No subscribers for {brokername!r} live-quote ??\n' | ||||
|                     f'broker_symbol: {broker_symbol}\n\n' | ||||
| 
 | ||||
|                     f'Maybe the backend-sys symbol does not match one of,\n' | ||||
|                     f'{pfmt(all_bs_fqmes)}\n' | ||||
|                 ) | ||||
| 
 | ||||
|             # NOTE: by default the broker backend doesn't append | ||||
|             # it's own "name" into the fqme schema (but maybe it | ||||
|             # should?) so we have to manually generate the correct | ||||
|  |  | |||
|  | @ -359,8 +359,8 @@ async def open_autorecon_ws( | |||
| 
 | ||||
| 
 | ||||
| ''' | ||||
| JSONRPC response-request style machinery for transparent multiplexing of msgs | ||||
| over a NoBsWs. | ||||
| JSONRPC response-request style machinery for transparent multiplexing | ||||
| of msgs over a `NoBsWs`. | ||||
| 
 | ||||
| ''' | ||||
| 
 | ||||
|  | @ -377,19 +377,35 @@ async def open_jsonrpc_session( | |||
|     url: str, | ||||
|     start_id: int = 0, | ||||
|     response_type: type = JSONRPCResult, | ||||
|     request_type: Optional[type] = None, | ||||
|     request_hook: Optional[Callable] = None, | ||||
|     error_hook: Optional[Callable] = None, | ||||
|     msg_recv_timeout: float = float('inf'), | ||||
|     # ^NOTE, since only `deribit` is using this jsonrpc stuff atm | ||||
|     # and options mkts are generally "slow moving".. | ||||
|     # | ||||
|     # FURTHER if we break the underlying ws connection then since we | ||||
|     # don't pass a `fixture` to the task that manages `NoBsWs`, i.e. | ||||
|     # `_reconnect_forever()`, the jsonrpc "transport pipe" get's | ||||
|     # broken and never restored with wtv init sequence is required to | ||||
|     # re-establish a working req-resp session. | ||||
| 
 | ||||
|     # request_type: Optional[type] = None, | ||||
|     # request_hook: Optional[Callable] = None, | ||||
|     # error_hook: Optional[Callable] = None, | ||||
| ) -> Callable[[str, dict], dict]: | ||||
| 
 | ||||
|     async with ( | ||||
|         trio.open_nursery() as n, | ||||
|         open_autorecon_ws(url) as ws | ||||
|         open_autorecon_ws( | ||||
|             url=url, | ||||
|             msg_recv_timeout=msg_recv_timeout, | ||||
|         ) as ws | ||||
|     ): | ||||
|         rpc_id: Iterable = count(start_id) | ||||
|         rpc_results: dict[int, dict] = {} | ||||
| 
 | ||||
|         async def json_rpc(method: str, params: dict) -> dict: | ||||
|         async def json_rpc( | ||||
|             method: str, | ||||
|             params: dict, | ||||
|         ) -> dict: | ||||
|             ''' | ||||
|             perform a json rpc call and wait for the result, raise exception in | ||||
|             case of error field present on response | ||||
|  |  | |||
|  | @ -540,7 +540,10 @@ async def open_feed_bus( | |||
|         # subscription since the backend isn't (yet) expected to | ||||
|         # append it's own name to the fqme, so we filter on keys | ||||
|         # which *do not* include that name (e.g .ib) . | ||||
|         bus._subscribers.setdefault(bs_fqme, set()) | ||||
|         bus._subscribers.setdefault( | ||||
|             bs_fqme, | ||||
|             set(), | ||||
|         ) | ||||
| 
 | ||||
|     # sync feed subscribers with flume handles | ||||
|     await ctx.started( | ||||
|  |  | |||
							
								
								
									
										28
									
								
								piker/log.py
								
								
								
								
							
							
						
						
									
										28
									
								
								piker/log.py
								
								
								
								
							|  | @ -18,7 +18,11 @@ | |||
| Log like a forester! | ||||
| """ | ||||
| import logging | ||||
| import reprlib | ||||
| import json | ||||
| from typing import ( | ||||
|     Callable, | ||||
| ) | ||||
| 
 | ||||
| import tractor | ||||
| from pygments import ( | ||||
|  | @ -84,3 +88,27 @@ def colorize_json( | |||
|         # likeable styles: algol_nu, tango, monokai | ||||
|         formatters.TerminalTrueColorFormatter(style=style) | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| def mk_repr( | ||||
|     **repr_kws, | ||||
| ) -> Callable[[str], str]: | ||||
|     ''' | ||||
|     Allocate and deliver a `repr.Repr` instance with provided input | ||||
|     settings using the std-lib's `reprlib` mod, | ||||
|      * https://docs.python.org/3/library/reprlib.html | ||||
| 
 | ||||
|     ------ Ex. ------ | ||||
|     An up to 6-layer-nested `dict` as multi-line: | ||||
|     - https://stackoverflow.com/a/79102479 | ||||
|     - https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel | ||||
| 
 | ||||
|     ''' | ||||
|     def_kws: dict[str, int] = dict( | ||||
|         indent=2, | ||||
|         maxlevel=6,  # recursion levels | ||||
|         maxstring=66,  # match editor line-len limit | ||||
|     ) | ||||
|     def_kws |= repr_kws | ||||
|     reprr = reprlib.Repr(**def_kws) | ||||
|     return reprr.repr | ||||
|  |  | |||
|  | @ -161,7 +161,12 @@ class NativeStorageClient: | |||
| 
 | ||||
|     def index_files(self): | ||||
|         for path in self._datadir.iterdir(): | ||||
|             if path.name in {'borked', 'expired',}: | ||||
|             if ( | ||||
|                 path.name in {'borked', 'expired',} | ||||
|                 or | ||||
|                 '.parquet' not in str(path) | ||||
|             ): | ||||
|                 # ignore all non-apache files (for now) | ||||
|                 continue | ||||
| 
 | ||||
|             key: str = path.name.rstrip('.parquet') | ||||
|  |  | |||
|  | @ -434,21 +434,32 @@ async def start_backfill( | |||
|                 # - some other unknown error (ib blocking the | ||||
|                 #   history bc they don't want you seeing how they | ||||
|                 #   cucked all the tinas..) | ||||
|                 if dur := frame_types.get(timeframe): | ||||
|                     # decrement by a frame's worth of duration and | ||||
|                     # retry a few times. | ||||
|                     last_start_dt.subtract( | ||||
|                 if ( | ||||
|                     frame_types | ||||
|                     and | ||||
|                     (dur := frame_types.get(timeframe)) | ||||
|                 ): | ||||
|                     # decrement by a duration's (frame) worth of time | ||||
|                     # as maybe indicated by the backend to see if we | ||||
|                     # can get older data before this possible | ||||
|                     # "history gap". | ||||
|                     orig_last_start_dt = last_start_dt | ||||
|                     last_start_dt = last_start_dt.subtract( | ||||
|                         seconds=dur.total_seconds() | ||||
|                     ) | ||||
|                     log.warning( | ||||
|                         f'{mod.name} -> EMPTY FRAME for end_dt?\n' | ||||
|                         f'tf@fqme: {timeframe}@{mkt.fqme}\n' | ||||
|                         'bf_until <- last_start_dt:\n' | ||||
|                         f'{backfill_until_dt} <- {last_start_dt}\n' | ||||
|                         f'Decrementing `end_dt` by {dur} and retry..\n' | ||||
|                         f'Decrementing `end_dt` by {dur} and retry..\n\n' | ||||
| 
 | ||||
|                         f'orig_last_start_dt: {orig_last_start_dt}\n' | ||||
|                         f'dur subtracted last_start_dt: {last_start_dt}\n' | ||||
|                         f'bf_until: {backfill_until_dt}\n' | ||||
|                     ) | ||||
|                     continue | ||||
| 
 | ||||
|                 raise | ||||
| 
 | ||||
|             # broker says there never was or is no more history to pull | ||||
|             except DataUnavailable: | ||||
|                 log.warning( | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue