Signal hist start using `OptionPair.creation_timestamp`

Such that the `get_hist()` query func raises `DataUnavailable` with an
explicit message regarding the start of the (option) contract's
lifetime.

Other,
- mask some unused imports (for now?)
- drop a duplicate `tractor.get_console_log()` call which was causing
  duplicate console emits (it's already setup by brokerd init now).
- comment various unused code bits i found.
- add a info log around live quotes so we can see for the moment when
  they actually occur.. XD
fix_deribit_hist_queries
Tyler Goodlet 2024-11-19 17:45:39 -05:00
parent dc2c379d86
commit b734245183
1 changed files with 78 additions and 27 deletions

View File

@ -18,56 +18,65 @@
Deribit backend. Deribit backend.
''' '''
from __future__ import annotations
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from datetime import datetime from datetime import datetime
from typing import Any, Optional, Callable from typing import (
from pprint import pformat # Any,
# Optional,
Callable,
)
# from pprint import pformat
import time import time
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from pendulum import ( from pendulum import (
from_timestamp, from_timestamp,
now,
) )
from rapidfuzz import process as fuzzy
import numpy as np import numpy as np
import tractor import tractor
from piker.accounting import ( from piker.accounting import (
Asset,
MktPair, MktPair,
unpack_fqme, unpack_fqme,
) )
from piker.brokers import ( from piker.brokers import (
open_cached_client, open_cached_client,
NoData, NoData,
DataUnavailable,
) )
from piker._cacheables import ( from piker._cacheables import (
async_lifo_cache, async_lifo_cache,
) )
from piker.log import get_logger, get_console_log from piker.log import (
from piker.data import ShmArray get_logger,
)
from piker.data.validate import FeedInit from piker.data.validate import FeedInit
from piker.brokers._util import (
BrokerError,
DataUnavailable,
)
from cryptofeed import FeedHandler # from cryptofeed import FeedHandler
from cryptofeed.defines import ( # from cryptofeed.defines import (
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT # DERIBIT,
) # L1_BOOK,
from cryptofeed.symbols import Symbol # TRADES,
# OPTION,
# CALL,
# PUT,
# )
# from cryptofeed.symbols import Symbol
from .api import ( from .api import (
Client, Trade, Client,
get_config, # get_config,
piker_sym_to_cb_sym, cb_sym_to_deribit_inst, piker_sym_to_cb_sym,
cb_sym_to_deribit_inst,
maybe_open_price_feed maybe_open_price_feed
) )
from .venues import ( from .venues import (
Pair, Pair,
OptionPair, OptionPair,
Trade,
) )
_spawn_kwargs = { _spawn_kwargs = {
@ -86,6 +95,10 @@ async def open_history_client(
# TODO implement history getter for the new storage layer. # TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client: async with open_cached_client('deribit') as client:
pair: OptionPair = client._pairs[mkt.dst.name]
# XXX NOTE, the cuckers use ms !!!
creation_time_s: int = pair.creation_timestamp/1000
async def get_ohlc( async def get_ohlc(
timeframe: float, timeframe: float,
end_dt: datetime | None = None, end_dt: datetime | None = None,
@ -105,6 +118,31 @@ async def open_history_client(
end_dt=end_dt, end_dt=end_dt,
) )
if len(array) == 0: if len(array) == 0:
if (
end_dt is None
):
raise DataUnavailable(
'No history seems to exist yet?\n\n'
f'{mkt}'
)
elif (
end_dt
and
end_dt.timestamp() < creation_time_s
):
# the contract can't have history
# before it was created.
pair_type_str: str = type(pair).__name__
create_dt: datetime = from_timestamp(creation_time_s)
raise DataUnavailable(
f'No history prior to\n'
f'`{pair_type_str}.creation_timestamp: int = '
f'{pair.creation_timestamp}\n\n'
f'------ deribit sux ------\n'
f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n'
f'creation_time_s: {creation_time_s}\n'
f'create_dt: {create_dt}\n'
)
raise NoData( raise NoData(
f'No frame for {start_dt} -> {end_dt}\n' f'No frame for {start_dt} -> {end_dt}\n'
) )
@ -126,14 +164,20 @@ async def open_history_client(
return array, start_dt, end_dt return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 3, 'rate': 3} yield (
get_ohlc,
{ # backfill config
'erlangs': 3,
'rate': 3,
}
)
@async_lifo_cache() @async_lifo_cache()
async def get_mkt_info( async def get_mkt_info(
fqme: str, fqme: str,
) -> tuple[MktPair, Pair] | None: ) -> tuple[MktPair, Pair|OptionPair] | None:
# uppercase since kraken bs_mktid is always upper # uppercase since kraken bs_mktid is always upper
if 'deribit' not in fqme.lower(): if 'deribit' not in fqme.lower():
@ -149,7 +193,7 @@ async def get_mkt_info(
# returns, always! # returns, always!
expiry: str = expiry.upper() expiry: str = expiry.upper()
venue: str = venue.upper() venue: str = venue.upper()
venue_lower: str = venue.lower() # venue_lower: str = venue.lower()
mkt_mode: str = 'option' mkt_mode: str = 'option'
@ -195,8 +239,6 @@ async def stream_quotes(
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
sym = symbols[0].split('.')[0] sym = symbols[0].split('.')[0]
@ -217,7 +259,8 @@ async def stream_quotes(
async with maybe_open_price_feed(sym) as stream: async with maybe_open_price_feed(sym) as stream:
cache = client._pairs # TODO, uhh use it ?? XD
# cache = client._pairs
last_trades = (await client.last_trades( last_trades = (await client.last_trades(
cb_sym_to_deribit_inst(nsym), count=1)).trades cb_sym_to_deribit_inst(nsym), count=1)).trades
@ -247,9 +290,16 @@ async def stream_quotes(
feed_is_live.set() feed_is_live.set()
# deliver until cancelled
async for typ, quote in stream: async for typ, quote in stream:
topic = quote['symbol'] topic: str = quote['symbol']
await send_chan.send({topic: quote}) log.info(
f'deribit {typ!r} quote\n\n'
f'{quote}\n'
)
await send_chan.send({
topic: quote,
})
@tractor.context @tractor.context
@ -259,13 +309,14 @@ async def open_symbol_search(
async with open_cached_client('deribit') as client: async with open_cached_client('deribit') as client:
# load all symbols locally for fast search # load all symbols locally for fast search
cache = client._pairs # cache = client._pairs
await ctx.started() await ctx.started()
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
pattern: str pattern: str
async for pattern in stream: async for pattern in stream:
# NOTE: pattern fuzzy-matching is done within # NOTE: pattern fuzzy-matching is done within
# the methd impl. # the methd impl.
pairs: dict[str, Pair] = await client.search_symbols( pairs: dict[str, Pair] = await client.search_symbols(