First draft history loading rework

It was a concurrency-hack mess somewhat due to all sorts of limitations
imposed by marketstore (query size limits, strange datetime/timestamp
errors, slow table loads for large queries..) and we can drastically
simplify. There's still some issues with getting new backfills (not yet
in storage) correctly prepended: there's sometimes little gaps due to shm
races when reading history indexing vs. when the live-feed startup
finishes.

We generally need tests for all this and likely a better rework of the
feed layer's init such that we're showing history in chart afap instead
of waiting on backfills or the live feed to come up.

Much more to come B)
basic_buy_bot
Tyler Goodlet 2023-06-02 12:17:31 -04:00
parent 0ba3c798d7
commit c52e889fe5
2 changed files with 339 additions and 261 deletions

View File

@ -340,7 +340,7 @@ async def allocate_persistent_feed(
# yield back control to starting nursery once we receive either # yield back control to starting nursery once we receive either
# some history or a real-time quote. # some history or a real-time quote.
log.info(f'waiting on history to load: {fqme}') log.info(f'loading OHLCV history: {fqme}')
await some_data_ready.wait() await some_data_ready.wait()
flume = Flume( flume = Flume(
@ -370,7 +370,8 @@ async def allocate_persistent_feed(
mkt.bs_fqme: flume, mkt.bs_fqme: flume,
}) })
# signal the ``open_feed_bus()`` caller task to continue # signal the ``open_feed_bus()`` caller task to continue since
# we now have (some) history pushed to the shm buffer.
task_status.started(init) task_status.started(init)
if not start_stream: if not start_stream:

View File

@ -57,6 +57,7 @@ from ..brokers._util import (
) )
if TYPE_CHECKING: if TYPE_CHECKING:
from bidict import bidict
from ..service.marketstore import StorageClient from ..service.marketstore import StorageClient
from .feed import _FeedsBus from .feed import _FeedsBus
@ -83,13 +84,13 @@ async def start_backfill(
mkt: MktPair, mkt: MktPair,
shm: ShmArray, shm: ShmArray,
timeframe: float, timeframe: float,
sampler_stream: tractor.MsgStream, # sampler_stream: tractor.MsgStream,
feed_is_live: trio.Event, feed_is_live: trio.Event,
last_tsdb_dt: datetime | None = None, last_tsdb_dt: datetime | None = None,
storage: StorageClient | None = None, storage: StorageClient | None = None,
write_tsdb: bool = True, write_tsdb: bool = True,
tsdb_is_up: bool = False, tsdb_is_up: bool = True,
task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED,
@ -120,6 +121,13 @@ async def start_backfill(
- pendulum.from_timestamp(times[-2]) - pendulum.from_timestamp(times[-2])
).seconds ).seconds
if step_size_s not in (1, 60):
log.error(f'Last 2 sample period is off!? -> {step_size_s}')
step_size_s = (
pendulum.from_timestamp(times[-2])
- pendulum.from_timestamp(times[-3])
).seconds
# if the market is open (aka we have a live feed) but the # if the market is open (aka we have a live feed) but the
# history sample step index seems off we report the surrounding # history sample step index seems off we report the surrounding
# data and drop into a bp. this case shouldn't really ever # data and drop into a bp. this case shouldn't really ever
@ -158,12 +166,15 @@ async def start_backfill(
) )
log.info(f'Pushing {to_push.size} to shm!') log.info(f'Pushing {to_push.size} to shm!')
shm.push(to_push, prepend=True) shm.push(
to_push,
# prepend=True,
)
# TODO: *** THIS IS A BUG *** # TODO: *** THIS IS A BUG ***
# we need to only broadcast to subscribers for this fqme.. # we need to only broadcast to subscribers for this fqme..
# otherwise all fsps get reset on every chart.. # otherwise all fsps get reset on every chart..
await sampler_stream.send('broadcast_all') # await sampler_stream.send('broadcast_all')
# signal that backfilling to tsdb's end datum is complete # signal that backfilling to tsdb's end datum is complete
bf_done = trio.Event() bf_done = trio.Event()
@ -297,9 +308,13 @@ async def start_backfill(
f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}' f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}'
) )
# bail gracefully on shm allocation overrun/full condition # bail gracefully on shm allocation overrun/full
# condition
try: try:
shm.push(to_push, prepend=True) shm.push(
to_push,
prepend=True,
)
except ValueError: except ValueError:
log.info( log.info(
f'Shm buffer overrun on: {start_dt} -> {end_dt}?' f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
@ -316,6 +331,7 @@ async def start_backfill(
if ( if (
storage is not None storage is not None
and write_tsdb and write_tsdb
# and False
): ):
log.info( log.info(
f'Writing {ln} frame to storage:\n' f'Writing {ln} frame to storage:\n'
@ -334,7 +350,7 @@ async def start_backfill(
await storage.write_ohlcv( await storage.write_ohlcv(
col_sym_key, col_sym_key,
to_push, shm.array,
timeframe, timeframe,
) )
@ -345,158 +361,83 @@ async def start_backfill(
# in the block above to avoid entering new ``frames`` # in the block above to avoid entering new ``frames``
# values while we're pipelining the current ones to # values while we're pipelining the current ones to
# memory... # memory...
await sampler_stream.send('broadcast_all') # await sampler_stream.send('broadcast_all')
# short-circuit (for now) # short-circuit (for now)
bf_done.set() bf_done.set()
async def basic_backfill( def push_tsdb_history_to_shm(
bus: _FeedsBus,
mod: ModuleType,
mkt: MktPair,
shms: dict[int, ShmArray],
sampler_stream: tractor.MsgStream,
feed_is_live: trio.Event,
) -> None:
# do a legacy incremental backfill from the provider.
log.info('No TSDB (marketstored) found, doing basic backfill..')
# start history backfill task ``backfill_bars()`` is
# a required backend func this must block until shm is
# filled with first set of ohlc bars
for timeframe, shm in shms.items():
try:
await bus.nursery.start(
partial(
start_backfill,
mod,
mkt,
shm,
timeframe,
sampler_stream,
feed_is_live,
)
)
except DataUnavailable:
# XXX: timeframe not supported for backend
continue
async def tsdb_backfill(
mod: ModuleType,
storemod: ModuleType, storemod: ModuleType,
bus: _FeedsBus,
storage: StorageClient,
mkt: MktPair,
shms: dict[int, ShmArray],
sampler_stream: tractor.MsgStream,
feed_is_live: trio.Event,
task_status: TaskStatus[
tuple[ShmArray, ShmArray]
] = trio.TASK_STATUS_IGNORED,
) -> None:
# TODO: this should be used verbatim for the pure
# shm backfiller approach below.
dts_per_tf: dict[int, datetime] = {}
fqme: str = mkt.fqme
# start history anal and load missing new data via backend.
timeframe: int
for timeframe, shm in shms.items():
# loads a (large) frame of data from the tsdb depending
# on the db's query size limit.
tsdb_history, first_tsdb_dt, last_tsdb_dt = await storage.load(
fqme,
timeframe=timeframe,
)
try:
(
latest_start_dt,
latest_end_dt,
bf_done,
) = await bus.nursery.start(
partial(
start_backfill,
mod,
mkt,
shm,
timeframe,
sampler_stream,
feed_is_live,
last_tsdb_dt=last_tsdb_dt,
tsdb_is_up=True,
storage=storage,
)
)
except DataUnavailable:
# XXX: timeframe not supported for backend
dts_per_tf[timeframe] = (
tsdb_history,
last_tsdb_dt,
None,
None,
None,
)
continue
# tsdb_history = series.get(timeframe)
dts_per_tf[timeframe] = (
tsdb_history,
last_tsdb_dt,
latest_start_dt,
latest_end_dt,
bf_done,
)
# if len(hist_shm.array) < 2:
# TODO: there's an edge case here to solve where if the last
# frame before market close (at least on ib) was pushed and
# there was only "1 new" row pushed from the first backfill
# query-iteration, then the sample step sizing calcs will
# break upstream from here since you can't diff on at least
# 2 steps... probably should also add logic to compute from
# the tsdb series and stash that somewhere as meta data on
# the shm buffer?.. no se.
# unblock the feed bus management task
# assert len(shms[1].array)
task_status.started()
async def back_load_from_tsdb(
timeframe: int,
shm: ShmArray, shm: ShmArray,
): tsdb_history: np.ndarray,
( time_field_key: str,
tsdb_history, ) -> datetime:
last_tsdb_dt,
latest_start_dt,
latest_end_dt,
bf_done,
) = dts_per_tf[timeframe]
# sync to backend history task's query/load completion
if bf_done:
await bf_done.wait()
# TODO: eventually it'd be nice to not require a shm array/buffer
# to accomplish this.. maybe we can do some kind of tsdb direct to
# graphics format eventually in a child-actor?
# TODO: see if there's faster multi-field reads: # TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field # re-index with a `time` and index field
prepend_start = shm._first.value prepend_start = shm._first.value
to_push = tsdb_history[-prepend_start:]
shm.push(
to_push,
# insert the history pre a "days worth" of samples
# to leave some real-time buffer space at the end.
prepend=True,
# update_first=False,
# start=prepend_start,
field_map=storemod.ohlc_key_map,
)
log.info(f'Loaded {to_push.shape} datums from storage')
tsdb_last_frame_start = tsdb_history[time_field_key][0]
return pendulum.from_timestamp(tsdb_last_frame_start)
async def back_load_from_tsdb(
storemod: ModuleType,
storage: StorageClient,
fqme: str,
# dts_per_tf: dict[int, datetime],
tsdb_history: np.ndarray,
last_tsdb_dt: datetime,
latest_start_dt: datetime,
latest_end_dt: datetime,
bf_done: trio.Event,
timeframe: int,
shm: ShmArray,
):
assert len(tsdb_history)
# sync to backend history task's query/load completion
# if bf_done:
# await bf_done.wait()
# TODO: eventually it'd be nice to not require a shm array/buffer
# to accomplish this.. maybe we can do some kind of tsdb direct to
# graphics format eventually in a child-actor?
if storemod.name == 'nativedb':
return
await tractor.breakpoint()
assert shm._first.value == 0
array = shm.array array = shm.array
# if timeframe == 1:
# times = shm.array['time']
# assert (times[1] - times[0]) == 1
if len(array): if len(array):
shm_last_dt = pendulum.from_timestamp(shm.array[0]['time']) shm_last_dt = pendulum.from_timestamp(
shm.array[0]['time']
)
else: else:
shm_last_dt = None shm_last_dt = None
@ -525,29 +466,17 @@ async def tsdb_backfill(
# Load TSDB history into shm buffer (for display) if there is # Load TSDB history into shm buffer (for display) if there is
# remaining buffer space. # remaining buffer space.
if ( time_key: str = 'time'
len(tsdb_history) if getattr(storemod, 'ohlc_key_map', False):
): keymap: bidict = storemod.ohlc_key_map
# load the first (smaller) bit of history originally loaded time_key: str = keymap.inverse['time']
# above from ``StorageClient.load()``.
to_push = tsdb_history[-prepend_start:]
shm.push(
to_push,
# insert the history pre a "days worth" of samples # if (
# to leave some real-time buffer space at the end. # not len(tsdb_history)
prepend=True, # ):
# update_first=False, # return
# start=prepend_start,
field_map=storemod.ohlc_key_map,
)
tsdb_last_frame_start = tsdb_history['Epoch'][0]
if timeframe == 1:
times = shm.array['time']
assert (times[1] - times[0]) == 1
tsdb_last_frame_start: datetime = last_tsdb_dt
# load as much from storage into shm possible (depends on # load as much from storage into shm possible (depends on
# user's shm size settings). # user's shm size settings).
while shm._first.value > 0: while shm._first.value > 0:
@ -558,28 +487,24 @@ async def tsdb_backfill(
end=tsdb_last_frame_start, end=tsdb_last_frame_start,
) )
# empty query # # empty query
if not len(tsdb_history): # if not len(tsdb_history):
break # break
next_start = tsdb_history['Epoch'][0] next_start = tsdb_history[time_key][0]
if next_start >= tsdb_last_frame_start: if next_start >= tsdb_last_frame_start:
# no earlier data detected # no earlier data detected
break break
else: else:
tsdb_last_frame_start = next_start tsdb_last_frame_start = next_start
prepend_start = shm._first.value tsdb_last_frame_start: datetime = push_tsdb_history_to_shm(
to_push = tsdb_history[-prepend_start:] storemod,
shm,
# insert the history pre a "days worth" of samples tsdb_history,
# to leave some real-time buffer space at the end. time_key,
shm.push(
to_push,
prepend=True,
field_map=storemod.ohlc_key_map,
) )
log.info(f'Loaded {to_push.shape} datums from storage')
# manually trigger step update to update charts/fsps # manually trigger step update to update charts/fsps
# which need an incremental update. # which need an incremental update.
@ -592,21 +517,160 @@ async def tsdb_backfill(
# (usually a chart showing graphics for said fsp) # (usually a chart showing graphics for said fsp)
# which tells the chart to conduct a manual full # which tells the chart to conduct a manual full
# graphics loop cycle. # graphics loop cycle.
await sampler_stream.send('broadcast_all') # await sampler_stream.send('broadcast_all')
# TODO: write new data to tsdb to be ready to for next read. # TODO: write new data to tsdb to be ready to for next read.
# backload from db (concurrently per timeframe) once backfilling of
# recent dat a loaded from the backend provider (see async def tsdb_backfill(
# ``bf_done.wait()`` call). mod: ModuleType,
storemod: ModuleType,
bus: _FeedsBus,
storage: StorageClient,
mkt: MktPair,
shms: dict[int, ShmArray],
# sampler_stream: tractor.MsgStream,
feed_is_live: trio.Event,
task_status: TaskStatus[
tuple[ShmArray, ShmArray]
] = trio.TASK_STATUS_IGNORED,
) -> None:
# TODO: this should be used verbatim for the pure
# shm backfiller approach below.
dts_per_tf: dict[int, datetime] = {}
fqme: str = mkt.fqme
time_key: str = 'time'
if getattr(storemod, 'ohlc_key_map', False):
keymap: bidict = storemod.ohlc_key_map
time_key: str = keymap.inverse['time']
# start history anal and load missing new data via backend.
last_tsdb_dt: datetime | None = None
timeframe: int # OHLC sample period
for timeframe, shm in shms.items():
# loads a (large) frame of data from the tsdb depending
# on the db's query size limit; our "nativedb" (using
# parquet) generally can load the entire history into mem
# but if not then below the remaining history can be lazy
# loaded?
tsdb_entry: tuple | None = await storage.load(
fqme,
timeframe=timeframe,
)
if tsdb_entry:
(
tsdb_history,
first_tsdb_dt,
last_tsdb_dt,
) = tsdb_entry
tsdb_last_frame_start: datetime = push_tsdb_history_to_shm(
storemod,
shm,
tsdb_history,
time_key,
)
assert tsdb_last_frame_start == first_tsdb_dt
# begin backfiller task ASAP
try:
(
latest_start_dt,
latest_end_dt,
bf_done,
) = await bus.nursery.start(
partial(
start_backfill,
mod,
mkt,
shm,
timeframe,
# sampler_stream,
feed_is_live,
last_tsdb_dt=last_tsdb_dt,
tsdb_is_up=True,
storage=storage,
)
)
if tsdb_entry:
dts_per_tf[timeframe] = (
tsdb_history,
last_tsdb_dt,
latest_start_dt,
latest_end_dt,
bf_done,
)
except DataUnavailable:
# XXX: timeframe not supported for backend (since
# above exception type), so skip and move on to next.
continue
# tsdb_history = series.get(timeframe)
# if len(hist_shm.array) < 2:
# TODO: there's an edge case here to solve where if the last
# frame before market close (at least on ib) was pushed and
# there was only "1 new" row pushed from the first backfill
# query-iteration, then the sample step sizing calcs will
# break upstream from here since you can't diff on at least
# 2 steps... probably should also add logic to compute from
# the tsdb series and stash that somewhere as meta data on
# the shm buffer?.. no se.
# unblock the feed bus management task
# assert len(shms[1].array)
task_status.started()
# backload any further data from tsdb (concurrently per
# timeframe) if not all data was able to be loaded (in memory)
# from the ``StorageClient.load()`` call above.
async with trio.open_nursery() as nurse: async with trio.open_nursery() as nurse:
for timeframe, shm in shms.items(): for timeframe, shm in shms.items():
entry = dts_per_tf.get(timeframe)
if not entry:
continue
(
tsdb_history,
last_tsdb_dt,
latest_start_dt,
latest_end_dt,
bf_done,
) = entry
if not tsdb_history.size:
continue
nurse.start_soon( nurse.start_soon(
back_load_from_tsdb, back_load_from_tsdb,
storemod,
storage,
fqme,
tsdb_history,
last_tsdb_dt,
latest_start_dt,
latest_end_dt,
bf_done,
timeframe, timeframe,
shm, shm,
) )
# try:
# await trio.sleep_forever()
# finally:
# write_ohlcv
async def manage_history( async def manage_history(
mod: ModuleType, mod: ModuleType,
@ -624,8 +688,23 @@ async def manage_history(
''' '''
Load and manage historical data including the loading of any Load and manage historical data including the loading of any
available series from any connected tsdb as well as conduct available series from any connected tsdb as well as conduct
real-time update of both that existing db and the allocated shared real-time update of both that existing db and the allocated
memory buffer. shared memory buffer.
Init sequence:
- allocate shm (numpy array) buffers for 60s & 1s sample rates
- configure "zero index" for each buffer: the index where
history will prepended *to* and new live data will be
appened *from*.
- open a ``.storage.StorageClient`` and load any existing tsdb
history as well as (async) start a backfill task which loads
missing (newer) history from the data provider backend:
- tsdb history is loaded first and pushed to shm ASAP.
- the backfill task loads the most recent history before
unblocking its parent task, so that the `ShmArray._last` is
up to date to allow the OHLC sampler to begin writing new
samples as the correct buffer index once the provider feed
engages.
''' '''
# TODO: is there a way to make each shm file key # TODO: is there a way to make each shm file key
@ -684,34 +763,36 @@ async def manage_history(
"Persistent shm for sym was already open?!" "Persistent shm for sym was already open?!"
) )
# register 1s and 1m buffers with the global incrementer task
async with open_sample_stream(
period_s=1.,
shms_by_period={
1.: rt_shm.token,
60.: hist_shm.token,
},
# NOTE: we want to only open a stream for doing broadcasts on
# backfill operations, not receive the sample index-stream
# (since there's no code in this data feed layer that needs to
# consume it).
open_index_stream=True,
sub_for_broadcasts=False,
) as sample_stream:
open_history_client = getattr( open_history_client = getattr(
mod, mod,
'open_history_client', 'open_history_client',
None, None,
) )
assert open_history_client assert open_history_client
from .. import storage
try: # TODO: maybe it should be a subpkg of `.data`?
from piker import storage
async with storage.open_storage_client() as (storemod, client): async with storage.open_storage_client() as (storemod, client):
log.info(f'Found existing `{storemod.name}`') log.info(
# TODO: drop returning the output that we pass in? f'Connecting to storage backend `{storemod.name}`:\n'
f'location: {client.address}\n'
f'db cardinality: {client.cardinality}\n'
# TODO: show backend config, eg:
# - network settings
# - storage size with compression
# - number of loaded time series?
)
# NOTE: this call ONLY UNBLOCKS once the latest-most frame
# (i.e. history just before the live feed latest datum) of
# history has been loaded and written to the shm buffer:
# - the backfiller task can write in reverse chronological
# to the shm and tsdb
# - the tsdb data can be loaded immediately and the
# backfiller can do a single append from it's end datum and
# then prepends backward to that from the current time
# step.
await bus.nursery.start( await bus.nursery.start(
tsdb_backfill, tsdb_backfill,
mod, mod,
@ -723,10 +804,36 @@ async def manage_history(
1: rt_shm, 1: rt_shm,
60: hist_shm, 60: hist_shm,
}, },
sample_stream, # sample_stream,
feed_is_live, feed_is_live,
) )
# indicate to caller that feed can be delivered to
# remote requesting client since we've loaded history
# data that can be used.
some_data_ready.set()
# wait for a live feed before starting the sampler.
await feed_is_live.wait()
# register 1s and 1m buffers with the global incrementer task
async with open_sample_stream(
period_s=1.,
shms_by_period={
1.: rt_shm.token,
60.: hist_shm.token,
},
# NOTE: we want to only open a stream for doing
# broadcasts on backfill operations, not receive the
# sample index-stream (since there's no code in this
# data feed layer that needs to consume it).
open_index_stream=True,
sub_for_broadcasts=False,
) as sample_stream:
log.info(f'Connected to sampler stream: {sample_stream}')
# yield back after client connect with filled shm # yield back after client connect with filled shm
task_status.started(( task_status.started((
hist_zero_index, hist_zero_index,
@ -735,37 +842,7 @@ async def manage_history(
rt_shm, rt_shm,
)) ))
# indicate to caller that feed can be delivered to
# remote requesting client since we've loaded history
# data that can be used.
some_data_ready.set()
# history retreival loop depending on user interaction # history retreival loop depending on user interaction
# and thus a small RPC-prot for remotely controllinlg # and thus a small RPC-prot for remotely controllinlg
# what data is loaded for viewing. # what data is loaded for viewing.
await trio.sleep_forever() await trio.sleep_forever()
except storage.StorageConnectionError:
log.exception(
"Can't connect to tsdb backend!?\n"
'Starting basic backfille to shm..'
)
await basic_backfill(
bus,
mod,
mkt,
{
1: rt_shm,
60: hist_shm,
},
sample_stream,
feed_is_live,
)
task_status.started((
hist_zero_index,
hist_shm,
rt_zero_index,
rt_shm,
))
some_data_ready.set()
await trio.sleep_forever()