diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index ee9c7033..bf640c9e 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -379,6 +379,82 @@ class Client: return flat + async def get_instruments( + self, + currency: str = 'btc', + kind: str = 'option', + expired: bool = False, + expiry_date: str = None, + + ) -> list[Symbol]: + """ + Get instruments for cryptoFeed.FeedHandler. + """ + params: dict[str, str] = { + 'currency': currency.upper(), + 'kind': kind, + 'expired': expired, + } + + r: JSONRPCResult = await self._json_rpc_auth_wrapper( + 'public/get_instruments', + params, + ) + resp = r.result + response_list = [] + + for i in range(len(resp)): + element = resp[i] + name = f'{element["instrument_name"].split("-")[1]}' + if not expiry_date or name == expiry_date.upper(): + response_list.append(piker_sym_to_cb_sym(element['instrument_name'])) + + return response_list + + async def get_expiration_dates( + self, + currency: str = 'btc', + kind: str = 'option', + + ) -> list[str]: + """ + Get a dict with all expiration dates listed as value and currency as key. + """ + + params: dict[str, str] = { + 'currency': currency.upper(), + 'kind': kind, + } + + r: JSONRPCResult = await self._json_rpc_auth_wrapper( + 'public/get_expirations', + params, + ) + resp = r.result + + return resp[currency][kind] + + def get_strikes_dict( + self, + instruments: list[Symbol], + + ) -> dict[str, dict[str, Decimal | None]]: + """ + Get a dict with strike prices as keys. + """ + + response: dict[str, dict[str, Decimal | None]] = {} + + for i in range(len(instruments)): + element = instruments[i] + strike = f'{str(element).split('-')[1]}' + response[f'{strike}'] = { + 'C': None, + 'P': None, + } + + return response + async def submit_limit( self, symbol: str, @@ -759,6 +835,117 @@ async def maybe_open_price_feed( +async def aio_open_interest_feed_relay( + fh: FeedHandler, + instruments: list[Symbol], + from_trio: asyncio.Queue, + to_trio: trio.abc.SendChannel, +) -> None: + async def _trade( + trade: Trade, # cryptofeed, NOT ours from `.venues`! + receipt_timestamp: int, + ) -> None: + ''' + Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side. + + ''' + to_trio.send_nowait(('trade', trade)) + + # trade and oi are user defined functions that + # will be called when trade and open interest updates are received + # data type is not dict, is an object: cryptofeed.types.OpenINterest + async def _oi( + oi: OpenInterest, + receipt_timestamp: int, + ) -> None: + ''' + Proxy-thru `cryptofeed.FeedHandler` "oi" to `piker`-side. + + ''' + symbol: Symbol = str_to_cb_sym(oi.symbol) + piker_sym: str = cb_sym_to_deribit_inst(symbol) + ( + base, + expiry_date, + strike_price, + option_type + ) = tuple( + piker_sym.split('-') + ) + msg = { + 'timestamp': oi.timestamp, + 'strike_price': strike_price, + 'option_type': option_type, + 'open_interest': Decimal(oi.open_interest), + } + to_trio.send_nowait(('oi', msg)) + + + channels = [TRADES, OPEN_INTEREST] + callbacks={TRADES: _trade, OPEN_INTEREST: _oi} + + fh.add_feed( + DERIBIT, + channels=channels, + symbols=instruments, + callbacks=callbacks + ) + + if not fh.running: + fh.run( + start_loop=False, + install_signal_handlers=False + ) + + # sync with trio + to_trio.send_nowait(None) + + # run until cancelled + await asyncio.sleep(float('inf')) + + +@acm +async def open_oi_feed( + instruments: list[Symbol], +) -> to_asyncio.LinkedTaskChannel: + + fh: FeedHandler + first: None + chan: to_asyncio.LinkedTaskChannel + async with ( + maybe_open_feed_handler() as fh, + to_asyncio.open_channel_from( + partial( + aio_open_interest_feed_relay, + fh, + instruments, + ) + ) as (first, chan) + ): + yield chan + + +@acm +async def maybe_open_oi_feed( + instruments: list[Symbol], +) -> trio.abc.ReceiveStream: + + # TODO: add a predicate to maybe_open_context + feed: to_asyncio.LinkedTaskChannel + async with maybe_open_context( + acm_func=open_oi_feed, + kwargs={ + 'instruments': instruments + }, + key=f'{instruments[0].base}', + + ) as (cache_hit, feed): + if cache_hit: + yield broadcast_receiver(feed, 10) + else: + yield feed + + # TODO, move all to `.broker` submod! # async def aio_order_feed_relay( # fh: FeedHandler,