Signal hist start using `OptionPair.creation_timestamp`

Such that the `get_hist()` query func raises `DataUnavailable` with an
explicit message regarding the start of the (option) contract's
lifetime.

Other:
- mask some unused imports (for now?)
- drop a duplicate `tractor.get_console_log()` call which was causing
  duplicate console emits (it's already setup by brokerd init now).
- comment various unused code bits i found.
- add an info log around live quotes so we can see, for the moment,
  when they actually occur.. XD
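The gist of the new guard, as a minimal standalone sketch; the
`check_history_start()` helper name and its exact wiring into
`get_ohlc()` are illustrative only, the real change lives inline in the
diff below:

```python
from datetime import datetime, timezone

from piker.brokers import DataUnavailable


def check_history_start(
    end_dt: datetime | None,
    creation_timestamp_ms: int,  # `OptionPair.creation_timestamp`, in ms
) -> None:
    '''
    Raise `DataUnavailable` when the requested frame-end predates the
    option contract's creation time, so backfill can stop cleanly.

    (Hypothetical helper for illustration; the real check is inlined
    in `get_ohlc()`.)

    '''
    # deribit reports creation time in milliseconds; convert to epoch secs.
    creation_time_s: float = creation_timestamp_ms / 1000

    if (
        end_dt is not None
        and end_dt.timestamp() < creation_time_s
    ):
        create_dt = datetime.fromtimestamp(creation_time_s, tz=timezone.utc)
        raise DataUnavailable(
            f'No history prior to contract creation: {create_dt}\n'
        )
```

With an `end_dt` earlier than the contract's creation time this raises
immediately, letting the backfill loop stop instead of re-querying for
frames that can never exist.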
parent 27c800f5c4
commit 7d93acff6c
@@ -18,56 +18,65 @@
 Deribit backend.
 
 '''
+from __future__ import annotations
 from contextlib import asynccontextmanager as acm
 from datetime import datetime
-from typing import Any, Optional, Callable
-from pprint import pformat
+from typing import (
+    # Any,
+    # Optional,
+    Callable,
+)
+# from pprint import pformat
 import time
 
 import trio
 from trio_typing import TaskStatus
 from pendulum import (
     from_timestamp,
-    now,
 )
-from rapidfuzz import process as fuzzy
 import numpy as np
 import tractor
 
 from piker.accounting import (
+    Asset,
     MktPair,
     unpack_fqme,
 )
 from piker.brokers import (
     open_cached_client,
     NoData,
+    DataUnavailable,
 )
 from piker._cacheables import (
     async_lifo_cache,
 )
-from piker.log import get_logger, get_console_log
-from piker.data import ShmArray
+from piker.log import (
+    get_logger,
+)
 from piker.data.validate import FeedInit
-from piker.brokers._util import (
-    BrokerError,
-    DataUnavailable,
-)
 
-from cryptofeed import FeedHandler
-from cryptofeed.defines import (
-    DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
-)
-from cryptofeed.symbols import Symbol
+# from cryptofeed import FeedHandler
+# from cryptofeed.defines import (
+#     DERIBIT,
+#     L1_BOOK,
+#     TRADES,
+#     OPTION,
+#     CALL,
+#     PUT,
+# )
+# from cryptofeed.symbols import Symbol
 
 from .api import (
-    Client, Trade,
-    get_config,
-    piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
+    Client,
+    # get_config,
+    piker_sym_to_cb_sym,
+    cb_sym_to_deribit_inst,
     maybe_open_price_feed
 )
 from .venues import (
     Pair,
     OptionPair,
+    Trade,
 )
 
 _spawn_kwargs = {
@@ -86,6 +95,10 @@ async def open_history_client(
     # TODO implement history getter for the new storage layer.
     async with open_cached_client('deribit') as client:
 
+        pair: OptionPair = client._pairs[mkt.dst.name]
+        # XXX NOTE, the cuckers use ms !!!
+        creation_time_s: int = pair.creation_timestamp/1000
+
         async def get_ohlc(
             timeframe: float,
             end_dt: datetime | None = None,
@@ -105,6 +118,31 @@ async def open_history_client(
                 end_dt=end_dt,
             )
             if len(array) == 0:
+                if (
+                    end_dt is None
+                ):
+                    raise DataUnavailable(
+                        'No history seems to exist yet?\n\n'
+                        f'{mkt}'
+                    )
+                elif (
+                    end_dt
+                    and
+                    end_dt.timestamp() < creation_time_s
+                ):
+                    # the contract can't have history
+                    # before it was created.
+                    pair_type_str: str = type(pair).__name__
+                    create_dt: datetime = from_timestamp(creation_time_s)
+                    raise DataUnavailable(
+                        f'No history prior to\n'
+                        f'`{pair_type_str}.creation_timestamp: int = '
+                        f'{pair.creation_timestamp}\n\n'
+                        f'------ deribit sux ------\n'
+                        f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n'
+                        f'creation_time_s: {creation_time_s}\n'
+                        f'create_dt: {create_dt}\n'
+                    )
                 raise NoData(
                     f'No frame for {start_dt} -> {end_dt}\n'
                 )
@@ -126,14 +164,20 @@ async def open_history_client(
 
             return array, start_dt, end_dt
 
-        yield get_ohlc, {'erlangs': 3, 'rate': 3}
+        yield (
+            get_ohlc,
+            {  # backfill config
+                'erlangs': 3,
+                'rate': 3,
+            }
+        )
 
 
 @async_lifo_cache()
 async def get_mkt_info(
     fqme: str,
 
-) -> tuple[MktPair, Pair] | None:
+) -> tuple[MktPair, Pair|OptionPair] | None:
 
     # uppercase since kraken bs_mktid is always upper
     if 'deribit' not in fqme.lower():
@@ -149,7 +193,7 @@ async def get_mkt_info(
     # returns, always!
     expiry: str = expiry.upper()
     venue: str = venue.upper()
-    venue_lower: str = venue.lower()
+    # venue_lower: str = venue.lower()
 
     mkt_mode: str = 'option'
 
@@ -195,8 +239,6 @@ async def stream_quotes(
     task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
 
 ) -> None:
-    # XXX: required to propagate ``tractor`` loglevel to piker logging
-    get_console_log(loglevel or tractor.current_actor().loglevel)
 
     sym = symbols[0].split('.')[0]
 
@@ -217,7 +259,8 @@ async def stream_quotes(
 
         async with maybe_open_price_feed(sym) as stream:
 
-            cache = client._pairs
+            # TODO, uhh use it ?? XD
+            # cache = client._pairs
 
             last_trades = (await client.last_trades(
                 cb_sym_to_deribit_inst(nsym), count=1)).trades
@@ -247,9 +290,16 @@ async def stream_quotes(
 
             feed_is_live.set()
 
+            # deliver until cancelled
             async for typ, quote in stream:
-                topic = quote['symbol']
-                await send_chan.send({topic: quote})
+                topic: str = quote['symbol']
+                log.info(
+                    f'deribit {typ!r} quote\n\n'
+                    f'{quote}\n'
+                )
+                await send_chan.send({
+                    topic: quote,
+                })
 
 
 @tractor.context
@@ -259,13 +309,14 @@ async def open_symbol_search(
     async with open_cached_client('deribit') as client:
 
         # load all symbols locally for fast search
-        cache = client._pairs
+        # cache = client._pairs
         await ctx.started()
 
         async with ctx.open_stream() as stream:
 
             pattern: str
             async for pattern in stream:
+
                 # NOTE: pattern fuzzy-matching is done within
                 # the methd impl.
                 pairs: dict[str, Pair] = await client.search_symbols(