From f5dc21d3f4e8f2eba1b7c09d268f14eb6a0c89bc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 18 Dec 2023 11:54:28 -0500 Subject: [PATCH] Adjust all `.tsp` imports to use new sub-pkg Also toss in a poll loop around the `hist_shm: ShmArray` backfill read-check in the `.data.allocate_persisten_feed()` init to cope with possible racy-ness from the increased tsdb history loading concurrency now implemented. --- piker/data/feed.py | 10 ++++++++-- piker/storage/nativedb.py | 4 +--- piker/ui/_dataviz.py | 2 +- piker/ui/_fsp.py | 5 ++++- piker/ui/view_mode.py | 2 +- 5 files changed, 15 insertions(+), 8 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index c6a56e15..6fc3bb1b 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -69,7 +69,7 @@ from .validate import ( FeedInit, validate_backend, ) -from .history import ( +from ..tsp import ( manage_history, ) from .ingest import get_ingestormod @@ -407,7 +407,13 @@ async def allocate_persistent_feed( rt_shm.array['time'][1] = ts + 1 elif hist_shm.array.size == 0: - raise RuntimeError(f'History (1m) Shm for {fqme} is empty!?') + for i in range(100): + await trio.sleep(0.1) + if hist_shm.array.size > 0: + break + else: + await tractor.pause() + raise RuntimeError(f'History (1m) Shm for {fqme} is empty!?') # wait the spawning parent task to register its subscriber # send-stream entry before we start the sample loop. diff --git a/piker/storage/nativedb.py b/piker/storage/nativedb.py index ebc09679..7d64cb6e 100644 --- a/piker/storage/nativedb.py +++ b/piker/storage/nativedb.py @@ -56,8 +56,6 @@ from datetime import datetime from pathlib import Path import time -# from bidict import bidict -# import tractor import numpy as np import polars as pl from pendulum import ( @@ -65,10 +63,10 @@ from pendulum import ( ) from piker import config +from piker import tsp from piker.data import ( def_iohlcv_fields, ShmArray, - tsp, ) from piker.log import get_logger from . import TimeseriesNotFound diff --git a/piker/ui/_dataviz.py b/piker/ui/_dataviz.py index d5a7195e..c18f5b67 100644 --- a/piker/ui/_dataviz.py +++ b/piker/ui/_dataviz.py @@ -49,7 +49,7 @@ from ..data._formatters import ( OHLCBarsAsCurveFmtr, # OHLC converted to line StepCurveFmtr, # "step" curve (like for vlm) ) -from ..data.tsp import ( +from ..tsp import ( slice_from_time, ) from ._ohlc import ( diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 58842a73..0227d60b 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -181,7 +181,10 @@ async def open_fsp_sidepane( async def open_fsp_actor_cluster( names: list[str] = ['fsp_0', 'fsp_1'], -) -> AsyncGenerator[int, dict[str, tractor.Portal]]: +) -> AsyncGenerator[ + int, + dict[str, tractor.Portal] +]: from tractor._clustering import open_actor_cluster diff --git a/piker/ui/view_mode.py b/piker/ui/view_mode.py index 31a06645..fa512549 100644 --- a/piker/ui/view_mode.py +++ b/piker/ui/view_mode.py @@ -31,7 +31,7 @@ import pendulum import pyqtgraph as pg from piker.types import Struct -from ..data.tsp import slice_from_time +from ..tsp import slice_from_time from ..log import get_logger from ..toolz import Profiler