Compare commits

..

1 Commits

4 changed files with 22 additions and 71 deletions

View File

@@ -275,15 +275,9 @@ async def open_history_client(
f'{times}' f'{times}'
) )
# XXX, debug any case where the latest 1m bar we get is
# already another "sample's-step-old"..
if end_dt is None: if end_dt is None:
inow: int = round(time.time()) inow: int = round(time.time())
if ( if (inow - times[-1]) > 60:
_time_step := (inow - times[-1])
>
timeframe * 2
):
await tractor.pause() await tractor.pause()
start_dt = from_timestamp(times[0]) start_dt = from_timestamp(times[0])

View File

@@ -447,13 +447,7 @@ def ldshm(
) )
# last chance manual overwrites in REPL # last chance manual overwrites in REPL
# await tractor.pause() # await tractor.pause()
if not aids: assert aids
log.warning(
f'No gaps were found !?\n'
f'fqme: {fqme!r}\n'
f'timeframe: {period_s!r}\n'
f"WELL THAT'S GOOD NOOZ!\n"
)
tf2aids[period_s] = aids tf2aids[period_s] = aids
else: else:

View File

@@ -49,7 +49,6 @@ from pendulum import (
Duration, Duration,
duration as mk_duration, duration as mk_duration,
from_timestamp, from_timestamp,
timezone,
) )
import numpy as np import numpy as np
import polars as pl import polars as pl
@@ -58,7 +57,9 @@ from piker.brokers import NoData
from piker.accounting import ( from piker.accounting import (
MktPair, MktPair,
) )
from piker.log import get_logger from piker.data._util import (
log,
)
from ..data._sharedmem import ( from ..data._sharedmem import (
maybe_open_shm_array, maybe_open_shm_array,
ShmArray, ShmArray,
@@ -96,9 +97,6 @@ if TYPE_CHECKING:
# from .feed import _FeedsBus # from .feed import _FeedsBus
log = get_logger()
# `ShmArray` buffer sizing configuration: # `ShmArray` buffer sizing configuration:
_mins_in_day = int(60 * 24) _mins_in_day = int(60 * 24)
# how much is probably dependent on lifestyle # how much is probably dependent on lifestyle
@@ -403,9 +401,7 @@ async def start_backfill(
# based on the sample step size, maybe load a certain amount history # based on the sample step size, maybe load a certain amount history
update_start_on_prepend: bool = False update_start_on_prepend: bool = False
if ( if backfill_until_dt is None:
_until_was_none := (backfill_until_dt is None)
):
# TODO: per-provider default history-durations? # TODO: per-provider default history-durations?
# -[ ] inside the `open_history_client()` config allow # -[ ] inside the `open_history_client()` config allow
@@ -439,8 +435,6 @@ async def start_backfill(
last_start_dt: datetime = backfill_from_dt last_start_dt: datetime = backfill_from_dt
next_prepend_index: int = backfill_from_shm_index next_prepend_index: int = backfill_from_shm_index
est = timezone('EST')
while last_start_dt > backfill_until_dt: while last_start_dt > backfill_until_dt:
log.info( log.info(
f'Requesting {timeframe}s frame:\n' f'Requesting {timeframe}s frame:\n'
@@ -454,10 +448,9 @@ async def start_backfill(
next_end_dt, next_end_dt,
) = await get_hist( ) = await get_hist(
timeframe, timeframe,
end_dt=(end_dt_param := last_start_dt), end_dt=last_start_dt,
) )
except NoData as nodata: except NoData as _daterr:
_nodata = nodata
orig_last_start_dt: datetime = last_start_dt orig_last_start_dt: datetime = last_start_dt
gap_report: str = ( gap_report: str = (
f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n' f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
@@ -525,32 +518,8 @@ async def start_backfill(
== ==
next_start_dt.timestamp() next_start_dt.timestamp()
) )
assert (
(last_time := time[-1])
==
next_end_dt.timestamp()
)
frame_last_dt = from_timestamp(last_time) assert time[-1] == next_end_dt.timestamp()
if (
frame_last_dt.add(seconds=timeframe)
<
end_dt_param
):
est_frame_last_dt = est.convert(frame_last_dt)
est_end_dt_param = est.convert(end_dt_param)
log.warning(
f'Provider frame ending BEFORE requested end_dt={end_dt_param} ??\n'
f'frame_last_dt (EST): {est_frame_last_dt!r}\n'
f'end_dt_param (EST): {est_end_dt_param!r}\n'
f'\n'
f'Likely contains,\n'
f'- a venue closure.\n'
f'- (maybe?) missing data ?\n'
)
# ?TODO, check against venue closure hours
# if/when provided by backend?
await tractor.pause()
expected_dur: Interval = ( expected_dur: Interval = (
last_start_dt.subtract( last_start_dt.subtract(
@@ -612,11 +581,10 @@ async def start_backfill(
'0 BARS TO PUSH after diff!?\n' '0 BARS TO PUSH after diff!?\n'
f'{next_start_dt} -> {last_start_dt}' f'{next_start_dt} -> {last_start_dt}'
) )
await tractor.pause()
# Check if we're about to exceed buffer capacity BEFORE # Check if we're about to exceed buffer capacity BEFORE
# attempting the push # attempting the push
if (next_prepend_index - ln) < 0: if next_prepend_index - ln < 0:
log.warning( log.warning(
f'Backfill would exceed buffer capacity!\n' f'Backfill would exceed buffer capacity!\n'
f'next_prepend_index: {next_prepend_index}\n' f'next_prepend_index: {next_prepend_index}\n'
@@ -687,7 +655,7 @@ async def start_backfill(
}, },
}) })
# XXX, can't push the entire frame? so # can't push the entire frame? so
# push only the amount that can fit.. # push only the amount that can fit..
break break
@@ -747,8 +715,8 @@ async def start_backfill(
) = dedupe(df) ) = dedupe(df)
if diff: if diff:
log.warning( log.warning(
f'Found {diff!r} duplicates in tsdb! ' f'Found {diff} duplicates in tsdb, '
f'=> Overwriting with `deduped` data !! <=\n' f'overwriting with deduped data\n'
) )
await storage.write_ohlcv( await storage.write_ohlcv(
col_sym_key, col_sym_key,

View File

@@ -87,11 +87,7 @@ def update_fsp_chart(
# guard against unreadable case # guard against unreadable case
if not last_row: if not last_row:
log.warning( log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}')
f'Read-race on shm array,\n'
f'graphics_name: {graphics_name!r}\n'
f'shm.token: {shm.token}\n'
)
return return
# update graphics # update graphics
@@ -207,6 +203,7 @@ async def open_fsp_actor_cluster(
async def run_fsp_ui( async def run_fsp_ui(
linkedsplits: LinkedSplits, linkedsplits: LinkedSplits,
flume: Flume, flume: Flume,
started: trio.Event, started: trio.Event,
@@ -626,10 +623,8 @@ async def open_fsp_admin(
event.set() event.set()
# TODO, passing in `pikerd` related settings here!
# [ ] read in the `tractor` setting for `enable_transports: list`
# from the root `conf.toml`!
async def open_vlm_displays( async def open_vlm_displays(
linked: LinkedSplits, linked: LinkedSplits,
flume: Flume, flume: Flume,
dvlm: bool = True, dvlm: bool = True,
@@ -639,12 +634,12 @@ async def open_vlm_displays(
) -> None: ) -> None:
''' '''
Vlm (volume) subchart displays. Volume subchart displays.
Since "volume" is often included directly alongside OHLCV price Since "volume" is often included directly alongside OHLCV price
data, we don't really need a separate FSP-actor + shm array for data, we don't really need a separate FSP-actor + shm array for it
it since it's likely already directly adjacent to OHLC samples since it's likely already directly adjacent to OHLC samples from the
from the data provider. data provider.
Further only if volume data is detected (it sometimes isn't provided Further only if volume data is detected (it sometimes isn't provided
eg. forex, certain commodities markets) will volume dependent FSPs eg. forex, certain commodities markets) will volume dependent FSPs