Rework backfiller and null-segment task conc

For each timeframe open a sub-nursery to do the backfilling + tsdb load
+ null-segment scanning in an effort to both speed up load time (though
we need to reverse the current order to really make it faster rn since
moving to the much faster parquet file backend) and do concurrent
time-gap/null-segment checking of tsdb history while mrf (most recent
frame) history is backfilling.

The details are more or less just `trio` related task-func composition
tricks and a reordering of said funcs for optimal startup latency.
Also commented the `back_load_from_tsdb()` task for now since it's
unused.
distribute_dis
Tyler Goodlet 2023-12-15 13:11:00 -05:00
parent a4084d6a0b
commit 97e2403fb1
1 changed files with 310 additions and 197 deletions

View File

@ -36,6 +36,7 @@ import trio
from trio_typing import TaskStatus
import tractor
from pendulum import (
DateTime,
Duration,
from_timestamp,
)
@ -172,16 +173,27 @@ async def maybe_fill_null_segments(
sampler_stream: tractor.MsgStream,
mkt: MktPair,
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
) -> list[Frame]:
null_segs_detected = trio.Event()
task_status.started(null_segs_detected)
frame: Frame = shm.array
async for (
null_segs: tuple | None = get_null_segs(
frame,
period=timeframe,
)
for (
absi_start, absi_end,
fi_start, fi_end,
start_t, end_t,
start_dt, end_dt,
) in iter_null_segs(
frame,
null_segs=null_segs,
frame=frame,
timeframe=timeframe,
):
@ -191,6 +203,7 @@ async def maybe_fill_null_segments(
start_dt
and end_dt < start_dt
):
await tractor.pause()
break
(
@ -233,29 +246,37 @@ async def maybe_fill_null_segments(
},
})
null_segs_detected.set()
# RECHECK for more null-gaps
frame: Frame = shm.array
null_segs: tuple | None = get_null_segs(
frame,
period=timeframe,
)
if (
null_segs
and
len(null_segs[-1])
):
await tractor.pause()
# TODO: interatively step through any remaining time gaps?
# if (
# next_end_dt not in frame[
# ):
# pass
# RECHECK for more null-gaps
frame: Frame = shm.array
null_segs: tuple | None = get_null_segs(
frame,
period=timeframe,
)
if (
null_segs
and
len(null_segs[-1])
):
await tractor.pause()
# TODO: interatively step through any remaining
# time-gaps/null-segments and spawn piecewise backfiller
# tasks in a nursery?
# -[ ] not sure that's going to work so well on the ib
# backend but worth a shot?
# -[ ] mk new history connections to make it properly
# parallel possible no matter the backend?
# -[ ] fill algo: do queries in alternating "latest, then
# earliest, then latest.. etc?"
# if (
# next_end_dt not in frame[
# ):
# pass
async def start_backfill(
tn: trio.Nursery,
get_hist,
mod: ModuleType,
mkt: MktPair,
@ -510,23 +531,6 @@ async def start_backfill(
f'Finished filling gap to tsdb start @ {backfill_until_dt}!'
)
# NOTE: conduct tsdb timestamp gap detection and backfill any
# seemingly missing (null-time) segments..
# TODO: ideally these can never exist!
# -[ ] somehow it seems sometimes we're writing zero-ed
# segments to tsdbs during teardown?
# -[ ] can we ensure that the backcfiller tasks do this
# work PREVENTAVELY instead?
# -[ ] fill in non-zero epoch time values ALWAYS!
await maybe_fill_null_segments(
shm=shm,
timeframe=timeframe,
get_hist=get_hist,
sampler_stream=sampler_stream,
mkt=mkt,
)
# XXX: extremely important, there can be no checkpoints
# in the block above to avoid entering new ``frames``
# values while we're pipelining the current ones to
@ -537,6 +541,12 @@ async def start_backfill(
bf_done.set()
# NOTE: originally this was used to cope with a tsdb (marketstore)
# which could not delivery very large frames of history over gRPC
# (thanks goolag) due to corruption issues. NOW, using apache
# parquet (by default in the local filesys) we don't have this
# requirement since the files can be loaded very quickly in
# entirety to memory via
async def back_load_from_tsdb(
storemod: ModuleType,
storage: StorageClient,
@ -674,10 +684,94 @@ async def back_load_from_tsdb(
# await sampler_stream.send('broadcast_all')
async def push_latest_frame(
dt_eps: list[DateTime, DateTime],
shm: ShmArray,
get_hist: Callable[
[int, datetime, datetime],
tuple[np.ndarray, str]
],
timeframe: float,
config: dict,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
):
# get latest query's worth of history all the way
# back to what is recorded in the tsdb
try:
(
array,
mr_start_dt,
mr_end_dt,
) = await get_hist(
timeframe,
end_dt=None,
)
# so caller can access these ep values
dt_eps.extend([
mr_start_dt,
mr_end_dt,
])
# XXX: timeframe not supported for backend (since
# above exception type), terminate immediately since
# there's no backfilling possible.
except DataUnavailable:
task_status.started()
if timeframe > 1:
await tractor.pause()
return
# NOTE: on the first history, most recent history
# frame we PREPEND from the current shm ._last index
# and thus a gap between the earliest datum loaded here
# and the latest loaded from the tsdb may exist!
log.info(f'Pushing {array.size} to shm!')
shm.push(
array,
prepend=True, # append on first frame
)
async def load_tsdb_hist(
storage: StorageClient,
mkt: MktPair,
timeframe: float,
) -> tuple[
np.ndarray,
DateTime,
DateTime,
] | None:
# loads a (large) frame of data from the tsdb depending
# on the db's query size limit; our "nativedb" (using
# parquet) generally can load the entire history into mem
# but if not then below the remaining history can be lazy
# loaded?
fqme: str = mkt.fqme
tsdb_entry: tuple[
np.ndarray,
DateTime,
DateTime,
]
try:
tsdb_entry: tuple | None = await storage.load(
fqme,
timeframe=timeframe,
)
return tsdb_entry
except TimeseriesNotFound:
log.warning(
f'No timeseries yet for {timeframe}@{fqme}'
)
return None
async def tsdb_backfill(
mod: ModuleType,
storemod: ModuleType,
tn: trio.Nursery,
storage: StorageClient,
mkt: MktPair,
@ -692,175 +786,193 @@ async def tsdb_backfill(
) -> None:
if timeframe not in (1, 60):
raise ValueError(
'`piker` only needs to support 1m and 1s sampling '
'but ur api is trying to deliver a longer '
f'timeframe of {timeframe} seconds..\n'
'So yuh.. dun do dat brudder.'
)
get_hist: Callable[
[int, datetime, datetime],
tuple[np.ndarray, str]
]
config: dict[str, int]
async with mod.open_history_client(
mkt,
) as (get_hist, config):
async with (
mod.open_history_client(
mkt,
) as (get_hist, config),
# NOTE: this sub-nursery splits to tasks for the given
# sampling rate to concurrently load offline tsdb
# timeseries as well as new data from the venue backend!
):
log.info(
f'`{mod}` history client returned backfill config:\n'
f'{config}\n'
)
# get latest query's worth of history all the way
# back to what is recorded in the tsdb
try:
(
array,
mr_start_dt,
mr_end_dt,
) = await get_hist(
dt_eps: list[DateTime, DateTime] = []
async with trio.open_nursery() as tn:
tn.start_soon(
push_latest_frame,
dt_eps,
shm,
get_hist,
timeframe,
end_dt=None,
config,
)
# XXX: timeframe not supported for backend (since
# above exception type), terminate immediately since
# there's no backfilling possible.
except DataUnavailable:
# tell parent task to continue
# TODO: really we'd want this the other way with the
# tsdb load happening asap and the since the latest
# frame query will normally be the main source of
# latency?
task_status.started()
if timeframe > 1:
await tractor.pause()
tsdb_entry: tuple = await load_tsdb_hist(
storage,
mkt,
timeframe,
)
return
# NOTE: on the first history, most recent history
# frame we PREPEND from the current shm ._last index
# and thus a gap between the earliest datum loaded here
# and the latest loaded from the tsdb may exist!
log.info(f'Pushing {array.size} to shm!')
shm.push(
array,
prepend=True, # append on first frame
)
# NOTE: iabs to start backfilling from, reverse chronological,
# ONLY AFTER the first history frame has been pushed to
# mem!
backfill_gap_from_shm_index: int = shm._first.value + 1
# tell parent task to continue
task_status.started()
(
mr_start_dt,
mr_end_dt,
) = dt_eps
# loads a (large) frame of data from the tsdb depending
# on the db's query size limit; our "nativedb" (using
# parquet) generally can load the entire history into mem
# but if not then below the remaining history can be lazy
# loaded?
fqme: str = mkt.fqme
last_tsdb_dt: datetime | None = None
try:
tsdb_entry: tuple | None = await storage.load(
fqme,
timeframe=timeframe,
)
except TimeseriesNotFound:
log.warning(
f'No timeseries yet for {timeframe}@{fqme}'
)
else:
(
tsdb_history,
first_tsdb_dt,
last_tsdb_dt,
) = tsdb_entry
async with trio.open_nursery() as tn:
# calc the index from which the tsdb data should be
# prepended, presuming there is a gap between the
# latest frame (loaded/read above) and the latest
# sample loaded from the tsdb.
backfill_diff: Duration = mr_start_dt - last_tsdb_dt
offset_s: float = backfill_diff.in_seconds()
# Prepend any tsdb history to the shm buffer which should
# now be full of the most recent history pulled from the
# backend's last frame.
if tsdb_entry:
(
tsdb_history,
first_tsdb_dt,
last_tsdb_dt,
) = tsdb_entry
# XXX EDGE CASEs: the most recent frame overlaps with
# prior tsdb history!!
# - so the latest frame's start time is earlier then
# the tsdb's latest sample.
# - alternatively this may also more generally occur
# when the venue was closed (say over the weeknd)
# causing a timeseries gap, AND the query frames size
# (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
# GREATER THAN the current venue-market's operating
# session (time) we will receive datums from BEFORE THE
# CLOSURE GAP and thus the `offset_s` value will be
# NEGATIVE! In this case we need to ensure we don't try
# to push datums that have already been recorded in the
# tsdb. In this case we instead only retreive and push
# the series portion missing from the db's data set.
# if offset_s < 0:
# non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
# non_overlap_offset_s: float = backfill_diff.in_seconds()
# if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb
# continue that now in the background
bf_done = await tn.start(
partial(
start_backfill,
tn=tn,
get_hist=get_hist,
mod=mod,
mkt=mkt,
shm=shm,
timeframe=timeframe,
offset_samples: int = round(offset_s / timeframe)
# TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field
if offset_s > 0:
# NOTE XXX: ONLY when there is an actual gap
# between the earliest sample in the latest history
# frame do we want to NOT stick the latest tsdb
# history adjacent to that latest frame!
prepend_start = shm._first.value - offset_samples + 1
to_push = tsdb_history[-prepend_start:]
else:
# when there is overlap we want to remove the
# overlapping samples from the tsdb portion (taking
# instead the latest frame's values since THEY
# SHOULD BE THE SAME) and prepend DIRECTLY adjacent
# to the latest frame!
# TODO: assert the overlap segment array contains
# the same values!?!
prepend_start = shm._first.value
to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
# tsdb history is so far in the past we can't fit it in
# shm buffer space so simply don't load it!
if prepend_start > 0:
shm.push(
to_push,
# insert the history pre a "days worth" of samples
# to leave some real-time buffer space at the end.
prepend=True,
# update_first=False,
start=prepend_start,
field_map=storemod.ohlc_key_map,
backfill_from_shm_index=backfill_gap_from_shm_index,
backfill_from_dt=mr_start_dt,
sampler_stream=sampler_stream,
backfill_until_dt=last_tsdb_dt,
storage=storage,
write_tsdb=True,
)
)
log.info(f'Loaded {to_push.shape} datums from storage')
# calc the index from which the tsdb data should be
# prepended, presuming there is a gap between the
# latest frame (loaded/read above) and the latest
# sample loaded from the tsdb.
backfill_diff: Duration = mr_start_dt - last_tsdb_dt
offset_s: float = backfill_diff.in_seconds()
# XXX EDGE CASEs: the most recent frame overlaps with
# prior tsdb history!!
# - so the latest frame's start time is earlier then
# the tsdb's latest sample.
# - alternatively this may also more generally occur
# when the venue was closed (say over the weeknd)
# causing a timeseries gap, AND the query frames size
# (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
# GREATER THAN the current venue-market's operating
# session (time) we will receive datums from BEFORE THE
# CLOSURE GAP and thus the `offset_s` value will be
# NEGATIVE! In this case we need to ensure we don't try
# to push datums that have already been recorded in the
# tsdb. In this case we instead only retreive and push
# the series portion missing from the db's data set.
# if offset_s < 0:
# non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
# non_overlap_offset_s: float = backfill_diff.in_seconds()
offset_samples: int = round(offset_s / timeframe)
# TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field
if offset_s > 0:
# NOTE XXX: ONLY when there is an actual gap
# between the earliest sample in the latest history
# frame do we want to NOT stick the latest tsdb
# history adjacent to that latest frame!
prepend_start = shm._first.value - offset_samples + 1
to_push = tsdb_history[-prepend_start:]
else:
# when there is overlap we want to remove the
# overlapping samples from the tsdb portion (taking
# instead the latest frame's values since THEY
# SHOULD BE THE SAME) and prepend DIRECTLY adjacent
# to the latest frame!
# TODO: assert the overlap segment array contains
# the same values!?!
prepend_start = shm._first.value
to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
# tsdb history is so far in the past we can't fit it in
# shm buffer space so simply don't load it!
if prepend_start > 0:
shm.push(
to_push,
# insert the history pre a "days worth" of samples
# to leave some real-time buffer space at the end.
prepend=True,
# update_first=False,
start=prepend_start,
field_map=storemod.ohlc_key_map,
)
log.info(f'Loaded {to_push.shape} datums from storage')
# NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any
# seemingly missing (null-time) segments..
# TODO: ideally these can never exist!
# -[ ] somehow it seems sometimes we're writing zero-ed
# segments to tsdbs during teardown?
# -[ ] can we ensure that the backcfiller tasks do this
# work PREVENTAVELY instead?
# -[ ] fill in non-zero epoch time values ALWAYS!
# await maybe_fill_null_segments(
nulls_detected: trio.Event = await tn.start(partial(
maybe_fill_null_segments,
shm=shm,
timeframe=timeframe,
get_hist=get_hist,
sampler_stream=sampler_stream,
mkt=mkt,
))
# TODO: who would want to?
await nulls_detected.wait()
await bf_done.wait()
# TODO: maybe start history anal and load missing "history
# gaps" via backend..
if timeframe not in (1, 60):
raise ValueError(
'`piker` only needs to support 1m and 1s sampling '
'but ur api is trying to deliver a longer '
f'timeframe of {timeframe} seconds..\n'
'So yuh.. dun do dat brudder.'
)
# if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb
# continue that now in the background
bf_done = await tn.start(
partial(
start_backfill,
get_hist=get_hist,
mod=mod,
mkt=mkt,
shm=shm,
timeframe=timeframe,
backfill_from_shm_index=backfill_gap_from_shm_index,
backfill_from_dt=mr_start_dt,
sampler_stream=sampler_stream,
backfill_until_dt=last_tsdb_dt,
storage=storage,
write_tsdb=True,
)
)
# if len(hist_shm.array) < 2:
# TODO: there's an edge case here to solve where if the last
# frame before market close (at least on ib) was pushed and
@ -874,32 +986,29 @@ async def tsdb_backfill(
# backload any further data from tsdb (concurrently per
# timeframe) if not all data was able to be loaded (in memory)
# from the ``StorageClient.load()`` call above.
try:
await trio.sleep_forever()
finally:
return
await trio.sleep_forever()
# XXX NOTE: this is legacy from when we were using
# marketstore and we needed to continue backloading
# incrementally from the tsdb client.. (bc it couldn't
# handle a single large query with gRPC for some
# reason.. classic goolag pos)
tn.start_soon(
back_load_from_tsdb,
# tn.start_soon(
# back_load_from_tsdb,
storemod,
storage,
fqme,
# storemod,
# storage,
# fqme,
tsdb_history,
last_tsdb_dt,
mr_start_dt,
mr_end_dt,
bf_done,
# tsdb_history,
# last_tsdb_dt,
# mr_start_dt,
# mr_end_dt,
# bf_done,
timeframe,
shm,
)
# timeframe,
# shm,
# )
async def manage_history(
@ -1007,6 +1116,11 @@ async def manage_history(
async with (
storage.open_storage_client() as (storemod, client),
# NOTE: this nursery spawns a task per "timeframe" (aka
# sampling period) data set since normally differently
# sampled timeseries can be loaded / process independently
# ;)
trio.open_nursery() as tn,
):
log.info(
@ -1056,7 +1170,6 @@ async def manage_history(
tsdb_backfill,
mod=mod,
storemod=storemod,
tn=tn,
# bus,
storage=client,
mkt=mkt,