diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index e32e31d2..b3a202ba 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -18,56 +18,65 @@ Deribit backend. ''' +from __future__ import annotations from contextlib import asynccontextmanager as acm from datetime import datetime -from typing import Any, Optional, Callable -from pprint import pformat +from typing import ( + # Any, + # Optional, + Callable, +) +# from pprint import pformat import time import trio from trio_typing import TaskStatus from pendulum import ( from_timestamp, - now, ) -from rapidfuzz import process as fuzzy import numpy as np import tractor from piker.accounting import ( + Asset, MktPair, unpack_fqme, ) from piker.brokers import ( open_cached_client, NoData, + DataUnavailable, ) from piker._cacheables import ( async_lifo_cache, ) -from piker.log import get_logger, get_console_log -from piker.data import ShmArray +from piker.log import ( + get_logger, +) from piker.data.validate import FeedInit -from piker.brokers._util import ( - BrokerError, - DataUnavailable, -) -from cryptofeed import FeedHandler -from cryptofeed.defines import ( - DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT -) -from cryptofeed.symbols import Symbol +# from cryptofeed import FeedHandler +# from cryptofeed.defines import ( +# DERIBIT, +# L1_BOOK, +# TRADES, +# OPTION, +# CALL, +# PUT, +# ) +# from cryptofeed.symbols import Symbol from .api import ( - Client, Trade, - get_config, - piker_sym_to_cb_sym, cb_sym_to_deribit_inst, + Client, + # get_config, + piker_sym_to_cb_sym, + cb_sym_to_deribit_inst, maybe_open_price_feed ) from .venues import ( Pair, OptionPair, + Trade, ) _spawn_kwargs = { @@ -86,6 +95,10 @@ async def open_history_client( # TODO implement history getter for the new storage layer. async with open_cached_client('deribit') as client: + pair: OptionPair = client._pairs[mkt.dst.name] + # XXX NOTE, the cuckers use ms !!! + creation_time_s: int = pair.creation_timestamp/1000 + async def get_ohlc( timeframe: float, end_dt: datetime | None = None, @@ -105,6 +118,31 @@ async def open_history_client( end_dt=end_dt, ) if len(array) == 0: + if ( + end_dt is None + ): + raise DataUnavailable( + 'No history seems to exist yet?\n\n' + f'{mkt}' + ) + elif ( + end_dt + and + end_dt.timestamp() < creation_time_s + ): + # the contract can't have history + # before it was created. + pair_type_str: str = type(pair).__name__ + create_dt: datetime = from_timestamp(creation_time_s) + raise DataUnavailable( + f'No history prior to\n' + f'`{pair_type_str}.creation_timestamp: int = ' + f'{pair.creation_timestamp}\n\n' + f'------ deribit sux ------\n' + f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n' + f'creation_time_s: {creation_time_s}\n' + f'create_dt: {create_dt}\n' + ) raise NoData( f'No frame for {start_dt} -> {end_dt}\n' ) @@ -126,14 +164,20 @@ async def open_history_client( return array, start_dt, end_dt - yield get_ohlc, {'erlangs': 3, 'rate': 3} + yield ( + get_ohlc, + { # backfill config + 'erlangs': 3, + 'rate': 3, + } + ) @async_lifo_cache() async def get_mkt_info( fqme: str, -) -> tuple[MktPair, Pair] | None: +) -> tuple[MktPair, Pair|OptionPair] | None: # uppercase since kraken bs_mktid is always upper if 'deribit' not in fqme.lower(): @@ -149,7 +193,7 @@ async def get_mkt_info( # returns, always! expiry: str = expiry.upper() venue: str = venue.upper() - venue_lower: str = venue.lower() + # venue_lower: str = venue.lower() mkt_mode: str = 'option' @@ -195,8 +239,6 @@ async def stream_quotes( task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ) -> None: - # XXX: required to propagate ``tractor`` loglevel to piker logging - get_console_log(loglevel or tractor.current_actor().loglevel) sym = symbols[0].split('.')[0] @@ -217,7 +259,8 @@ async def stream_quotes( async with maybe_open_price_feed(sym) as stream: - cache = client._pairs + # TODO, uhh use it ?? XD + # cache = client._pairs last_trades = (await client.last_trades( cb_sym_to_deribit_inst(nsym), count=1)).trades @@ -247,9 +290,16 @@ async def stream_quotes( feed_is_live.set() + # deliver until cancelled async for typ, quote in stream: - topic = quote['symbol'] - await send_chan.send({topic: quote}) + topic: str = quote['symbol'] + log.info( + f'deribit {typ!r} quote\n\n' + f'{quote}\n' + ) + await send_chan.send({ + topic: quote, + }) @tractor.context @@ -259,13 +309,14 @@ async def open_symbol_search( async with open_cached_client('deribit') as client: # load all symbols locally for fast search - cache = client._pairs + # cache = client._pairs await ctx.started() async with ctx.open_stream() as stream: pattern: str async for pattern in stream: + # NOTE: pattern fuzzy-matching is done within # the methd impl. pairs: dict[str, Pair] = await client.search_symbols(