Adjust all `.tsp` imports to use new sub-pkg

Also toss in a poll loop around the `hist_shm: ShmArray` backfill
read-check in the `.data.allocate_persisten_feed()`  init to cope with
possible racy-ness from the increased tsdb history loading concurrency
now implemented.
distribute_dis
Tyler Goodlet 2023-12-18 11:54:28 -05:00
parent 4568c55f17
commit f5dc21d3f4
5 changed files with 15 additions and 8 deletions

View File

@ -69,7 +69,7 @@ from .validate import (
FeedInit, FeedInit,
validate_backend, validate_backend,
) )
from .history import ( from ..tsp import (
manage_history, manage_history,
) )
from .ingest import get_ingestormod from .ingest import get_ingestormod
@ -407,7 +407,13 @@ async def allocate_persistent_feed(
rt_shm.array['time'][1] = ts + 1 rt_shm.array['time'][1] = ts + 1
elif hist_shm.array.size == 0: elif hist_shm.array.size == 0:
raise RuntimeError(f'History (1m) Shm for {fqme} is empty!?') for i in range(100):
await trio.sleep(0.1)
if hist_shm.array.size > 0:
break
else:
await tractor.pause()
raise RuntimeError(f'History (1m) Shm for {fqme} is empty!?')
# wait the spawning parent task to register its subscriber # wait the spawning parent task to register its subscriber
# send-stream entry before we start the sample loop. # send-stream entry before we start the sample loop.

View File

@ -56,8 +56,6 @@ from datetime import datetime
from pathlib import Path from pathlib import Path
import time import time
# from bidict import bidict
# import tractor
import numpy as np import numpy as np
import polars as pl import polars as pl
from pendulum import ( from pendulum import (
@ -65,10 +63,10 @@ from pendulum import (
) )
from piker import config from piker import config
from piker import tsp
from piker.data import ( from piker.data import (
def_iohlcv_fields, def_iohlcv_fields,
ShmArray, ShmArray,
tsp,
) )
from piker.log import get_logger from piker.log import get_logger
from . import TimeseriesNotFound from . import TimeseriesNotFound

View File

@ -49,7 +49,7 @@ from ..data._formatters import (
OHLCBarsAsCurveFmtr, # OHLC converted to line OHLCBarsAsCurveFmtr, # OHLC converted to line
StepCurveFmtr, # "step" curve (like for vlm) StepCurveFmtr, # "step" curve (like for vlm)
) )
from ..data.tsp import ( from ..tsp import (
slice_from_time, slice_from_time,
) )
from ._ohlc import ( from ._ohlc import (

View File

@ -181,7 +181,10 @@ async def open_fsp_sidepane(
async def open_fsp_actor_cluster( async def open_fsp_actor_cluster(
names: list[str] = ['fsp_0', 'fsp_1'], names: list[str] = ['fsp_0', 'fsp_1'],
) -> AsyncGenerator[int, dict[str, tractor.Portal]]: ) -> AsyncGenerator[
int,
dict[str, tractor.Portal]
]:
from tractor._clustering import open_actor_cluster from tractor._clustering import open_actor_cluster

View File

@ -31,7 +31,7 @@ import pendulum
import pyqtgraph as pg import pyqtgraph as pg
from piker.types import Struct from piker.types import Struct
from ..data.tsp import slice_from_time from ..tsp import slice_from_time
from ..log import get_logger from ..log import get_logger
from ..toolz import Profiler from ..toolz import Profiler