Finally get partial backfills after tsdb load working

It took a little while (and a lot of commenting out of old, no longer
needed code) but this gets tsdb (from parquet file) loading happening
*before* the final backfill from the most recent history frame back to
the most recent tsdb time stamp!

More or less all the convoluted concurrency shit we had for coping with
`marketstore` IPC junk is no longer needed, particularly all the query
size limits and accompanying load loops. The recent-frame loading
technique/order *has* now changed though, since we'd like to show
charts asap once tsdb history loads.

The new load sequence is as follows (a rough sketch follows the list):
- load the mr (most recent) frame from the backend.
- load existing history (one shot) from the "tsdb", aka the parquet
  files, with `polars`.
- backfill the gap from the mr frame back to the tsdb start
  incrementally by making (hacky) `ShmArray.push(start=<blah>)` calls
  and *not* updating the `._first.value` while doing it XD
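
A sketch of that sequence, just for illustration: `get_hist`, `shm`,
and the parquet read here are simplified stand-ins for the real
feed/storage plumbing in `tsdb_backfill()` + `start_backfill()` below:

```python
from pendulum import from_timestamp
import polars as pl


async def load_history(get_hist, shm, timeframe: float, parquet_path: str):
    # 1) most recent frame from the backend; push to shm so
    #    charts can render asap.
    frame, mr_start_dt, mr_end_dt = await get_hist(timeframe, end_dt=None)
    shm.push(frame, prepend=True)
    next_prepend_index: int = shm._first.value + 1

    # 2) one-shot load of existing history from the parquet "tsdb".
    tsdb_history = pl.read_parquet(parquet_path)
    last_tsdb_dt = from_timestamp(tsdb_history['time'][-1])

    # 3) incrementally backfill the gap from the mr frame back to
    #    the last tsdb datum, prepending each frame at a manually
    #    tracked index while *not* updating `._first.value`.
    end_dt = mr_start_dt
    while end_dt > last_tsdb_dt:
        frame, next_start_dt, _ = await get_hist(timeframe, end_dt=end_dt)
        shm.push(
            frame,
            prepend=True,
            update_first=False,        # leave ._first at the tsdb segment
            start=next_prepend_index,  # hand-computed prepend offset
        )
        next_prepend_index -= len(frame)
        end_dt = next_start_dt
```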

Dirtier details:
- make `tsdb_backfill()` run per timeframe in a separate task (see the
  trimmed spawn snippet after this list).
  - drop all the loop-through-timeframes and `dts_per_tf` insertion
    crap.
  - only spawn a subtask for the `start_backfill()` call, which in turn
    only does the gap backfilling as mentioned above.
- mask out all the code related to being limited to certain query sizes
  (over gRPC) as was required by marketstore.. not gonna go through
  what all of that was since it's probably getting deleted in a follow
  up commit.
- a buncha off-by-one tweaks to do with backfilling the gap from the mr
  frame back to the tsdb start.. mostly tinkered with it until it all
  lined up, but it seems to be working correctly B)
- still use the `broadcast_all()` msg stuff when doing the gap
  backfill, though it isn't really working yet on the UI side (since
  previously we were relying on the shm first/last values).. so this
  will be "coming soon" :)
basic_buy_bot
Tyler Goodlet 2023-06-06 23:59:59 -04:00
parent 7a5c43d01a
commit 0dcfcea6ee
1 changed file with 515 additions and 259 deletions


@@ -24,7 +24,7 @@ from collections import (
 )
 from datetime import datetime
 from functools import partial
-import time
+# import time
 from types import ModuleType
 from typing import (
     Callable,
@@ -34,7 +34,10 @@ from typing import (
 import trio
 from trio_typing import TaskStatus
 import tractor
-import pendulum
+from pendulum import (
+    Duration,
+    from_timestamp,
+)
 import numpy as np

 from ..accounting import (
@@ -64,113 +67,171 @@ if TYPE_CHECKING:
 def diff_history(
     array: np.ndarray,
-    timeframe: int,
-    start_dt: datetime,
-    end_dt: datetime,
-    last_tsdb_dt: datetime | None = None
+    # timeframe: int,
+    # start_dt: datetime,
+    # end_dt: datetime,
+
+    append_until_dt: datetime | None = None,
+    prepend_until_dt: datetime | None = None,

 ) -> np.ndarray:

     # no diffing with tsdb dt index possible..
-    if last_tsdb_dt is None:
+    if (
+        prepend_until_dt is None
+        and append_until_dt is None
+    ):
         return array

-    time = array['time']
-    return array[time > last_tsdb_dt.timestamp()]
+    times = array['time']
+
+    if append_until_dt:
+        return array[times < append_until_dt.timestamp()]
+    else:
+        return array[times >= prepend_until_dt.timestamp()]
+
+
+# async def open_history_mngr(
+#     mod: ModuleType,
+#     mkt: MktPair,
+#     # shm: ShmArray,
+#     # timeframes: list[float] = [60, 1],
+#     timeframes: float,
+
+# ) -> Callable[
+#     [int, datetime, datetime],
+#     tuple[np.ndarray, str]
+# ]:
+#     '''
+#     Open a "history manager" for the backend data provider,
+#     get the latest "frames worth" of ohlcv history,
+#     push the history to shm and deliver
+#     the start datum's datetime value so that further history loading
+#     can be done until synchronized with the tsdb loaded time series.
+
+#     '''
+#     hist: Callable[
+#         [int, datetime, datetime],
+#         tuple[np.ndarray, str]
+#     ]
+#     config: dict[str, int]
+
+#     async with mod.open_history_client(
+#         mkt,
+#     ) as (hist, config):
+#         log.info(f'{mod} history client returned backfill config: {config}')
+
+#         # get latest query's worth of history all the way
+#         # back to what is recorded in the tsdb
+#         array, mr_start_dt, mr_end_dt = await hist(
+#             timeframe,
+#             end_dt=None,
+#         )
+#         times: np.ndarray = array['time']
+
+#         # sample period step size in seconds
+#         step_size_s = (
+#             from_timestamp(times[-1])
+#             - from_timestamp(times[-2])
+#         ).seconds
+
+#         if step_size_s not in (1, 60):
+#             log.error(f'Last 2 sample period is off!? -> {step_size_s}')
+#             step_size_s = (
+#                 from_timestamp(times[-2])
+#                 - from_timestamp(times[-3])
+#             ).seconds
+
+#         # NOTE: on the first history, most recent history
+#         # frame we PREPEND from the current shm ._last index
+#         # and thus a gap between the earliest datum loaded here
+#         # and the latest loaded from the tsdb may exist!
+#         log.info(f'Pushing {to_push.size} to shm!')
+#         shm.push(
+#             to_push,
+#             prepend=True,
+#             # start=
+#         )
+
+#         # if the market is open (aka we have a live feed) but the
+#         # history sample step index seems off we report the surrounding
+#         # data and drop into a bp. this case shouldn't really ever
+#         # happen if we're doing history retrieval correctly.
+#         # if (
+#         #     step_size_s == 60
+#         #     and feed_is_live.is_set()
+#         # ):
+#         #     inow = round(time.time())
+#         #     diff = inow - times[-1]
+#         #     if abs(diff) > 60:
+#         #         surr = array[-6:]
+#         #         diff_in_mins = round(diff/60., ndigits=2)
+#         #         log.warning(
+#         #             f'STEP ERROR `{mkt.fqme}` for period {step_size_s}s:\n'
+#         #             f'Off by `{diff}` seconds (or `{diff_in_mins}` mins)\n'
+#         #             'Surrounding 6 time stamps:\n'
+#         #             f'{list(surr["time"])}\n'
+#         #             'Here is surrounding 6 samples:\n'
+#         #             f'{surr}\nn'
+#         #         )
+
+#         #         # uncomment this for a hacker who wants to investigate
+#         #         # this case manually..
+#         #         # await tractor.breakpoint()
+
+#         # frame's worth of sample-period-steps, in seconds
+#         # frame_size_s = len(array) * step_size_s
+
+#         to_push = array
+#         # to_push = diff_history(
+#         #     array,
+#         #     # timeframe,
+#         #     # mr_start_dt,
+#         #     # mr_end_dt,
+
+#         #     # backfill scenario for "most recent" frame
+#         #     prepend_until_dt=last_tsdb_dt,
+#         # )
+
+#         # NOTE: on the first history, most recent history
+#         # frame we PREPEND from the current shm ._last index
+#         # and thus a gap between the earliest datum loaded here
+#         # and the latest loaded from the tsdb may exist!
+#         log.info(f'Pushing {to_push.size} to shm!')
+#         shm.push(
+#             to_push,
+#             prepend=True,
+#             # start=
+#         )
+
+#         # TODO: should we wrap this in a "history frame" type or
+#         # something?
+#         yield hist, mr_start_dt, mr_end_dt


 async def start_backfill(
+    get_hist,
     mod: ModuleType,
     mkt: MktPair,
     shm: ShmArray,
     timeframe: float,
-    # sampler_stream: tractor.MsgStream,
-    feed_is_live: trio.Event,

-    last_tsdb_dt: datetime | None = None,
+    backfill_from_shm_index: int,
+    backfill_from_dt: datetime,
+
+    sampler_stream: tractor.MsgStream,
+
+    backfill_until_dt: datetime | None = None,

     storage: StorageClient | None = None,
     write_tsdb: bool = True,
-    tsdb_is_up: bool = True,
+    # tsdb_is_up: bool = True,

     task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED,

 ) -> int:

-    hist: Callable[
-        [int, datetime, datetime],
-        tuple[np.ndarray, str]
-    ]
-    config: dict[str, int]
-
-    async with mod.open_history_client(
-        mkt,
-    ) as (hist, config):
-        log.info(f'{mod} history client returned backfill config: {config}')
-
-        # get latest query's worth of history all the way
-        # back to what is recorded in the tsdb
-        array, start_dt, end_dt = await hist(
-            timeframe,
-            end_dt=None,
-        )
-        times = array['time']
-
-        # sample period step size in seconds
-        step_size_s = (
-            pendulum.from_timestamp(times[-1])
-            - pendulum.from_timestamp(times[-2])
-        ).seconds
-
-        if step_size_s not in (1, 60):
-            log.error(f'Last 2 sample period is off!? -> {step_size_s}')
-            step_size_s = (
-                pendulum.from_timestamp(times[-2])
-                - pendulum.from_timestamp(times[-3])
-            ).seconds
-
-        # if the market is open (aka we have a live feed) but the
-        # history sample step index seems off we report the surrounding
-        # data and drop into a bp. this case shouldn't really ever
-        # happen if we're doing history retrieval correctly.
-        if (
-            step_size_s == 60
-            and feed_is_live.is_set()
-        ):
-            inow = round(time.time())
-            diff = inow - times[-1]
-            if abs(diff) > 60:
-                surr = array[-6:]
-                diff_in_mins = round(diff/60., ndigits=2)
-                log.warning(
-                    f'STEP ERROR `{mkt.fqme}` for period {step_size_s}s:\n'
-                    f'Off by `{diff}` seconds (or `{diff_in_mins}` mins)\n'
-                    'Surrounding 6 time stamps:\n'
-                    f'{list(surr["time"])}\n'
-                    'Here is surrounding 6 samples:\n'
-                    f'{surr}\nn'
-                )
-
-                # uncomment this for a hacker who wants to investigate
-                # this case manually..
-                # await tractor.breakpoint()
-
-        # frame's worth of sample-period-steps, in seconds
-        frame_size_s = len(array) * step_size_s
-
-        to_push = diff_history(
-            array,
-            timeframe,
-            start_dt,
-            end_dt,
-            last_tsdb_dt=last_tsdb_dt,
-        )
-        log.info(f'Pushing {to_push.size} to shm!')
-        shm.push(
-            to_push,
-            # prepend=True,
-        )
-
     # TODO: *** THIS IS A BUG ***
     # we need to only broadcast to subscribers for this fqme..
     # otherwise all fsps get reset on every chart..
@@ -180,43 +241,45 @@ async def start_backfill(
     bf_done = trio.Event()

     # let caller unblock and deliver latest history frame
-    task_status.started((
-        start_dt,
-        end_dt,
+    task_status.started(  # (
+        # mr_start_dt,
+        # mr_end_dt,
         bf_done,
-    ))
+    )  # )

     # based on the sample step size, maybe load a certain amount history
-    if last_tsdb_dt is None:
+    update_start_on_prepend: bool = False
+    if backfill_until_dt is None:

-        if step_size_s not in (1, 60):
+        if timeframe not in (1, 60):
             raise ValueError(
                 '`piker` only needs to support 1m and 1s sampling '
                 'but ur api is trying to deliver a longer '
-                f'timeframe of {step_size_s} seconds..\n'
+                f'timeframe of {timeframe} seconds..\n'
                 'So yuh.. dun do dat brudder.'
             )

         # when no tsdb "last datum" is provided, we just load
         # some near-term history.
+        # periods = {
+        #     1: {'days': 1},
+        #     60: {'days': 14},
+        # }
+
+        # if tsdb_is_up:
+        # do a decently sized backfill and load it into storage.
         periods = {
-            1: {'days': 1},
-            60: {'days': 14},
+            1: {'days': 6},
+            60: {'years': 6},
         }
+        period_duration: int = periods[timeframe]

-        if tsdb_is_up:
-            # do a decently sized backfill and load it into storage.
-            periods = {
-                1: {'days': 6},
-                60: {'years': 6},
-            }
-
-        period_duration = periods[step_size_s]
+        update_start_on_prepend = True

         # NOTE: manually set the "latest" datetime which we intend to
         # backfill history "until" so as to adhere to the history
         # settings above when the tsdb is detected as being empty.
-        last_tsdb_dt = start_dt.subtract(**period_duration)
+        backfill_until_dt = backfill_from_dt.subtract(**period_duration)

     # configure async query throttling
     # rate = config.get('rate', 1)
@@ -228,18 +291,39 @@
     # per time stamp.
     starts: Counter[datetime] = Counter()

-    # inline sequential loop where we simply pass the
-    # last retrieved start dt to the next request as
-    # it's end dt.
-    while end_dt > last_tsdb_dt:
+    # conduct "backward history filling" since
+    # no tsdb history yet exists.
+
+    # implemented via a simple inline sequential loop where we
+    # simply pass the last retrieved start dt to the next
+    # request as it's end dt.
+    # while end_dt < backfill_until_dt:
+    # while (
+    #     end_dt is None  # init case
+    #     or end_dt < mr_start_dt
+    # ):
+
+    # conduct "forward filling" from the last time step
+    # loaded from the tsdb until the first step loaded
+    # just above
+    end_dt: datetime = backfill_from_dt
+    # start_dt: datetime = backfill_until_dt
+    next_prepend_index: int = backfill_from_shm_index
+
+    while end_dt > backfill_until_dt:
         log.debug(
-            f'Requesting {step_size_s}s frame ending in {start_dt}'
+            f'Requesting {timeframe}s frame ending in {end_dt}'
         )

         try:
-            array, next_start_dt, end_dt = await hist(
+            (
+                array,
+                next_start_dt,
+                next_end_dt,
+            ) = await get_hist(
                 timeframe,
-                end_dt=start_dt,
+                end_dt=end_dt,
+                # start_dt=start_dt,
             )

         # broker says there never was or is no more history to pull
@@ -272,15 +356,17 @@
             return

         # only update new start point if not-yet-seen
-        start_dt = next_start_dt
+        start_dt: datetime = next_start_dt
         starts[start_dt] += 1

         assert array['time'][0] == start_dt.timestamp()

         diff = end_dt - start_dt
         frame_time_diff_s = diff.seconds
-        expected_frame_size_s = frame_size_s + step_size_s

+        # frame's worth of sample-period-steps, in seconds
+        frame_size_s = len(array) * timeframe
+        expected_frame_size_s = frame_size_s + timeframe
         if frame_time_diff_s > expected_frame_size_s:

             # XXX: query result includes a start point prior to our
@@ -294,10 +380,10 @@
         to_push = diff_history(
             array,
-            timeframe,
-            start_dt,
-            end_dt,
-            last_tsdb_dt=last_tsdb_dt,
+            # timeframe,
+            # start_dt,
+            # end_dt,
+            prepend_until_dt=backfill_until_dt,
         )
         ln = len(to_push)
         if ln:
@@ -314,11 +400,52 @@
             shm.push(
                 to_push,
                 prepend=True,
+
+                # XXX: only update the ._first index if no tsdb
+                # segment was previously prepended by the
+                # parent task.
+                update_first=update_start_on_prepend,
+
+                # XXX: only prepend from a manually calculated shm
+                # index if there was already a tsdb history
+                # segment prepended (since then the
+                # ._first.value is going to be wayyy in the
+                # past!)
+                start=(
+                    next_prepend_index
+                    if not update_start_on_prepend
+                    else None
+                ),
             )
-        except ValueError:
+            await sampler_stream.send({
+                'broadcast_all': {
+                    'backfilling': True
+                },
+            })
+
+            # decrement next prepend point
+            next_prepend_index = next_prepend_index - ln
+            end_dt = next_start_dt
+
+            # XXX: extremely important, there can be no checkpoints
+            # in the block above to avoid entering new ``frames``
+            # values while we're pipelining the current ones to
+            # memory...
+            array = shm.array
+            zeros = array[array['low'] == 0]
+            if (
+                0 < zeros.size < 10
+            ):
+                await tractor.breakpoint()
+
+        except ValueError as ve:
+            _ve = ve
             log.info(
                 f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
             )
+            await tractor.breakpoint()

             # can't push the entire frame? so
             # push only the amount that can fit..
             break
@@ -328,10 +455,12 @@
             f'{start_dt} -> {end_dt}'
         )

+        # FINALLY, maybe write immediately to the tsdb backend for
+        # long-term storage.
         if (
             storage is not None
             and write_tsdb
-            # and False
+            and False
         ):
             log.info(
                 f'Writing {ln} frame to storage:\n'
@@ -372,19 +501,22 @@ def push_tsdb_history_to_shm(
     shm: ShmArray,
     tsdb_history: np.ndarray,
     time_field_key: str,
+    prepend: bool = False,

 ) -> datetime:

     # TODO: see if there's faster multi-field reads:
     # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
     # re-index with a `time` and index field
     prepend_start = shm._first.value
     to_push = tsdb_history[-prepend_start:]

     shm.push(
         to_push,

         # insert the history pre a "days worth" of samples
         # to leave some real-time buffer space at the end.
-        prepend=True,
+        prepend=prepend,
         # update_first=False,
         # start=prepend_start,
         field_map=storemod.ohlc_key_map,
@@ -392,7 +524,7 @@ def push_tsdb_history_to_shm(
     log.info(f'Loaded {to_push.shape} datums from storage')

     tsdb_last_frame_start = tsdb_history[time_field_key][0]
-    return pendulum.from_timestamp(tsdb_last_frame_start)
+    return from_timestamp(tsdb_last_frame_start)


 async def back_load_from_tsdb(
@@ -435,7 +567,7 @@ async def back_load_from_tsdb(
     # assert (times[1] - times[0]) == 1

     if len(array):
-        shm_last_dt = pendulum.from_timestamp(
+        shm_last_dt = from_timestamp(
             shm.array[0]['time']
         )
     else:
@@ -525,12 +657,16 @@ async def back_load_from_tsdb(
 async def tsdb_backfill(
     mod: ModuleType,
     storemod: ModuleType,
-    bus: _FeedsBus,
+    # bus: _FeedsBus,
+    tn: trio.Nursery,
     storage: StorageClient,
     mkt: MktPair,
-    shms: dict[int, ShmArray],
-    # sampler_stream: tractor.MsgStream,
-    feed_is_live: trio.Event,
+    # shms: dict[int, ShmArray],
+    shm: ShmArray,
+    timeframe: float,
+
+    sampler_stream: tractor.MsgStream,
+    # feed_is_live: trio.Event,

     task_status: TaskStatus[
         tuple[ShmArray, ShmArray]
@@ -540,18 +676,75 @@ async def tsdb_backfill(
     # TODO: this should be used verbatim for the pure
     # shm backfiller approach below.
-    dts_per_tf: dict[int, datetime] = {}
+    # dts_per_tf: dict[int, datetime] = {}
     fqme: str = mkt.fqme

-    time_key: str = 'time'
-    if getattr(storemod, 'ohlc_key_map', False):
-        keymap: bidict = storemod.ohlc_key_map
-        time_key: str = keymap.inverse['time']
+    # time_key: str = 'time'
+    # if getattr(storemod, 'ohlc_key_map', False):
+    #     keymap: bidict = storemod.ohlc_key_map
+    #     time_key: str = keymap.inverse['time']

-    # start history anal and load missing new data via backend.
-    last_tsdb_dt: datetime | None = None
-    timeframe: int  # OHLC sample period
-    for timeframe, shm in shms.items():
+    get_hist: Callable[
+        [int, datetime, datetime],
+        tuple[np.ndarray, str]
+    ]
+    config: dict[str, int]
+    async with mod.open_history_client(
+        mkt,
+    ) as (get_hist, config):
+        log.info(f'{mod} history client returned backfill config: {config}')
+
+        # get latest query's worth of history all the way
+        # back to what is recorded in the tsdb
+        try:
+            array, mr_start_dt, mr_end_dt = await get_hist(
+                timeframe,
+                end_dt=None,
+            )
+
+        # XXX: timeframe not supported for backend (since
+        # above exception type), terminate immediately since
+        # there's no backfilling possible.
+        except DataUnavailable:
+            task_status.started()
+            return
+
+        times: np.ndarray = array['time']
+
+        # sample period step size in seconds
+        step_size_s = (
+            from_timestamp(times[-1])
+            - from_timestamp(times[-2])
+        ).seconds
+
+        if step_size_s not in (1, 60):
+            log.error(f'Last 2 sample period is off!? -> {step_size_s}')
+            step_size_s = (
+                from_timestamp(times[-2])
+                - from_timestamp(times[-3])
+            ).seconds
+
+        # NOTE: on the first history, most recent history
+        # frame we PREPEND from the current shm ._last index
+        # and thus a gap between the earliest datum loaded here
+        # and the latest loaded from the tsdb may exist!
+        log.info(f'Pushing {array.size} to shm!')
+        shm.push(
+            array,
+            prepend=True,  # append on first frame
+            # start=
+        )
+        backfill_gap_from_shm_index: int = shm._first.value + 1
+
+        # tell parent task to continue
+        task_status.started()
+
+        # start history anal and load missing new data via backend.
+        # backfill_until_dt: datetime | None = None
+        # started_after_tsdb_load: bool = False
+
+        # for timeframe, shm in shms.items():

         # loads a (large) frame of data from the tsdb depending
         # on the db's query size limit; our "nativedb" (using
@@ -563,6 +756,7 @@ async def tsdb_backfill(
             timeframe=timeframe,
         )

+        last_tsdb_dt: datetime | None = None
         if tsdb_entry:
             (
                 tsdb_history,
@@ -570,106 +764,160 @@ async def tsdb_backfill(
                 last_tsdb_dt,
             ) = tsdb_entry

-            tsdb_last_frame_start: datetime = push_tsdb_history_to_shm(
-                storemod,
-                shm,
-                tsdb_history,
-                time_key,
-            )
-            assert tsdb_last_frame_start == first_tsdb_dt
-
-            # begin backfiller task ASAP
-            try:
-                (
-                    latest_start_dt,
-                    latest_end_dt,
-                    bf_done,
-                ) = await bus.nursery.start(
-                    partial(
-                        start_backfill,
-                        mod,
-                        mkt,
-                        shm,
-                        timeframe,
-                        # sampler_stream,
-                        feed_is_live,
-
-                        last_tsdb_dt=last_tsdb_dt,
-                        tsdb_is_up=True,
-                        storage=storage,
-                    )
-                )
-                if tsdb_entry:
-                    dts_per_tf[timeframe] = (
-                        tsdb_history,
-                        last_tsdb_dt,
-                        latest_start_dt,
-                        latest_end_dt,
-                        bf_done,
-                    )
-            except DataUnavailable:
-                # XXX: timeframe not supported for backend (since
-                # above exception type), so skip and move on to next.
-                continue
-
-            # tsdb_history = series.get(timeframe)
-
-        # if len(hist_shm.array) < 2:
-        # TODO: there's an edge case here to solve where if the last
-        # frame before market close (at least on ib) was pushed and
-        # there was only "1 new" row pushed from the first backfill
-        # query-iteration, then the sample step sizing calcs will
-        # break upstream from here since you can't diff on at least
-        # 2 steps... probably should also add logic to compute from
-        # the tsdb series and stash that somewhere as meta data on
-        # the shm buffer?.. no se.
-
-    # unblock the feed bus management task
-    # assert len(shms[1].array)
-    task_status.started()
-
-    # backload any further data from tsdb (concurrently per
-    # timeframe) if not all data was able to be loaded (in memory)
-    # from the ``StorageClient.load()`` call above.
-    async with trio.open_nursery() as nurse:
-        for timeframe, shm in shms.items():
-
-            entry = dts_per_tf.get(timeframe)
-            if not entry:
-                continue
-
-            (
-                tsdb_history,
-                last_tsdb_dt,
-                latest_start_dt,
-                latest_end_dt,
-                bf_done,
-            ) = entry
-
-            if not tsdb_history.size:
-                continue
-
-            nurse.start_soon(
-                back_load_from_tsdb,
-                storemod,
-                storage,
-                fqme,
-
-                tsdb_history,
-                last_tsdb_dt,
-                latest_start_dt,
-                latest_end_dt,
-                bf_done,
-
-                timeframe,
-                shm,
-            )
-
-    # try:
-    #     await trio.sleep_forever()
-    # finally:
-    #     write_ohlcv
+            # calc the index from which the tsdb data should be
+            # prepended, presuming there is a gap between the
+            # latest frame (loaded/read above) and the latest
+            # sample loaded from the tsdb.
+            backfill_diff: Duration = mr_start_dt - last_tsdb_dt
+            offset_s: float = backfill_diff.in_seconds()
+            offset_samples: int = round(offset_s / timeframe)
+
+            # TODO: see if there's faster multi-field reads:
+            # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
+            # re-index with a `time` and index field
+            prepend_start = shm._first.value - offset_samples + 1
+            to_push = tsdb_history[-prepend_start:]
+            shm.push(
+                to_push,
+
+                # insert the history pre a "days worth" of samples
+                # to leave some real-time buffer space at the end.
+                prepend=True,
+                # update_first=False,
+                start=prepend_start,
+                field_map=storemod.ohlc_key_map,
+            )
+            log.info(f'Loaded {to_push.shape} datums from storage')
+
+            # tsdb_last_frame_start: datetime = push_tsdb_history_to_shm(
+            #     storemod,
+            #     shm,
+            #     tsdb_history,
+            #     time_key,
+            #     prepend=True,
+            # )
+            # assert tsdb_last_frame_start == first_tsdb_dt
+
+        # unblock the feed bus management task
+        # assert len(shms[1].array)
+        # if not started_after_tsdb_load:
+        #     task_status.started()
+        #     started_after_tsdb_load = True
+
+        # begin backfiller task ASAP
+        # try:
+
+        # if there is a gap to backfill from the first
+        # history frame until the last datum loaded from the tsdb
+        # continue that now in the background
+        # try:
+        # (
+        #     latest_start_dt,
+        #     latest_end_dt,
+        bf_done = await tn.start(
+            partial(
+                start_backfill,
+                get_hist,
+                mod,
+                mkt,
+                shm,
+                timeframe,
+
+                backfill_from_shm_index=backfill_gap_from_shm_index,
+                backfill_from_dt=mr_start_dt,
+                backfill_until_dt=last_tsdb_dt,
+
+                sampler_stream=sampler_stream,
+                # feed_is_live,
+
+                storage=storage,
+                # tsdb_is_up=True,
+            )
+        )
+        # if tsdb_entry:
+        #     dts_per_tf[timeframe] = (
+        #         tsdb_history,
+        #         last_tsdb_dt,
+        #         latest_start_dt,
+        #         latest_end_dt,
+        #         bf_done,
+        #     )
+        # elif not started_after_tsdb_load:
+        #     task_status.started()
+        #     started_after_tsdb_load = True
+
+        # XXX: timeframe not supported for backend (since
+        # above exception type), terminate immediately since
+        # there's no backfilling possible.
+        # except DataUnavailable:
+        #     return
+        #     continue
+
+        # tsdb_history = series.get(timeframe)
+
+        # if len(hist_shm.array) < 2:
+        # TODO: there's an edge case here to solve where if the last
+        # frame before market close (at least on ib) was pushed and
+        # there was only "1 new" row pushed from the first backfill
+        # query-iteration, then the sample step sizing calcs will
+        # break upstream from here since you can't diff on at least
+        # 2 steps... probably should also add logic to compute from
+        # the tsdb series and stash that somewhere as meta data on
+        # the shm buffer?.. no se.
+
+        # backload any further data from tsdb (concurrently per
+        # timeframe) if not all data was able to be loaded (in memory)
+        # from the ``StorageClient.load()`` call above.
+        try:
+            await trio.sleep_forever()
+        finally:
+            return
+            # write_ohlcv
+
+        # IF we need to continue backloading incrementall from the
+        # tsdb client..
+        tn.start_soon(
+            back_load_from_tsdb,
+            storemod,
+            storage,
+            fqme,
+
+            tsdb_history,
+            last_tsdb_dt,
+            mr_start_dt,
+            mr_end_dt,
+            bf_done,
+
+            timeframe,
+            shm,
+        )
+
+        # async with trio.open_nursery() as nurse:
+        #     for timeframe, shm in shms.items():
+
+        #         entry = dts_per_tf.get(timeframe)
+        #         if not entry:
+        #             continue
+
+        #         (
+        #             tsdb_history,
+        #             last_tsdb_dt,
+        #             latest_start_dt,
+        #             latest_end_dt,
+        #             bf_done,
+        #         ) = entry
+
+        #         if not tsdb_history.size:
+        #             continue
+
+        # try:
+        #     await trio.sleep_forever()
+        # finally:
+        #     write_ohlcv


 async def manage_history(
@@ -773,7 +1021,10 @@ async def manage_history(
     # TODO: maybe it should be a subpkg of `.data`?
     from piker import storage

-    async with storage.open_storage_client() as (storemod, client):
+    async with (
+        storage.open_storage_client() as (storemod, client),
+        trio.open_nursery() as tn,
+    ):
         log.info(
             f'Connecting to storage backend `{storemod.name}`:\n'
             f'location: {client.address}\n'
@@ -793,30 +1044,10 @@ async def manage_history(
         # backfiller can do a single append from it's end datum and
         # then prepends backward to that from the current time
         # step.
-        await bus.nursery.start(
-            tsdb_backfill,
-            mod,
-            storemod,
-            bus,
-            client,
-            mkt,
-            {
-                1: rt_shm,
-                60: hist_shm,
-            },
-            # sample_stream,
-            feed_is_live,
-        )
-
-        # indicate to caller that feed can be delivered to
-        # remote requesting client since we've loaded history
-        # data that can be used.
-        some_data_ready.set()
-
-        # wait for a live feed before starting the sampler.
-        await feed_is_live.wait()
-
-        # register 1s and 1m buffers with the global incrementer task
+        tf2mem: dict = {
+            1: rt_shm,
+            60: hist_shm,
+        }
         async with open_sample_stream(
             period_s=1.,
             shms_by_period={
@@ -832,8 +1063,33 @@ async def manage_history(
             sub_for_broadcasts=False,

         ) as sample_stream:
+            # register 1s and 1m buffers with the global incrementer task
             log.info(f'Connected to sampler stream: {sample_stream}')

+            for timeframe in [60, 1]:
+                await tn.start(
+                    tsdb_backfill,
+                    mod,
+                    storemod,
+                    tn,
+                    # bus,
+                    client,
+                    mkt,
+                    tf2mem[timeframe],
+                    timeframe,
+                    sample_stream,
+                    # feed_is_live,
+                )
+
+            # indicate to caller that feed can be delivered to
+            # remote requesting client since we've loaded history
+            # data that can be used.
+            some_data_ready.set()
+
+            # wait for a live feed before starting the sampler.
+            await feed_is_live.wait()
+
             # yield back after client connect with filled shm
             task_status.started((
                 hist_zero_index,