# piker: trading gear for hackers # Copyright (C) Guillermo Rodriguez (in stewardship for piker0) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . ''' Deribit backend. ''' from __future__ import annotations from contextlib import asynccontextmanager as acm from datetime import datetime from typing import ( # Any, # Optional, Callable, ) # from pprint import pformat import time import trio from trio_typing import TaskStatus from pendulum import ( from_timestamp, ) import numpy as np import tractor from piker.accounting import ( Asset, MktPair, unpack_fqme, ) from piker.brokers import ( open_cached_client, NoData, DataUnavailable, ) from piker._cacheables import ( async_lifo_cache, ) from piker.log import ( get_logger, ) from piker.data.validate import FeedInit # from cryptofeed import FeedHandler # from cryptofeed.defines import ( # DERIBIT, # L1_BOOK, # TRADES, # OPTION, # CALL, # PUT, # ) # from cryptofeed.symbols import Symbol from .api import ( Client, # get_config, piker_sym_to_cb_sym, cb_sym_to_deribit_inst, maybe_open_price_feed ) from .venues import ( Pair, OptionPair, Trade, ) _spawn_kwargs = { 'infect_asyncio': True, } log = get_logger(__name__) @acm async def open_history_client( mkt: MktPair, ) -> tuple[Callable, int]: # TODO implement history getter for the new storage layer. async with open_cached_client('deribit') as client: pair: OptionPair = client._pairs[mkt.dst.name] # XXX NOTE, the cuckers use ms !!! creation_time_s: int = pair.creation_timestamp/1000 async def get_ohlc( timeframe: float, end_dt: datetime | None = None, start_dt: datetime | None = None, ) -> tuple[ np.ndarray, datetime, # start datetime, # end ]: if timeframe != 60: raise DataUnavailable('Only 1m bars are supported') array: np.ndarray = await client.bars( mkt, start_dt=start_dt, end_dt=end_dt, ) if len(array) == 0: if ( end_dt is None ): raise DataUnavailable( 'No history seems to exist yet?\n\n' f'{mkt}' ) elif ( end_dt and end_dt.timestamp() < creation_time_s ): # the contract can't have history # before it was created. pair_type_str: str = type(pair).__name__ create_dt: datetime = from_timestamp(creation_time_s) raise DataUnavailable( f'No history prior to\n' f'`{pair_type_str}.creation_timestamp: int = ' f'{pair.creation_timestamp}\n\n' f'------ deribit sux ------\n' f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n' f'creation_time_s: {creation_time_s}\n' f'create_dt: {create_dt}\n' ) raise NoData( f'No frame for {start_dt} -> {end_dt}\n' ) start_dt = from_timestamp(array[0]['time']) end_dt = from_timestamp(array[-1]['time']) times = array['time'] if not times.any(): raise ValueError( 'Bad frame with null-times?\n\n' f'{times}' ) if end_dt is None: inow: int = round(time.time()) if (inow - times[-1]) > 60: await tractor.pause() return array, start_dt, end_dt yield ( get_ohlc, { # backfill config 'erlangs': 3, 'rate': 3, } ) @async_lifo_cache() async def get_mkt_info( fqme: str, ) -> tuple[MktPair, Pair|OptionPair] | None: # uppercase since kraken bs_mktid is always upper if 'deribit' not in fqme.lower(): fqme += '.deribit' mkt_mode: str = '' broker, mkt_ep, venue, expiry = unpack_fqme(fqme) # NOTE: we always upper case all tokens to be consistent with # binance's symbology style for pairs, like `BTCUSDT`, but in # theory we could also just keep things lower case; as long as # we're consistent and the symcache matches whatever this func # returns, always! expiry: str = expiry.upper() venue: str = venue.upper() # venue_lower: str = venue.lower() mkt_mode: str = 'option' async with open_cached_client( 'deribit', ) as client: assets: dict[str, Asset] = await client.get_assets() pair_str: str = mkt_ep.lower() pair: Pair = await client.exch_info( sym=pair_str, ) mkt_mode = pair.venue client.mkt_mode = mkt_mode dst: Asset | None = assets.get(pair.bs_dst_asset) src: Asset | None = assets.get(pair.bs_src_asset) mkt = MktPair( dst=dst, src=src, price_tick=pair.price_tick, size_tick=pair.size_tick, bs_mktid=pair.symbol, expiry=pair.expiry, venue=mkt_mode, broker='deribit', _atype=mkt_mode, _fqme_without_src=True, ) return mkt, pair async def stream_quotes( send_chan: trio.abc.SendChannel, symbols: list[str], feed_is_live: trio.Event, loglevel: str = None, # startup sync task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ) -> None: sym = symbols[0].split('.')[0] init_msgs: list[FeedInit] = [] async with ( open_cached_client('deribit') as client, send_chan as send_chan ): mkt, pair = await get_mkt_info(sym) # build out init msgs according to latest spec init_msgs.append( FeedInit(mkt_info=mkt) ) nsym = piker_sym_to_cb_sym(sym) async with maybe_open_price_feed(sym) as stream: # TODO, uhh use it ?? XD # cache = client._pairs last_trades = (await client.last_trades( cb_sym_to_deribit_inst(nsym), count=1)).trades if len(last_trades) == 0: last_trade = None async for typ, quote in stream: if typ == 'trade': last_trade = Trade(**(quote['data'])) break else: last_trade = Trade(**(last_trades[0])) first_quote = { 'symbol': sym, 'last': last_trade.price, 'brokerd_ts': last_trade.timestamp, 'ticks': [{ 'type': 'trade', 'price': last_trade.price, 'size': last_trade.amount, 'broker_ts': last_trade.timestamp }] } task_status.started((init_msgs, first_quote)) feed_is_live.set() # deliver until cancelled async for typ, quote in stream: topic: str = quote['symbol'] log.info( f'deribit {typ!r} quote\n\n' f'{quote}\n' ) await send_chan.send({ topic: quote, }) @tractor.context async def open_symbol_search( ctx: tractor.Context, ) -> Client: async with open_cached_client('deribit') as client: # load all symbols locally for fast search # cache = client._pairs await ctx.started() async with ctx.open_stream() as stream: pattern: str async for pattern in stream: # NOTE: pattern fuzzy-matching is done within # the methd impl. pairs: dict[str, Pair] = await client.search_symbols( pattern, ) # repack in fqme-keyed table byfqme: dict[str, Pair] = {} for pair in pairs.values(): byfqme[pair.bs_fqme] = pair await stream.send(byfqme)