Oof, fix the no-tsdb-entry (needs full backfill) case!

This got borked by the logic refactoring to get more concurrency going
around tsdb vs. latest-frame loads with nested nurseries. So, repair all
that such that we can still backfill symbols not previously loaded, as
well as drop all the `_FeedsBus` instance passing to subtasks where it's
definitely not needed.

Toss in a pause point around sampler stream `'backfilling'` msgs as well,
since there seems to be a weird ctx-cancelled propagation going on when a
feed client disconnects during backfill and this might be where the src
`tractor.ContextCancelled` is getting bubbled from?
distribute_dis
Tyler Goodlet 2023-12-22 21:34:31 -05:00
parent b064a5f94d
commit 61e52213b2
2 changed files with 135 additions and 112 deletions
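
For quick reference, a minimal sketch (not the actual `piker` code) of the
two fixes described above; `sampler_stream`, `tsdb_entry`, `fqme` and the
helper names are stand-ins for the corresponding values in the hunks below.

import tractor


def pick_last_tsdb_dt(
    tsdb_entry: tuple | None,  # previously stored (offline) history, if any
):
    # `None` tells the backfill task to fill the whole shm buffer
    # (the "no-tsdb-entry -> full backfill" case); otherwise only the gap
    # between the latest backend frame and the last stored sample is filled.
    if tsdb_entry is None:
        return None

    *_, last_tsdb_dt = tsdb_entry
    return last_tsdb_dt


async def broadcast_backfilling(
    sampler_stream,  # a `tractor` msg-stream to the sampler actor
    fqme: str,
    timeframe: float,
):
    # pause point: if a feed-client disconnect bubbles a ctx-cancel up
    # through the stream-send, drop into the debugger to inspect it.
    try:
        await sampler_stream.send({
            'broadcast_all': {'backfilling': (fqme, timeframe)},
        })
    except tractor.ContextCancelled:
        await tractor.pause()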

@@ -269,6 +269,7 @@ async def maybe_fill_null_segments(
# - remember that in the display side, only refresh this
# if the respective history is actually "in view".
# loop
try:
await sampler_stream.send({
'broadcast_all': {
@@ -279,6 +280,9 @@ async def maybe_fill_null_segments(
'backfilling': (mkt.fqme, timeframe),
},
})
except tractor.ContextCancelled:
# log.exception
await tractor.pause()
null_segs_detected.set()
# RECHECK for more null-gaps
@@ -354,7 +358,6 @@ async def maybe_fill_null_segments(
async def start_backfill(
tn: trio.Nursery,
get_hist,
mod: ModuleType,
mkt: MktPair,
@@ -408,7 +411,6 @@ async def start_backfill(
# settings above when the tsdb is detected as being empty.
backfill_until_dt = backfill_from_dt.subtract(**period_duration)
# STAGE NOTE: "backward history gap filling":
# - we push to the shm buffer until we have history back
# until the latest entry loaded from the tsdb's table B)
@@ -752,6 +754,8 @@ async def back_load_from_tsdb(
async def push_latest_frame(
# box-type only that should get packed with the datetime
# objects received for the latest history frame
dt_eps: list[DateTime, DateTime],
shm: ShmArray,
get_hist: Callable[
@@ -761,8 +765,11 @@ async def push_latest_frame(
timeframe: float,
config: dict,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
):
task_status: TaskStatus[
Exception | list[datetime, datetime]
] = trio.TASK_STATUS_IGNORED,
) -> list[datetime, datetime] | None:
# get latest query's worth of history all the way
# back to what is recorded in the tsdb
try:
@@ -779,17 +786,19 @@ async def push_latest_frame(
mr_start_dt,
mr_end_dt,
])
task_status.started(dt_eps)
# XXX: timeframe not supported for backend (since
# above exception type), terminate immediately since
# there's no backfilling possible.
except DataUnavailable:
task_status.started()
task_status.started(None)
if timeframe > 1:
await tractor.pause()
return
# prolly tf not supported
return None
# NOTE: on the first history, most recent history
# frame we PREPEND from the current shm ._last index
@@ -801,11 +810,16 @@ async def push_latest_frame(
prepend=True, # append on first frame
)
return dt_eps
async def load_tsdb_hist(
storage: StorageClient,
mkt: MktPair,
timeframe: float,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> tuple[
np.ndarray,
DateTime,
@@ -909,20 +923,31 @@ async def tsdb_backfill(
# mem!
backfill_gap_from_shm_index: int = shm._first.value + 1
# Prepend any tsdb history to the shm buffer which should
# now be full of the most recent history pulled from the
# backend's last frame.
if (
dt_eps
and tsdb_entry
):
# unpack both the latest (gap) backfilled frame dts
# Prepend any tsdb history into the rt-shm-buffer which
# should NOW be getting filled with the most recent history
# pulled from the data-backend.
if dt_eps:
# well then, unpack the latest (gap) backfilled frame dts
(
mr_start_dt,
mr_end_dt,
) = dt_eps
# AND the tsdb history from (offline) storage
# NOTE: when there's no offline data, there's 2 cases:
# - data backend doesn't support timeframe/sample
# period (in which case `dt_eps` should be `None` and
# we shouldn't be here!), or
# - no prior history has been stored (yet) and we need
# to do a full backfill of the history now.
if tsdb_entry is None:
# indicate to backfill task to fill the whole
# shm buffer as much as it can!
last_tsdb_dt = None
# there's existing tsdb history from (offline) storage
# so only backfill the gap between the
# most-recent-frame (mrf) and that latest sample.
else:
(
tsdb_history,
first_tsdb_dt,
@@ -937,7 +962,6 @@ async def tsdb_backfill(
bf_done = await tn.start(
partial(
start_backfill,
tn=tn,
get_hist=get_hist,
mod=mod,
mkt=mkt,
@@ -954,7 +978,8 @@ async def tsdb_backfill(
write_tsdb=True,
)
)
nulls_detected: trio.Event | None = None
if last_tsdb_dt is not None:
# calc the index from which the tsdb data should be
# prepended, presuming there is a gap between the
# latest frame (loaded/read above) and the latest
@@ -1042,7 +1067,9 @@ async def tsdb_backfill(
# 2nd nursery END
# TODO: who would want to?
if nulls_detected:
await nulls_detected.wait()
await bf_done.wait()
# TODO: maybe start history anal and load missing "history
# gaps" via backend..
@@ -1087,7 +1114,6 @@ async def tsdb_backfill(
async def manage_history(
mod: ModuleType,
bus: _FeedsBus,
mkt: MktPair,
some_data_ready: trio.Event,
feed_is_live: trio.Event,
@@ -1244,7 +1270,6 @@ async def manage_history(
tsdb_backfill,
mod=mod,
storemod=storemod,
# bus,
storage=client,
mkt=mkt,
shm=tf2mem[timeframe],
@@ -1337,5 +1362,3 @@ def iter_dfs_from_shms(
shm,
df,
)

@@ -526,7 +526,7 @@ def with_dts(
pl.from_epoch(pl.col(time_col)).alias('dt'),
]).with_columns([
pl.from_epoch(
pl.col(f'{time_col}_prev')
column=pl.col(f'{time_col}_prev'),
).alias('dt_prev'),
pl.col('dt').diff().alias('dt_diff'),
]) #.with_columns(
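
Side note on the second file's hunk: it just switches the polars
`from_epoch()` input to be passed by keyword. A tiny usage sketch (the
frame and column names here are made up, and the epoch values are assumed
to be in seconds):

import polars as pl

df = pl.DataFrame({'time': [1_700_000_000, 1_700_000_060]})
df = df.with_columns(
    pl.from_epoch(
        column=pl.col('time'),
        time_unit='s',  # input is epoch-seconds
    ).alias('dt'),
)
print(df)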