Compare commits
1 Commits
7280a1452d
...
bc26676e59
| Author | SHA1 | Date |
|---|---|---|
|
|
bc26676e59 |
|
|
@ -275,15 +275,9 @@ async def open_history_client(
|
||||||
f'{times}'
|
f'{times}'
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX, debug any case where the latest 1m bar we get is
|
|
||||||
# already another "sample's-step-old"..
|
|
||||||
if end_dt is None:
|
if end_dt is None:
|
||||||
inow: int = round(time.time())
|
inow: int = round(time.time())
|
||||||
if (
|
if (inow - times[-1]) > 60:
|
||||||
_time_step := (inow - times[-1])
|
|
||||||
>
|
|
||||||
timeframe * 2
|
|
||||||
):
|
|
||||||
await tractor.pause()
|
await tractor.pause()
|
||||||
|
|
||||||
start_dt = from_timestamp(times[0])
|
start_dt = from_timestamp(times[0])
|
||||||
|
|
|
||||||
|
|
@ -447,13 +447,7 @@ def ldshm(
|
||||||
)
|
)
|
||||||
# last chance manual overwrites in REPL
|
# last chance manual overwrites in REPL
|
||||||
# await tractor.pause()
|
# await tractor.pause()
|
||||||
if not aids:
|
assert aids
|
||||||
log.warning(
|
|
||||||
f'No gaps were found !?\n'
|
|
||||||
f'fqme: {fqme!r}\n'
|
|
||||||
f'timeframe: {period_s!r}\n'
|
|
||||||
f"WELL THAT'S GOOD NOOZ!\n"
|
|
||||||
)
|
|
||||||
tf2aids[period_s] = aids
|
tf2aids[period_s] = aids
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
|
|
||||||
|
|
@ -49,7 +49,6 @@ from pendulum import (
|
||||||
Duration,
|
Duration,
|
||||||
duration as mk_duration,
|
duration as mk_duration,
|
||||||
from_timestamp,
|
from_timestamp,
|
||||||
timezone,
|
|
||||||
)
|
)
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import polars as pl
|
import polars as pl
|
||||||
|
|
@ -58,7 +57,9 @@ from piker.brokers import NoData
|
||||||
from piker.accounting import (
|
from piker.accounting import (
|
||||||
MktPair,
|
MktPair,
|
||||||
)
|
)
|
||||||
from piker.log import get_logger
|
from piker.data._util import (
|
||||||
|
log,
|
||||||
|
)
|
||||||
from ..data._sharedmem import (
|
from ..data._sharedmem import (
|
||||||
maybe_open_shm_array,
|
maybe_open_shm_array,
|
||||||
ShmArray,
|
ShmArray,
|
||||||
|
|
@ -96,9 +97,6 @@ if TYPE_CHECKING:
|
||||||
# from .feed import _FeedsBus
|
# from .feed import _FeedsBus
|
||||||
|
|
||||||
|
|
||||||
log = get_logger()
|
|
||||||
|
|
||||||
|
|
||||||
# `ShmArray` buffer sizing configuration:
|
# `ShmArray` buffer sizing configuration:
|
||||||
_mins_in_day = int(60 * 24)
|
_mins_in_day = int(60 * 24)
|
||||||
# how much is probably dependent on lifestyle
|
# how much is probably dependent on lifestyle
|
||||||
|
|
@ -403,9 +401,7 @@ async def start_backfill(
|
||||||
|
|
||||||
# based on the sample step size, maybe load a certain amount history
|
# based on the sample step size, maybe load a certain amount history
|
||||||
update_start_on_prepend: bool = False
|
update_start_on_prepend: bool = False
|
||||||
if (
|
if backfill_until_dt is None:
|
||||||
_until_was_none := (backfill_until_dt is None)
|
|
||||||
):
|
|
||||||
|
|
||||||
# TODO: per-provider default history-durations?
|
# TODO: per-provider default history-durations?
|
||||||
# -[ ] inside the `open_history_client()` config allow
|
# -[ ] inside the `open_history_client()` config allow
|
||||||
|
|
@ -439,8 +435,6 @@ async def start_backfill(
|
||||||
last_start_dt: datetime = backfill_from_dt
|
last_start_dt: datetime = backfill_from_dt
|
||||||
next_prepend_index: int = backfill_from_shm_index
|
next_prepend_index: int = backfill_from_shm_index
|
||||||
|
|
||||||
est = timezone('EST')
|
|
||||||
|
|
||||||
while last_start_dt > backfill_until_dt:
|
while last_start_dt > backfill_until_dt:
|
||||||
log.info(
|
log.info(
|
||||||
f'Requesting {timeframe}s frame:\n'
|
f'Requesting {timeframe}s frame:\n'
|
||||||
|
|
@ -454,10 +448,9 @@ async def start_backfill(
|
||||||
next_end_dt,
|
next_end_dt,
|
||||||
) = await get_hist(
|
) = await get_hist(
|
||||||
timeframe,
|
timeframe,
|
||||||
end_dt=(end_dt_param := last_start_dt),
|
end_dt=last_start_dt,
|
||||||
)
|
)
|
||||||
except NoData as nodata:
|
except NoData as _daterr:
|
||||||
_nodata = nodata
|
|
||||||
orig_last_start_dt: datetime = last_start_dt
|
orig_last_start_dt: datetime = last_start_dt
|
||||||
gap_report: str = (
|
gap_report: str = (
|
||||||
f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
|
f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
|
||||||
|
|
@ -525,32 +518,8 @@ async def start_backfill(
|
||||||
==
|
==
|
||||||
next_start_dt.timestamp()
|
next_start_dt.timestamp()
|
||||||
)
|
)
|
||||||
assert (
|
|
||||||
(last_time := time[-1])
|
|
||||||
==
|
|
||||||
next_end_dt.timestamp()
|
|
||||||
)
|
|
||||||
|
|
||||||
frame_last_dt = from_timestamp(last_time)
|
assert time[-1] == next_end_dt.timestamp()
|
||||||
if (
|
|
||||||
frame_last_dt.add(seconds=timeframe)
|
|
||||||
<
|
|
||||||
end_dt_param
|
|
||||||
):
|
|
||||||
est_frame_last_dt = est.convert(frame_last_dt)
|
|
||||||
est_end_dt_param = est.convert(end_dt_param)
|
|
||||||
log.warning(
|
|
||||||
f'Provider frame ending BEFORE requested end_dt={end_dt_param} ??\n'
|
|
||||||
f'frame_last_dt (EST): {est_frame_last_dt!r}\n'
|
|
||||||
f'end_dt_param (EST): {est_end_dt_param!r}\n'
|
|
||||||
f'\n'
|
|
||||||
f'Likely contains,\n'
|
|
||||||
f'- a venue closure.\n'
|
|
||||||
f'- (maybe?) missing data ?\n'
|
|
||||||
)
|
|
||||||
# ?TODO, check against venue closure hours
|
|
||||||
# if/when provided by backend?
|
|
||||||
await tractor.pause()
|
|
||||||
|
|
||||||
expected_dur: Interval = (
|
expected_dur: Interval = (
|
||||||
last_start_dt.subtract(
|
last_start_dt.subtract(
|
||||||
|
|
@ -612,11 +581,10 @@ async def start_backfill(
|
||||||
'0 BARS TO PUSH after diff!?\n'
|
'0 BARS TO PUSH after diff!?\n'
|
||||||
f'{next_start_dt} -> {last_start_dt}'
|
f'{next_start_dt} -> {last_start_dt}'
|
||||||
)
|
)
|
||||||
await tractor.pause()
|
|
||||||
|
|
||||||
# Check if we're about to exceed buffer capacity BEFORE
|
# Check if we're about to exceed buffer capacity BEFORE
|
||||||
# attempting the push
|
# attempting the push
|
||||||
if (next_prepend_index - ln) < 0:
|
if next_prepend_index - ln < 0:
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Backfill would exceed buffer capacity!\n'
|
f'Backfill would exceed buffer capacity!\n'
|
||||||
f'next_prepend_index: {next_prepend_index}\n'
|
f'next_prepend_index: {next_prepend_index}\n'
|
||||||
|
|
@ -687,7 +655,7 @@ async def start_backfill(
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
# XXX, can't push the entire frame? so
|
# can't push the entire frame? so
|
||||||
# push only the amount that can fit..
|
# push only the amount that can fit..
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
@ -747,8 +715,8 @@ async def start_backfill(
|
||||||
) = dedupe(df)
|
) = dedupe(df)
|
||||||
if diff:
|
if diff:
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Found {diff!r} duplicates in tsdb! '
|
f'Found {diff} duplicates in tsdb, '
|
||||||
f'=> Overwriting with `deduped` data !! <=\n'
|
f'overwriting with deduped data\n'
|
||||||
)
|
)
|
||||||
await storage.write_ohlcv(
|
await storage.write_ohlcv(
|
||||||
col_sym_key,
|
col_sym_key,
|
||||||
|
|
|
||||||
|
|
@ -87,11 +87,7 @@ def update_fsp_chart(
|
||||||
|
|
||||||
# guard against unreadable case
|
# guard against unreadable case
|
||||||
if not last_row:
|
if not last_row:
|
||||||
log.warning(
|
log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}')
|
||||||
f'Read-race on shm array,\n'
|
|
||||||
f'graphics_name: {graphics_name!r}\n'
|
|
||||||
f'shm.token: {shm.token}\n'
|
|
||||||
)
|
|
||||||
return
|
return
|
||||||
|
|
||||||
# update graphics
|
# update graphics
|
||||||
|
|
@ -207,6 +203,7 @@ async def open_fsp_actor_cluster(
|
||||||
|
|
||||||
|
|
||||||
async def run_fsp_ui(
|
async def run_fsp_ui(
|
||||||
|
|
||||||
linkedsplits: LinkedSplits,
|
linkedsplits: LinkedSplits,
|
||||||
flume: Flume,
|
flume: Flume,
|
||||||
started: trio.Event,
|
started: trio.Event,
|
||||||
|
|
@ -626,10 +623,8 @@ async def open_fsp_admin(
|
||||||
event.set()
|
event.set()
|
||||||
|
|
||||||
|
|
||||||
# TODO, passing in `pikerd` related settings here!
|
|
||||||
# [ ] read in the `tractor` setting for `enable_transports: list`
|
|
||||||
# from the root `conf.toml`!
|
|
||||||
async def open_vlm_displays(
|
async def open_vlm_displays(
|
||||||
|
|
||||||
linked: LinkedSplits,
|
linked: LinkedSplits,
|
||||||
flume: Flume,
|
flume: Flume,
|
||||||
dvlm: bool = True,
|
dvlm: bool = True,
|
||||||
|
|
@ -639,12 +634,12 @@ async def open_vlm_displays(
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Vlm (volume) subchart displays.
|
Volume subchart displays.
|
||||||
|
|
||||||
Since "volume" is often included directly alongside OHLCV price
|
Since "volume" is often included directly alongside OHLCV price
|
||||||
data, we don't really need a separate FSP-actor + shm array for
|
data, we don't really need a separate FSP-actor + shm array for it
|
||||||
it since it's likely already directly adjacent to OHLC samples
|
since it's likely already directly adjacent to OHLC samples from the
|
||||||
from the data provider.
|
data provider.
|
||||||
|
|
||||||
Further only if volume data is detected (it sometimes isn't provided
|
Further only if volume data is detected (it sometimes isn't provided
|
||||||
eg. forex, certain commodities markets) will volume dependent FSPs
|
eg. forex, certain commodities markets) will volume dependent FSPs
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue