Finally fix tsdb -> shm backfill loading

A slight facepalm, but the main issue was a simple indexing logic error:
we need to slice with `tsdb_history[-shm._first.value:]` to push the
most recent history, not the oldest. This also allows cleaning up the
tsdb backfill loop.
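(To see the indexing fix in isolation, a minimal runnable sketch with toy
data; `prepend_start` stands in for `shm._first.value`, i.e. the free
prepend slots left in the shm buffer, and the frame is ordered oldest to
newest like a tsdb query result.)

```python
import numpy as np

# toy stand-ins: a tsdb frame ordered oldest -> newest and the number of
# free prepend slots left in the shm buffer (``shm._first.value``)
tsdb_history = np.arange(10)
prepend_start = 4

buggy = tsdb_history[:prepend_start]    # oldest 4 rows: leaves a gap at the shm edge
fixed = tsdb_history[-prepend_start:]   # newest 4 rows: butts up against existing shm data

print(buggy)  # [0 1 2 3]
print(fixed)  # [6 7 8 9]
```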

Further, greatly simplify `diff_history()` time slicing by using the
classic `numpy` conditional slice on the epoch field.
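(A hedged sketch of that boolean-mask slice on the epoch field; the
`time` field name matches the diff below, but the dtype and sample values
here are made up for illustration.)

```python
import numpy as np
import pendulum

# structured array with an epoch-seconds ``time`` field, oldest -> newest
array = np.array(
    [(1.0, 1668182400.0), (2.0, 1668182460.0), (3.0, 1668182520.0)],
    dtype=[('close', 'f8'), ('time', 'f8')],
)
last_tsdb_dt = pendulum.from_timestamp(1668182460)

# keep only rows strictly newer than the last datum already in the tsdb
time = array['time']
to_push = array[time > last_tsdb_dt.timestamp()]
print(to_push)  # -> only the 1668182520 row
```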
agg_feedz
Tyler Goodlet 2022-11-11 14:31:53 -05:00
parent d6fb6fe3ae
commit 81516c5204
1 changed file with 36 additions and 73 deletions


@@ -220,40 +220,8 @@ def diff_history(
     if last_tsdb_dt is None:
         return array
 
-    time_attr = {
-        1: 'seconds',
-        60: 'minutes',
-    }[timeframe]
-
-    i_diff = getattr((end_dt - last_tsdb_dt), time_attr)
-
-    # if we detect a partial frame's worth of data
-    # that is new, slice out only that history and
-    # write to shm.
-    if i_diff < 0:
-        # empty case since tsdb already has this history
-        return array[:0]
-
-    ln = len(array)
-    to_push = array
-
-    if i_diff < ln:
-        # slice out missing history from end of frame
-        to_push = array[ln - i_diff:ln]
-
-    # XXX: OLD GAP HANDLING..if there's a gap in a partial frame
-    # worth. we don't need this any more with the timeframe aware
-    # diffing above right?
-    # else:
-    #     # pass back only the portion of the array that is
-    #     # greater then the last time stamp in the tsdb.
-    #     time = array['time']
-    #     to_push = array[time >= last_tsdb_dt.timestamp()]
-
-    log.debug(
-        f'Pushing partial frame {to_push.size} to shm'
-    )
-
-    return to_push
+    time = array['time']
+    return array[time > last_tsdb_dt.timestamp()]
 
 
 async def start_backfill(
@@ -375,8 +343,6 @@ async def start_backfill(
                 timeframe,
                 end_dt=start_dt,
             )
 
-            # if timeframe == 1:
-            #     await tractor.breakpoint()
         # broker says there never was or is no more history to pull
         except DataUnavailable:
@@ -441,7 +407,6 @@ async def start_backfill(
                 )
                 # can't push the entire frame? so
                 # push only the amount that can fit..
-                await tractor.breakpoint()
                 break
 
             log.info(
@@ -614,7 +579,11 @@ async def tsdb_backfill(
         # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
         # re-index with a `time` and index field
         prepend_start = shm._first.value
-        shm_last_dt = pendulum.from_timestamp(shm.array[0]['time'])
+        array = shm.array
+        if len(array):
+            shm_last_dt = pendulum.from_timestamp(shm.array[0]['time'])
+        else:
+            shm_last_dt = None
 
         if last_tsdb_dt:
             assert shm_last_dt >= last_tsdb_dt
@@ -629,27 +598,24 @@ async def tsdb_backfill(
             backfilled_size_s = (
                 latest_start_dt - last_tsdb_dt
             ).seconds
-        else:
-            backfilled_size_s = (
-                latest_start_dt - shm_last_dt
-            ).seconds
 
-        # Load TSDB history into shm buffer (for display) if there is
-        # remaining buffer space.
         # if the shm buffer len is not large enough to contain
         # all missing data between the most recent backend-queried frame
         # and the most recent dt-index in the db we warn that we only
         # want to load a portion of the next tsdb query to fill that
         # space.
         log.info(
-            f'{backfilled_size_s} seconds worth of {timeframe}s data loaded'
+            f'{backfilled_size_s} seconds worth of {timeframe}s loaded'
         )
 
+        # Load TSDB history into shm buffer (for display) if there is
+        # remaining buffer space.
         if (
             len(tsdb_history)
         ):
-            to_push = tsdb_history[:prepend_start]
+            # load the first (smaller) bit of history originally loaded
+            # above from ``Storage.load()``.
+            to_push = tsdb_history[-prepend_start:]
 
             shm.push(
                 to_push,
@@ -660,37 +626,30 @@ async def tsdb_backfill(
                 # start=prepend_start,
                 field_map=marketstore.ohlc_key_map,
             )
-            prepend_start = shm._first.value
-
-            # load as much from storage into shm as space will
-            # allow according to user's shm size settings.
-            last_frame_start = tsdb_history['Epoch'][0]
             while (
                 shm._first.value > 0
             ):
+                # load as much from storage into shm as space will
+                # allow according to user's shm size settings.
+                tsdb_last_frame_start = tsdb_history['Epoch'][0]
                 tsdb_history = await storage.read_ohlcv(
                     fqsn,
-                    end=last_frame_start,
+                    end=tsdb_last_frame_start,
                     timeframe=timeframe,
                 )
 
                 if (
-                    not len(tsdb_history)
+                    not len(tsdb_history)  # empty query
+                    # no earlier data detected
+                    or tsdb_history['Epoch'][0] >= tsdb_last_frame_start
                 ):
-                    # on empty db history
-                    break
-
-                time = tsdb_history['Epoch']
-                frame_start = time[0]
-                frame_end = time[0]
-                print(f"LOADING MKTS HISTORY: {frame_start} - {frame_end}")
-
-                if frame_start >= last_frame_start:
-                    # no new data loaded was from tsdb, so we can exit.
                     break
 
                 prepend_start = shm._first.value
-                to_push = tsdb_history[:prepend_start]
+                to_push = tsdb_history[-prepend_start:]
 
                 # insert the history pre a "days worth" of samples
                 # to leave some real-time buffer space at the end.
@@ -699,8 +658,6 @@ async def tsdb_backfill(
                     prepend=True,
                     field_map=marketstore.ohlc_key_map,
                 )
-                last_frame_start = frame_start
 
                 log.info(f'Loaded {to_push.shape} datums from storage')
 
                 # manually trigger step update to update charts/fsps
@@ -1248,6 +1205,11 @@ async def open_feed_bus(
     # ensure we are who we think we are
     servicename = tractor.current_actor().name
     assert 'brokerd' in servicename
+
+    # XXX: figure this not crashing into debug!
+    # await tractor.breakpoint()
+    # assert 0
+
     assert brokername in servicename
 
     bus = get_feed_bus(brokername)
@@ -1557,6 +1519,7 @@ async def open_feed(
         bus_ctxs.append(
             portal.open_context(
                 open_feed_bus,
+                # brokername=brokermod.name,
                 brokername=brokername,
                 symbols=bfqsns,
                 loglevel=loglevel,
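
For reference, a toy sketch of the cleaned-up backfill loop's control flow as
it reads in the diff above; `FakeShm` and `fake_read_ohlcv` are invented
stand-ins rather than piker's real `ShmArray`/`Storage` APIs, so the
negative-index push and the two loop exit conditions can be run standalone.

```python
import numpy as np


class FakeShm:
    '''Crude stand-in for the shared-mem buffer: prepends shrink ``first``.'''
    def __init__(self, size: int, used: int) -> None:
        self.first = size - used  # free prepend slots remaining (~ ``shm._first.value``)

    def push(self, frame: np.ndarray) -> None:
        self.first -= len(frame)


def fake_read_ohlcv(end: float) -> np.ndarray:
    '''Return up to 3 epochs strictly earlier than ``end``, oldest first.'''
    return np.arange(max(0, end - 3), end, dtype='f8')


shm = FakeShm(size=10, used=2)
tsdb_history = fake_read_ohlcv(end=100)

while shm.first > 0:
    # query the next-earlier frame, ending at the earliest epoch we hold
    tsdb_last_frame_start = tsdb_history[0]
    tsdb_history = fake_read_ohlcv(end=tsdb_last_frame_start)

    if (
        not len(tsdb_history)  # empty query
        or tsdb_history[0] >= tsdb_last_frame_start  # no earlier data
    ):
        break

    # push only as many *most recent* rows as there is prepend space left
    to_push = tsdb_history[-shm.first:]
    shm.push(to_push)
    print(f'prepended {len(to_push)} rows, {shm.first} slots left')
```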