Handle backends with no 1s OHLC history

If a history manager raises `DataUnavailable`, assume the sample rate
isn't supported and that no shm prepends will be done. Further, in such
cases seed the shm array from the 1m history's last datum, as before.

Also, fix tsdb -> shm back-loading by cancelling tsdb queries when
either no array-data is returned or a delivered frame has a start time
no earlier than the earliest one already retrieved. Use strict
timeframes for every `Storage` API call.
ib_1m_hist
Tyler Goodlet 2022-10-26 01:05:41 -04:00
parent f7ec66362e
commit 0000d9a314
1 changed file with 140 additions and 88 deletions
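
For orientation before the diff, here is a minimal, hypothetical sketch of the behaviour the commit message describes: attempt a backfill per timeframe, treat `DataUnavailable` as "sample rate not supported", and seed an empty 1s buffer from the last 1m datum. `request_history` and the plain list "buffers" below are illustrative stand-ins, not piker's actual backend or shm APIs; only the `DataUnavailable` name comes from the diff itself.

from __future__ import annotations


class DataUnavailable(Exception):
    '''Raised by a backend when a sample rate has no history.'''


def request_history(timeframe: int) -> list[dict]:
    # stand-in for a backend history query; pretend 1s OHLC is unsupported
    if timeframe == 1:
        raise DataUnavailable(f'no {timeframe}s OHLC history')
    return [
        {'open': 1.0, 'high': 2.0, 'low': 0.5, 'close': 1.5, 'volume': 10},
    ]


def backfill(buffers: dict[int, list[dict]]) -> None:
    for timeframe, buf in buffers.items():
        try:
            buf.extend(request_history(timeframe))
        except DataUnavailable:
            # sample rate not supported -> skip it, no prepends are done
            continue

    # if the 1s buffer is still empty, seed it from the last 1m datum
    # with zeroed volume so charts have something to draw
    rt, hist = buffers[1], buffers[60]
    if not rt and hist:
        last_close = hist[-1]['close']
        rt.append({
            'open': last_close,
            'high': last_close,
            'low': last_close,
            'close': last_close,
            'volume': 0,
        })


if __name__ == '__main__':
    bufs: dict[int, list[dict]] = {1: [], 60: []}
    backfill(bufs)
    print(bufs[1])  # -> one synthetic 1s datum seeded from the 1m close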


@@ -278,7 +278,6 @@ async def start_backfill(
             timeframe,
             end_dt=None,
         )
-
         times = array['time']
         # sample period step size in seconds
@@ -336,11 +335,15 @@ async def start_backfill(
         if tsdb_is_up:
             # do a decently sized backfill and load it into storage.
             periods = {
-                1: {'days': 1},
+                1: {'days': 6},
                 60: {'years': 6},
             }
             kwargs = periods[step_size_s]
+
+            # NOTE: manually set the "latest" datetime which we intend to
+            # backfill history "until" so as to adhere to the history
+            # settings above when the tsdb is detected as being empty.
             last_tsdb_dt = start_dt.subtract(**kwargs)

         # configure async query throttling
@@ -348,31 +351,24 @@ async def start_backfill(
         # XXX: legacy from ``trimeter`` code but unsupported now.
         # erlangs = config.get('erlangs', 1)

+        # avoid duplicate history frames with a set of datetime frame
+        # starts.
+        starts: set[datetime] = set()
+
         # inline sequential loop where we simply pass the
         # last retrieved start dt to the next request as
         # it's end dt.
-        starts: set[datetime] = set()
         while start_dt > last_tsdb_dt:
-            try:
             log.info(
                 f'Requesting {step_size_s}s frame ending in {start_dt}'
             )
+
+            try:
                 array, next_start_dt, end_dt = await hist(
                     timeframe,
                     end_dt=start_dt,
                 )
-
-                if next_start_dt in starts:
-                    start_dt = min(starts)
-                    print("SKIPPING DUPLICATE FRAME @ {next_start_dt}")
-                    continue
-
-                # only update new start point if new
-                start_dt = next_start_dt
-                starts.add(start_dt)
-
-                assert array['time'][0] == start_dt.timestamp()
-
             except NoData:
                 # XXX: unhandled history gap (shouldn't happen?)
                 log.warning(
@@ -392,6 +388,17 @@ async def start_backfill(
                 # request loop until the condition is resolved?
                 return

+            if next_start_dt in starts:
+                start_dt = min(starts)
+                print("SKIPPING DUPLICATE FRAME @ {next_start_dt}")
+                continue
+
+            # only update new start point if not-yet-seen
+            start_dt = next_start_dt
+            starts.add(start_dt)
+
+            assert array['time'][0] == start_dt.timestamp()
+
             diff = end_dt - start_dt
             frame_time_diff_s = diff.seconds
             expected_frame_size_s = frame_size_s + step_size_s
@@ -462,7 +469,6 @@ async def start_backfill(
         # short-circuit (for now)
         bf_done.set()
-        return


 async def basic_backfill(
@@ -480,6 +486,7 @@ async def basic_backfill(
     # a required backend func this must block until shm is
     # filled with first set of ohlc bars
     for timeframe, shm in shms.items():
+        try:
             await bus.nursery.start(
                 partial(
                     start_backfill,
@@ -489,6 +496,9 @@ async def basic_backfill(
                     timeframe=timeframe,
                 )
             )
+        except DataUnavailable:
+            # XXX: timeframe not supported for backend
+            continue


 async def tsdb_backfill(
@@ -500,7 +510,6 @@ async def tsdb_backfill(
     bfqsn: str,
     shms: dict[int, ShmArray],

-    # some_data_ready: trio.Event,
     task_status: TaskStatus[
         tuple[ShmArray, ShmArray]
     ] = trio.TASK_STATUS_IGNORED,
@@ -513,12 +522,13 @@ async def tsdb_backfill(
     # start history anal and load missing new data via backend.
     for timeframe, shm in shms.items():
-        series, _, last_tsdb_dt = await storage.load(
+        tsdb_history, first_tsdb_dt, last_tsdb_dt = await storage.load(
             fqsn,
             timeframe=timeframe,
         )

         broker, symbol, expiry = unpack_fqsn(fqsn)
+        try:
             (
                 latest_start_dt,
                 latest_end_dt,
@@ -535,8 +545,19 @@ async def tsdb_backfill(
                     storage=storage,
                 )
             )
+        except DataUnavailable:
+            # XXX: timeframe not supported for backend
+            dts_per_tf[timeframe] = (
+                tsdb_history,
+                last_tsdb_dt,
+                None,
+                None,
+            )
+            continue
+
+        # tsdb_history = series.get(timeframe)
         dts_per_tf[timeframe] = (
-            series.get(timeframe),
+            tsdb_history,
             last_tsdb_dt,
             latest_start_dt,
             latest_end_dt,
@@ -553,7 +574,7 @@ async def tsdb_backfill(
     # the shm buffer?.. no se.

     # unblock the feed bus management task
-    assert len(shms[1].array)
+    # assert len(shms[1].array)
     task_status.started((
         shms[60],
         shms[1],
@@ -562,6 +583,11 @@ async def tsdb_backfill(
     # sync to backend history task's query/load completion
     await bf_done.wait()

+    # Load tsdb history into shm buffer (for display).
+
+    # TODO: eventually it'd be nice to not require a shm array/buffer
+    # to accomplish this.. maybe we can do some kind of tsdb direct to
+    # graphics format eventually in a child-actor?
     for timeframe, shm in shms.items():
         (
             tsdb_history,
@@ -573,7 +599,7 @@ async def tsdb_backfill(
         # do diff against last start frame of history and only fill
         # in from the tsdb an allotment that allows for most recent
         # to be loaded into mem *before* tsdb data.
-        if last_tsdb_dt:
+        if last_tsdb_dt and latest_start_dt:
             dt_diff_s = (
                 latest_start_dt - last_tsdb_dt
             ).seconds
@@ -588,9 +614,10 @@ async def tsdb_backfill(
             # sanity check on most-recent-data loading
             assert prepend_start > dt_diff_s

-        if tsdb_history and len(tsdb_history):
+        if (
+            len(tsdb_history)
+        ):
             to_push = tsdb_history[:prepend_start]
             shm.push(
                 to_push,
@@ -601,30 +628,49 @@ async def tsdb_backfill(
                 # start=prepend_start,
                 field_map=marketstore.ohlc_key_map,
             )
+            prepend_start = shm._first.value

             # load as much from storage into shm as space will
             # allow according to user's shm size settings.
-            end = tsdb_history['Epoch'][0]
-            while shm._first.value > 0:
-                series = await storage.read_ohlcv(
+            last_frame_start = tsdb_history['Epoch'][0]
+            while (
+                shm._first.value > 0
+                # and frame_start < last_frame_start
+            ):
+                tsdb_history = await storage.read_ohlcv(
                     fqsn,
-                    end=end,
+                    end=last_frame_start,
                     timeframe=timeframe,
                 )
-                prepend_start -= len(to_push)
-                to_push = tsdb_history[:prepend_start]
+                if (
+                    not len(tsdb_history)
+                ):
+                    # on empty db history
+                    break
+
+                time = tsdb_history['Epoch']
+                frame_start = time[0]
+                frame_end = time[0]
+                print(f"LOADING MKTS HISTORY: {frame_start} - {frame_end}")
+                if frame_start >= last_frame_start:
+                    # no new data loaded was from tsdb, so we can exit.
+                    break
+
+                prepend_start = shm._first.value
+                to_push = tsdb_history[:prepend_start]

                 # insert the history pre a "days worth" of samples
                 # to leave some real-time buffer space at the end.
                 shm.push(
                     to_push,
                     prepend=True,
+                    # update_first=False,
+                    # start=prepend_start,
                     field_map=marketstore.ohlc_key_map,
                 )
+                last_frame_start = frame_start
+
+                log.info(f'Loaded {to_push.shape} datums from storage')

             # manually trigger step update to update charts/fsps
             # which need an incremental update.
@@ -640,8 +686,6 @@ async def tsdb_backfill(
         for delay_s in sampler.subscribers:
             await broadcast(delay_s)

-        log.info(f'Loaded {to_push.shape} datums from storage')
-
     # TODO: write new data to tsdb to be ready to for next read.
@@ -692,14 +736,15 @@ async def manage_history(
         # we expect the sub-actor to write
         readonly=False,
-        size=3*_secs_in_day,
+        size=4*_secs_in_day,
     )

     # (for now) set the rt (hft) shm array with space to prepend
-    # only a days worth of 1s history.
-    days = 1
-    rt_shm._first.value = days*_secs_in_day
-    rt_shm._last.value = days*_secs_in_day
+    # only a few days worth of 1s history.
+    days = 3
+    start_index = days*_secs_in_day
+    rt_shm._first.value = start_index
+    rt_shm._last.value = start_index
     rt_zero_index = rt_shm.index - 1

     if not opened:
@@ -737,8 +782,6 @@ async def manage_history(
                 1: rt_shm,
                 60: hist_shm,
             },
-            # some_data_ready=some_data_ready,
-            # task_status=task_status,
         )

     # yield back after client connect with filled shm
@@ -866,7 +909,6 @@ async def allocate_persistent_feed(
     # this task.
     msg = init_msg[symbol]
     msg['hist_shm_token'] = hist_shm.token
-    # msg['startup_hist_index'] = hist_shm.index - 1
     msg['izero_hist'] = izero_hist
     msg['izero_rt'] = izero_rt
     msg['rt_shm_token'] = rt_shm.token
@@ -937,6 +979,16 @@ async def allocate_persistent_feed(
         'shm_write_opts', {}
     ).get('sum_tick_vlm', True)

+    # NOTE: if no high-freq sampled data has (yet) been loaded,
+    # seed the buffer with a history datum - this is most handy
+    # for many backends which don't sample @ 1s OHLC but do have
+    # slower data such as 1m OHLC.
+    if not len(rt_shm.array):
+        rt_shm.push(hist_shm.array[-3:-1])
+        ohlckeys = ['open', 'high', 'low', 'close']
+        rt_shm.array[ohlckeys][-2:] = hist_shm.array['close'][-1]
+        rt_shm.array['volume'][-2] = 0
+
     # start sample loop and shm incrementer task for OHLC style sampling
     # at the above registered step periods.
     try: