Factor `TimeseriesNotFound` to top level

TO CHERRY into #486
distribute_dis
Tyler Goodlet 2023-12-07 12:31:12 -05:00
parent dd0167b9a5
commit b9af6176c5
3 changed files with 24 additions and 14 deletions

View File

@ -57,6 +57,7 @@ from ._sampling import (
from ..brokers._util import ( from ..brokers._util import (
DataUnavailable, DataUnavailable,
) )
from ..storage import TimeseriesNotFound
if TYPE_CHECKING: if TYPE_CHECKING:
from bidict import bidict from bidict import bidict
@ -690,13 +691,18 @@ async def tsdb_backfill(
# but if not then below the remaining history can be lazy # but if not then below the remaining history can be lazy
# loaded? # loaded?
fqme: str = mkt.fqme fqme: str = mkt.fqme
last_tsdb_dt: datetime | None = None
try:
tsdb_entry: tuple | None = await storage.load( tsdb_entry: tuple | None = await storage.load(
fqme, fqme,
timeframe=timeframe, timeframe=timeframe,
) )
except TimeseriesNotFound:
log.warning(
f'No timeseries yet for {fqme}'
)
last_tsdb_dt: datetime | None = None else:
if tsdb_entry:
( (
tsdb_history, tsdb_history,
first_tsdb_dt, first_tsdb_dt,
@ -963,7 +969,8 @@ async def manage_history(
sub_for_broadcasts=False, sub_for_broadcasts=False,
) as sample_stream: ) as sample_stream:
# register 1s and 1m buffers with the global incrementer task # register 1s and 1m buffers with the global
# incrementer task
log.info(f'Connected to sampler stream: {sample_stream}') log.info(f'Connected to sampler stream: {sample_stream}')
for timeframe in [60, 1]: for timeframe in [60, 1]:

View File

@ -139,6 +139,13 @@ class StorageClient(
... ...
class TimeseriesNotFound(Exception):
'''
No timeseries entry can be found for this backend.
'''
class StorageConnectionError(ConnectionError): class StorageConnectionError(ConnectionError):
''' '''
Can't connect to the desired tsdb subsys/service. Can't connect to the desired tsdb subsys/service.

View File

@ -19,7 +19,8 @@
call a poor man's tsdb). call a poor man's tsdb).
AKA a `piker`-native file-system native "time series database" AKA a `piker`-native file-system native "time series database"
without needing an extra process and no standard TSDB features, YET! without needing an extra process and no standard TSDB features,
YET!
''' '''
# TODO: like there's soo much.. # TODO: like there's soo much..
@ -67,17 +68,12 @@ from piker import config
from piker.data import def_iohlcv_fields from piker.data import def_iohlcv_fields
from piker.data import ShmArray from piker.data import ShmArray
from piker.log import get_logger from piker.log import get_logger
from . import TimeseriesNotFound
log = get_logger('storage.nativedb') log = get_logger('storage.nativedb')
class TimeseriesNotFound(Exception):
'''
No timeseries entry can be found for this backend.
'''
# NOTE: thanks to this SO answer for the below conversion routines # NOTE: thanks to this SO answer for the below conversion routines
# to go from numpy struct-arrays to polars dataframes and back: # to go from numpy struct-arrays to polars dataframes and back:
# https://stackoverflow.com/a/72054819 # https://stackoverflow.com/a/72054819