Deribit api key changes introduce:
- `get_timestamp_int`: added this is the hack, so we can aboid use the custom deribit date format. - `get_currencies`: added so we could get all deribit's available currencies. - `get_instruments`: for a especific expiration date, it return a list of criptofeed.Symbol. - `get_expiration_dates`: expirations dates available for btc's option contracts . - `get_strikes_dict`: all the strike prices for an especific expiration date. - `aio_open_interest_feed_relay` `open_oi_feed` `maybe_open_oi_feed`: this three handles all the portal stuff and the cryptofeed callbacks for the open interest and trades, for some reason it need both to work, i need to check that out at some point. - Also a couple of format fixes.
							parent
							
								
									022432cce7
								
							
						
					
					
						commit
						7bf3749f95
					
				|  | @ -52,12 +52,14 @@ from cryptofeed import FeedHandler | |||
| from cryptofeed.defines import ( | ||||
|     DERIBIT, | ||||
|     L1_BOOK, TRADES, | ||||
|     OPTION, CALL, PUT | ||||
|     OPTION, CALL, PUT, | ||||
|     OPEN_INTEREST, | ||||
| ) | ||||
| from cryptofeed.symbols import Symbol | ||||
| from cryptofeed.types import ( | ||||
|     L1Book, | ||||
|     Trade, | ||||
|     OpenInterest, | ||||
| ) | ||||
| from piker.brokers import SymbolNotFound | ||||
| from .venues import ( | ||||
|  | @ -110,6 +112,10 @@ def deribit_timestamp(when: datetime) -> int: | |||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| def get_timestamp_int(expiry_date: str) -> int: | ||||
|     return int(time.mktime(time.strptime(expiry_date, '%d%b%y'))) | ||||
| 
 | ||||
| 
 | ||||
| def str_to_cb_sym(name: str) -> Symbol: | ||||
|     base, strike_price, expiry_date, option_type = name.split('-') | ||||
| 
 | ||||
|  | @ -117,13 +123,14 @@ def str_to_cb_sym(name: str) -> Symbol: | |||
| 
 | ||||
|     if option_type == 'put': | ||||
|         option_type = PUT  | ||||
|     elif option_type  == 'call': | ||||
|     elif option_type == 'call': | ||||
|         option_type = CALL | ||||
|     else: | ||||
|         raise Exception("Couldn\'t parse option type") | ||||
| 
 | ||||
|     new_expiry_date = get_values_from_cb_normalized_date(expiry_date) | ||||
| 
 | ||||
|     new_expiry_date: int = get_timestamp_int( | ||||
|         get_values_from_cb_normalized_date(expiry_date) | ||||
|     ) | ||||
|     return Symbol( | ||||
|         base=base, | ||||
|         quote=quote, | ||||
|  | @ -143,11 +150,12 @@ def piker_sym_to_cb_sym(name: str) -> Symbol: | |||
|     )= tuple( | ||||
|         name.upper().split('-')) | ||||
| 
 | ||||
|     new_expiry_date = get_timestamp_int(expiry_date) | ||||
|     quote: str = base | ||||
| 
 | ||||
|     if option_type == 'P': | ||||
|     if option_type == 'P' or option_type == 'PUT': | ||||
|         option_type = PUT  | ||||
|     elif option_type == 'C': | ||||
|     elif option_type == 'C' or option_type == 'CALL': | ||||
|         option_type = CALL | ||||
|     else: | ||||
|         raise Exception("Couldn\'t parse option type") | ||||
|  | @ -158,7 +166,7 @@ def piker_sym_to_cb_sym(name: str) -> Symbol: | |||
|         type=OPTION, | ||||
|         strike_price=strike_price, | ||||
|         option_type=option_type, | ||||
|         expiry_date=expiry_date | ||||
|         expiry_date=new_expiry_date | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
|  | @ -226,16 +234,18 @@ def get_config() -> dict[str, Any]: | |||
|         ) | ||||
| 
 | ||||
|     conf_option = section.get('option', {}) | ||||
|     section.clear # clear the dict to reuse it | ||||
|     section['deribit'] = {} | ||||
|     section['deribit']['key_id'] = conf_option.get('api_key') | ||||
|     section['deribit']['key_secret'] = conf_option.get('api_secret') | ||||
| 
 | ||||
|     section['log'] = {} | ||||
|     section['log']['filename'] = 'feedhandler.log' | ||||
|     section['log']['level'] = 'DEBUG' | ||||
| 
 | ||||
|     return section | ||||
|     conf_log = conf_option.get('log', {}) | ||||
|     return { | ||||
|         'deribit': { | ||||
|             'key_id': conf_option['key_id'], | ||||
|             'key_secret': conf_option['key_secret'], | ||||
|         }, | ||||
|         'log': { | ||||
|             'filename': conf_log['filename'], | ||||
|             'level': conf_log['level'], | ||||
|             'disabled': conf_log['disabled'], | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| class Client: | ||||
|  | @ -311,6 +321,20 @@ class Client: | |||
| 
 | ||||
|         return balances | ||||
| 
 | ||||
|     async def get_currencies( | ||||
|         self, | ||||
| 
 | ||||
|     ) -> list[dict]: | ||||
|         ''' | ||||
|         Return the set of currencies for deribit. | ||||
|         ''' | ||||
|         assets = {} | ||||
|         resp = await self._json_rpc_auth_wrapper( | ||||
|             'public/get_currencies', | ||||
|             params={} | ||||
|         ) | ||||
|         return resp.result | ||||
| 
 | ||||
|     async def get_assets( | ||||
|         self, | ||||
|         venue: str | None = None, | ||||
|  | @ -323,11 +347,7 @@ class Client: | |||
| 
 | ||||
|         ''' | ||||
|         assets = {} | ||||
|         resp = await self._json_rpc_auth_wrapper( | ||||
|             'public/get_currencies', | ||||
|             params={} | ||||
|         ) | ||||
|         currencies: list[dict] = resp.result | ||||
|         currencies = await self.get_currencies() | ||||
|         for currency in currencies: | ||||
|             name: str = currency['currency'] | ||||
|             tx_tick: Decimal = digits_to_dec(currency['fee_precision']) | ||||
|  | @ -359,6 +379,82 @@ class Client: | |||
| 
 | ||||
|         return flat | ||||
| 
 | ||||
|     async def get_instruments( | ||||
|         self, | ||||
|         currency: str = 'btc', | ||||
|         kind: str = 'option', | ||||
|         expired: bool = False, | ||||
|         expiry_date: str = None, | ||||
| 
 | ||||
|     ) -> list[Symbol]: | ||||
|         """ | ||||
|         Get instruments for cryptoFeed.FeedHandler. | ||||
|         """ | ||||
|         params: dict[str, str] = { | ||||
|             'currency': currency.upper(), | ||||
|             'kind': kind, | ||||
|             'expired': expired, | ||||
|         } | ||||
| 
 | ||||
|         r: JSONRPCResult = await self._json_rpc_auth_wrapper( | ||||
|             'public/get_instruments', | ||||
|             params, | ||||
|         ) | ||||
|         resp = r.result | ||||
|         response_list = [] | ||||
| 
 | ||||
|         for i in range(len(resp)): | ||||
|             element = resp[i] | ||||
|             name = f'{element["instrument_name"].split("-")[1]}' | ||||
|             if not expiry_date or name == expiry_date.upper(): | ||||
|                 response_list.append(piker_sym_to_cb_sym(element['instrument_name'])) | ||||
| 
 | ||||
|         return response_list | ||||
| 
 | ||||
|     async def get_expiration_dates( | ||||
|         self, | ||||
|         currency: str = 'btc', | ||||
|         kind: str = 'option', | ||||
| 
 | ||||
|     ) ->  list[str]: | ||||
|         """ | ||||
|         Get a dict with all expiration dates listed as value and currency as key. | ||||
|         """ | ||||
|          | ||||
|         params: dict[str, str] = { | ||||
|             'currency': currency.upper(), | ||||
|             'kind': kind, | ||||
|         } | ||||
| 
 | ||||
|         r: JSONRPCResult = await self._json_rpc_auth_wrapper( | ||||
|             'public/get_expirations', | ||||
|             params, | ||||
|         ) | ||||
|         resp = r.result | ||||
| 
 | ||||
|         return resp[currency][kind] | ||||
| 
 | ||||
|     def get_strikes_dict( | ||||
|         self, | ||||
|         instruments: list[Symbol], | ||||
| 
 | ||||
|     ) -> dict[str, dict[str, Decimal | None]]: | ||||
|         """ | ||||
|         Get a dict with strike prices as keys. | ||||
|         """ | ||||
| 
 | ||||
|         response: dict[str, dict[str, Decimal | None]] = {} | ||||
| 
 | ||||
|         for i in range(len(instruments)): | ||||
|             element = instruments[i] | ||||
|             strike = f'{str(element).split('-')[1]}' | ||||
|             response[f'{strike}'] = { | ||||
|                 'C': None, | ||||
|                 'P': None, | ||||
|             } | ||||
| 
 | ||||
|         return response | ||||
| 
 | ||||
|     async def submit_limit( | ||||
|         self, | ||||
|         symbol: str, | ||||
|  | @ -648,6 +744,38 @@ async def aio_price_feed_relay( | |||
|     the `piker`-side `trio.task` consumers for delivery to consumer | ||||
|     sub-actors for various subsystems. | ||||
| 
 | ||||
|     ''' | ||||
|     async def _trade( | ||||
|         trade: Trade,  # cryptofeed, NOT ours from `.venues`! | ||||
|         receipt_timestamp: int, | ||||
|     ) -> None: | ||||
|         ''' | ||||
|         Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side. | ||||
| 
 | ||||
|         ''' | ||||
|         to_trio.send_nowait(('trade', trade)) | ||||
| 
 | ||||
|     async def _l1( | ||||
|         book: L1Book, | ||||
|         receipt_timestamp: int, | ||||
|     ) -> None: | ||||
|         ''' | ||||
|         Relay-thru "l1 book" updates. | ||||
| 
 | ||||
|         ''' | ||||
| 
 | ||||
|         to_trio.send_nowait(('l1', book)) | ||||
| 
 | ||||
|         # TODO, make this work! | ||||
|         # -[ ] why isn't this working in `tractor.pause_from_sync()`?? | ||||
|         # breakpoint() | ||||
| 
 | ||||
|     sym: Symbol = piker_sym_to_cb_sym(instrument) | ||||
|     ''' | ||||
|     Relay price feed quotes from the `cryptofeed.FeedHandler` to | ||||
|     the `piker`-side `trio.task` consumers for delivery to consumer | ||||
|     sub-actors for various subsystems. | ||||
| 
 | ||||
|     ''' | ||||
|     async def _trade( | ||||
|         trade: Trade,  # cryptofeed, NOT ours from `.venues`! | ||||
|  | @ -679,6 +807,7 @@ async def aio_price_feed_relay( | |||
|         DERIBIT, | ||||
|         channels=[TRADES, L1_BOOK], | ||||
|         symbols=[sym], | ||||
|         symbols=[sym], | ||||
|         callbacks={ | ||||
|             TRADES: _trade, | ||||
|             L1_BOOK: _l1 | ||||
|  | @ -689,10 +818,13 @@ async def aio_price_feed_relay( | |||
|             start_loop=False, | ||||
|             install_signal_handlers=False | ||||
|         ) | ||||
|             install_signal_handlers=False | ||||
|         ) | ||||
| 
 | ||||
|     # sync with trio | ||||
|     to_trio.send_nowait(None) | ||||
| 
 | ||||
|     # run until cancelled | ||||
|     # run until cancelled | ||||
|     await asyncio.sleep(float('inf')) | ||||
| 
 | ||||
|  | @ -702,6 +834,14 @@ async def open_price_feed( | |||
|     instrument: str | ||||
| ) -> to_asyncio.LinkedTaskChannel: | ||||
| 
 | ||||
|     fh: FeedHandler | ||||
|     first: None | ||||
|     chan: to_asyncio.LinkedTaskChannel | ||||
|     async with ( | ||||
|         maybe_open_feed_handler() as fh, | ||||
|         to_asyncio.open_channel_from( | ||||
| ) -> to_asyncio.LinkedTaskChannel: | ||||
| 
 | ||||
|     fh: FeedHandler | ||||
|     first: None | ||||
|     chan: to_asyncio.LinkedTaskChannel | ||||
|  | @ -709,28 +849,34 @@ async def open_price_feed( | |||
|         maybe_open_feed_handler() as fh, | ||||
|         to_asyncio.open_channel_from( | ||||
|             partial( | ||||
|                 aio_price_feed_relay, | ||||
|                 aio_open_interest_feed_relay, | ||||
|                 fh, | ||||
|                 instrument | ||||
|                 instruments, | ||||
|             ) | ||||
|         ) as (first, chan) | ||||
|     ): | ||||
|         yield chan | ||||
|         ) as (first, chan) | ||||
|     ): | ||||
|         yield chan | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def maybe_open_price_feed( | ||||
|     instrument: str | ||||
| async def maybe_open_oi_feed( | ||||
|     instruments: list[Symbol],  | ||||
| ) -> trio.abc.ReceiveStream: | ||||
| 
 | ||||
|     # TODO: add a predicate to maybe_open_context | ||||
|     feed: to_asyncio.LinkedTaskChannel | ||||
|     feed: to_asyncio.LinkedTaskChannel | ||||
|     async with maybe_open_context( | ||||
|         acm_func=open_price_feed, | ||||
|         acm_func=open_oi_feed, | ||||
|         kwargs={ | ||||
|             'instrument': instrument.split('.')[0] | ||||
|             'instrument': instrument.split('.')[0] | ||||
|         }, | ||||
|         key=f'{instrument.split('.')[0]}-price', | ||||
|         key=f'{instrument.split('.')[0]}-price', | ||||
|     ) as (cache_hit, feed): | ||||
|         if cache_hit: | ||||
|             yield broadcast_receiver(feed, 10) | ||||
|  | @ -738,6 +884,116 @@ async def maybe_open_price_feed( | |||
|             yield feed | ||||
| 
 | ||||
| 
 | ||||
| async def aio_open_interest_feed_relay( | ||||
|     fh: FeedHandler, | ||||
|     instruments: list[Symbol], | ||||
|     from_trio: asyncio.Queue, | ||||
|     to_trio: trio.abc.SendChannel, | ||||
| ) -> None: | ||||
|     async def _trade( | ||||
|         trade: Trade,  # cryptofeed, NOT ours from `.venues`! | ||||
|         receipt_timestamp: int, | ||||
|     ) -> None: | ||||
|         ''' | ||||
|         Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side. | ||||
| 
 | ||||
|         ''' | ||||
|         to_trio.send_nowait(('trade', trade)) | ||||
| 
 | ||||
| 	# trade and oi are user defined functions that | ||||
| 	# will be called when trade and open interest updates are received | ||||
| 	# data type is not dict, is an object: cryptofeed.types.OpenINterest | ||||
|     async def _oi( | ||||
|         oi: OpenInterest, | ||||
|         receipt_timestamp: int, | ||||
|     ) -> None: | ||||
|         ''' | ||||
|         Proxy-thru `cryptofeed.FeedHandler` "oi" to `piker`-side. | ||||
| 
 | ||||
|         ''' | ||||
|         symbol: Symbol = str_to_cb_sym(oi.symbol) | ||||
|         piker_sym: str = cb_sym_to_deribit_inst(symbol) | ||||
|         ( | ||||
|             base, | ||||
|             expiry_date, | ||||
|             strike_price, | ||||
|             option_type | ||||
|         ) = tuple( | ||||
|             piker_sym.split('-') | ||||
|         ) | ||||
|         msg = { | ||||
|             'timestamp': oi.timestamp, | ||||
|             'strike_price': strike_price, | ||||
|             'option_type': option_type, | ||||
|             'open_interest': Decimal(oi.open_interest), | ||||
|         } | ||||
|         to_trio.send_nowait(('oi', msg)) | ||||
| 
 | ||||
| 
 | ||||
|     channels = [TRADES, OPEN_INTEREST] | ||||
|     callbacks={TRADES: _trade, OPEN_INTEREST: _oi} | ||||
| 
 | ||||
|     fh.add_feed( | ||||
|         DERIBIT, | ||||
|         channels=channels, | ||||
|         symbols=instruments, | ||||
|         callbacks=callbacks | ||||
|     ) | ||||
| 
 | ||||
|     if not fh.running: | ||||
|         fh.run( | ||||
|             start_loop=False, | ||||
|             install_signal_handlers=False | ||||
|         ) | ||||
| 
 | ||||
|     # sync with trio | ||||
|     to_trio.send_nowait(None) | ||||
| 
 | ||||
|     # run until cancelled | ||||
|     await asyncio.sleep(float('inf')) | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def open_oi_feed( | ||||
|     instruments: list[Symbol],  | ||||
| ) -> to_asyncio.LinkedTaskChannel: | ||||
| 
 | ||||
|     fh: FeedHandler | ||||
|     first: None | ||||
|     chan: to_asyncio.LinkedTaskChannel | ||||
|     async with ( | ||||
|         maybe_open_feed_handler() as fh, | ||||
|         to_asyncio.open_channel_from( | ||||
|             partial( | ||||
|                 aio_open_interest_feed_relay, | ||||
|                 fh, | ||||
|                 instruments, | ||||
|             ) | ||||
|         ) as (first, chan) | ||||
|     ): | ||||
|         yield chan | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def maybe_open_oi_feed( | ||||
|     instruments: list[Symbol],  | ||||
| ) -> trio.abc.ReceiveStream: | ||||
| 
 | ||||
|     # TODO: add a predicate to maybe_open_context | ||||
|     feed: to_asyncio.LinkedTaskChannel | ||||
|     async with maybe_open_context( | ||||
|         acm_func=open_oi_feed, | ||||
|         kwargs={ | ||||
|             'instruments': instruments | ||||
|         }, | ||||
|         key=f'{instruments[0].base}', | ||||
| 
 | ||||
|     ) as (cache_hit, feed): | ||||
|         if cache_hit: | ||||
|             yield broadcast_receiver(feed, 10) | ||||
|         else: | ||||
|             yield feed | ||||
| 
 | ||||
| 
 | ||||
| # TODO, move all to `.broker` submod! | ||||
| # async def aio_order_feed_relay( | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue