Add concurrent multi-time-frame history loading

Our default sample periods are 60s (1m) for the history chart and 1s for
the fast chart. This patch adds concurrent loading of two (or more)
sample-period data sets, reusing the existing loading code but with new
support for looping through a passed "timeframe" table which maps each
sample period to its shm instance (a sketch of this pattern follows the
summary list below).

More detailed adjustments include:
- breaking the "basic" and tsdb loading into 2 new funcs:
  `basic_backfill()` and `tsdb_backfill()`, the latter of which is run
  when the tsdb daemon is discovered.
- adjusting the fast shm buffer to offset by one day's worth of 1s
  samples so that only up to a day is backfilled as history in the fast
  chart.
- adjusting the bus task startup in `manage_history()` to deliver back
  the offset indices for both the fast and slow shms and to set them on
  the `Feed` object as `.izero_hist/rt: int` values:
  - this allows the chart-UI linked view region handlers to use the
    offsets in the view-linking-transform math to index-align the
    history and fast charts.
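
As a rough sketch of the new pattern, the concurrent multi-timeframe
load boils down to starting one backfill task per entry in the
timeframe table. The list-based shm stand-ins, the dummy
`start_backfill()` and the `izeros` dict below are illustrative
assumptions for this sketch, not the actual piker APIs:

from functools import partial

import trio


async def start_backfill(
    timeframe: int,   # sample period in seconds
    shm: list[int],   # stand-in for a shared-mem OHLC array
    task_status=trio.TASK_STATUS_IGNORED,
) -> None:
    # write the "first frame", then unblock the starter, mirroring
    # the real task's ``task_status.started()`` handshake.
    shm.append(0)
    task_status.started()

    # keep backfilling deeper history concurrently with other tasks.
    for i in range(1, 4):
        shm.append(i)
        await trio.sleep(0)


async def main() -> None:
    # the "timeframe" table: sample period (s) -> shm instance
    shms: dict[int, list[int]] = {1: [], 60: []}
    izeros: dict[int, int] = {}

    async with trio.open_nursery() as nursery:
        for timeframe, shm in shms.items():
            # blocks only until this timeframe's first frame is loaded
            await nursery.start(partial(start_backfill, timeframe, shm))

            # offset analogous to ``Feed.izero_hist/rt``: the index of
            # the most recent datum at startup.
            izeros[timeframe] = len(shm) - 1

    print(izeros)


trio.run(main)

Note that `nursery.start()` only blocks until each task signals its
first frame via `task_status.started()`, so the deeper per-timeframe
backfills then run concurrently.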
ib_1m_hist
Tyler Goodlet 2022-10-20 21:19:26 -04:00
parent 330d16262e
commit 956c7d3435
1 changed file with 286 additions and 195 deletions


@@ -21,7 +21,7 @@ This module is enabled for ``brokerd`` daemons.
 """
 from __future__ import annotations
-from contextlib import asynccontextmanager
+from contextlib import asynccontextmanager as acm
 from dataclasses import (
     dataclass,
     field,
@@ -287,7 +287,7 @@ async def start_backfill(
             - pendulum.from_timestamp(times[-2])
         ).seconds

-        # "frame"'s worth of sample period steps in seconds
+        # frame's worth of sample-period-steps, in seconds
         frame_size_s = len(array) * step_size_s

         to_push = diff_history(
@@ -298,7 +298,7 @@ async def start_backfill(
         )

         log.info(f'Pushing {to_push.size} to shm!')
-        shm.push(to_push)
+        shm.push(to_push, prepend=True)

         # TODO: *** THIS IS A BUG ***
         # we need to only broadcast to subscribers for this fqsn..
@@ -310,7 +310,11 @@ async def start_backfill(
         bf_done = trio.Event()

         # let caller unblock and deliver latest history frame
-        task_status.started((start_dt, end_dt, bf_done))
+        task_status.started((
+            start_dt,
+            end_dt,
+            bf_done,
+        ))

         # based on the sample step size, maybe load a certain amount history
         if last_tsdb_dt is None:
@@ -325,14 +329,14 @@ async def start_backfill(
             # when no tsdb "last datum" is provided, we just load
             # some near-term history.
             periods = {
-                1: {'seconds': 4000},
+                1: {'days': 1},
                 60: {'days': 14},
             }

             if tsdb_is_up:
                 # do a decently sized backfill and load it into storage.
                 periods = {
-                    1: {'days': 6},
+                    1: {'days': 1},
                     60: {'years': 6},
                 }
@@ -461,79 +465,54 @@ async def start_backfill(
         return


-async def manage_history(
-    mod: ModuleType,
+async def basic_backfill(
     bus: _FeedsBus,
-    fqsn: str,
-    some_data_ready: trio.Event,
-    feed_is_live: trio.Event,
-    timeframe: float = 60,  # in seconds
-
-    task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
+    mod: ModuleType,
+    bfqsn: str,
+    shms: dict[int, ShmArray],

 ) -> None:
-    '''
-    Load and manage historical data including the loading of any
-    available series from `marketstore` as well as conducting real-time
-    update of both that existing db and the allocated shared memory
-    buffer.
-
-    '''
-    # (maybe) allocate shm array for this broker/symbol which will
-    # be used for fast near-term history capture and processing.
-    hist_shm, opened = maybe_open_shm_array(
-        key=f'{fqsn}_hist',
-
-        # use any broker defined ohlc dtype:
-        dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
-
-        # we expect the sub-actor to write
-        readonly=False,
-    )
-
-    # TODO: history validation
-    if not opened:
-        raise RuntimeError(
-            "Persistent shm for sym was already open?!"
-        )
-
-    rt_shm, opened = maybe_open_shm_array(
-        key=f'{fqsn}_rt',
-
-        # use any broker defined ohlc dtype:
-        dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
-
-        # we expect the sub-actor to write
-        readonly=False,
-        size=3*_secs_in_day,
-    )
-    if not opened:
-        raise RuntimeError(
-            "Persistent shm for sym was already open?!"
-        )
-
-    log.info('Scanning for existing `marketstored`')
-    is_up = await check_for_service('marketstored')
-
-    # for now only do backfilling if no tsdb can be found
-    do_legacy_backfill = not is_up and opened
-
-    bfqsn = fqsn.replace('.' + mod.name, '')
-    open_history_client = getattr(mod, 'open_history_client', None)
-    assert open_history_client
-
-    if is_up and opened and open_history_client:
-        log.info('Found existing `marketstored`')
-
-        from . import marketstore
-        async with marketstore.open_storage_client(
-            fqsn,
-        ) as storage:
+    # do a legacy incremental backfill from the provider.
+    log.info('No TSDB (marketstored) found, doing basic backfill..')
+
+    # start history backfill task ``backfill_bars()`` is
+    # a required backend func this must block until shm is
+    # filled with first set of ohlc bars
+    for timeframe, shm in shms.items():
+        await bus.nursery.start(
+            partial(
+                start_backfill,
+                mod,
+                bfqsn,
+                shm,
+                timeframe=timeframe,
+            )
+        )
+
+
+async def tsdb_backfill(
+    mod: ModuleType,
+    marketstore: ModuleType,
+    bus: _FeedsBus,
+    storage: Storage,
+    fqsn: str,
+    bfqsn: str,
+    shms: dict[int, ShmArray],
+
+    # some_data_ready: trio.Event,
+    task_status: TaskStatus[
+        tuple[ShmArray, ShmArray]
+    ] = trio.TASK_STATUS_IGNORED,
+
+) -> None:
     # TODO: this should be used verbatim for the pure
     # shm backfiller approach below.
+    dts_per_tf: dict[int, datetime] = {}

     # start history anal and load missing new data via backend.
+    for timeframe, shm in shms.items():
         series, _, last_tsdb_dt = await storage.load(
             fqsn,
             timeframe=timeframe,
@@ -549,13 +528,19 @@ async def manage_history(
                 start_backfill,
                 mod,
                 bfqsn,
-                hist_shm,
+                shm,
                 timeframe=timeframe,
                 last_tsdb_dt=last_tsdb_dt,
                 tsdb_is_up=True,
                 storage=storage,
             )
         )
+        dts_per_tf[timeframe] = (
+            series.get(timeframe),
+            last_tsdb_dt,
+            latest_start_dt,
+            latest_end_dt,
+        )

         # if len(hist_shm.array) < 2:
         # TODO: there's an edge case here to solve where if the last
@@ -567,10 +552,24 @@ async def manage_history(
         # the tsdb series and stash that somewhere as meta data on
         # the shm buffer?.. no se.

-        task_status.started((hist_shm, rt_shm))
-        some_data_ready.set()
+    # unblock the feed bus management task
+    assert len(shms[1].array)
+    task_status.started((
+        shms[60],
+        shms[1],
+    ))

+    # sync to backend history task's query/load completion
     await bf_done.wait()

+    for timeframe, shm in shms.items():
+        (
+            tsdb_history,
+            last_tsdb_dt,
+            latest_start_dt,
+            latest_end_dt,
+        ) = dts_per_tf[timeframe]
+
         # do diff against last start frame of history and only fill
         # in from the tsdb an allotment that allows for most recent
         # to be loaded into mem *before* tsdb data.
@@ -584,17 +583,15 @@ async def manage_history(
         # TODO: see if there's faster multi-field reads:
         # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
         # re-index with a `time` and index field
-        prepend_start = hist_shm._first.value
+        prepend_start = shm._first.value

         # sanity check on most-recent-data loading
         assert prepend_start > dt_diff_s

-        history = list(series.values())
-        if history:
-            fastest = history[0]
-            to_push = fastest[:prepend_start]
+        if tsdb_history and len(tsdb_history):
+            to_push = tsdb_history[:prepend_start]

-            hist_shm.push(
+            shm.push(
                 to_push,

                 # insert the history pre a "days worth" of samples
@@ -607,23 +604,18 @@ async def manage_history(
             # load as much from storage into shm as space will
             # allow according to user's shm size settings.
-            count = 0
-            end = fastest['Epoch'][0]
+            end = tsdb_history['Epoch'][0]

-            while hist_shm._first.value > 0:
-                count += 1
+            while shm._first.value > 0:
                 series = await storage.read_ohlcv(
                     fqsn,
                     end=end,
                     timeframe=timeframe,
                 )
-                history = list(series.values())
-                fastest = history[0]
-                end = fastest['Epoch'][0]
                 prepend_start -= len(to_push)
-                to_push = fastest[:prepend_start]
+                to_push = tsdb_history[:prepend_start]

-                hist_shm.push(
+                shm.push(
                     to_push,

                     # insert the history pre a "days worth" of samples
@@ -648,32 +640,114 @@ async def manage_history(
                 for delay_s in sampler.subscribers:
                     await broadcast(delay_s)

-                if count > 6:
-                    break
-
             log.info(f'Loaded {to_push.shape} datums from storage')

             # TODO: write new data to tsdb to be ready to for next read.

-    if do_legacy_backfill:
-        # do a legacy incremental backfill from the provider.
-        log.info('No existing `marketstored` found..')

-        # start history backfill task ``backfill_bars()`` is
-        # a required backend func this must block until shm is
-        # filled with first set of ohlc bars
-        await bus.nursery.start(
-            partial(
-                start_backfill,
-                mod,
-                bfqsn,
-                hist_shm,
-                timeframe=timeframe,
-            )
-        )
+async def manage_history(
+    mod: ModuleType,
+    bus: _FeedsBus,
+    fqsn: str,
+    some_data_ready: trio.Event,
+    feed_is_live: trio.Event,
+    timeframe: float = 60,  # in seconds
+
+    task_status: TaskStatus[
+        tuple[ShmArray, ShmArray]
+    ] = trio.TASK_STATUS_IGNORED,
+
+) -> None:
+    '''
+    Load and manage historical data including the loading of any
+    available series from `marketstore` as well as conducting real-time
+    update of both that existing db and the allocated shared memory
+    buffer.
+
+    '''
+    # (maybe) allocate shm array for this broker/symbol which will
+    # be used for fast near-term history capture and processing.
+    hist_shm, opened = maybe_open_shm_array(
+        key=f'{fqsn}_hist',
+
+        # use any broker defined ohlc dtype:
+        dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
+
+        # we expect the sub-actor to write
+        readonly=False,
+    )
+    hist_zero_index = hist_shm.index - 1
+
+    # TODO: history validation
+    if not opened:
+        raise RuntimeError(
+            "Persistent shm for sym was already open?!"
+        )
+
+    rt_shm, opened = maybe_open_shm_array(
+        key=f'{fqsn}_rt',
+
+        # use any broker defined ohlc dtype:
+        dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
+
+        # we expect the sub-actor to write
+        readonly=False,
+        size=3*_secs_in_day,
+    )
+
+    # (for now) set the rt (hft) shm array with space to prepend
+    # only a days worth of 1s history.
+    days = 1
+    rt_shm._first.value = days*_secs_in_day
+    rt_shm._last.value = days*_secs_in_day
+    rt_zero_index = rt_shm.index - 1
+
+    if not opened:
+        raise RuntimeError(
+            "Persistent shm for sym was already open?!"
+        )
+
+    log.info('Scanning for existing `marketstored`')
+    tsdb_is_up = await check_for_service('marketstored')
+
+    bfqsn = fqsn.replace('.' + mod.name, '')
+    open_history_client = getattr(mod, 'open_history_client', None)
+    assert open_history_client
+
+    if (
+        tsdb_is_up
+        and opened
+        and open_history_client
+    ):
+        log.info('Found existing `marketstored`')
+
+        from . import marketstore
+        async with (
+            marketstore.open_storage_client(fqsn) as storage,
+        ):
+            hist_shm, rt_shm = await bus.nursery.start(
+                tsdb_backfill,
+                mod,
+                marketstore,
+                bus,
+                storage,
+                fqsn,
+                bfqsn,
+                {
+                    1: rt_shm,
+                    60: hist_shm,
+                },
+
+                # some_data_ready=some_data_ready,
+                # task_status=task_status,
+            )

-    # yield back after client connect with filled shm
-    task_status.started((hist_shm, rt_shm))
+        # yield back after client connect with filled shm
+        task_status.started((
+            hist_zero_index,
+            hist_shm,
+            rt_zero_index,
+            rt_shm,
+        ))

         # indicate to caller that feed can be delivered to
         # remote requesting client since we've loaded history
@@ -685,6 +759,29 @@ async def manage_history(
         # for viewing.
         await trio.sleep_forever()

+    # load less history if no tsdb can be found
+    elif (
+        not tsdb_is_up
+        and opened
+    ):
+        await basic_backfill(
+            bus,
+            mod,
+            bfqsn,
+            shms={
+                1: rt_shm,
+                60: hist_shm,
+            },
+        )
+        task_status.started((
+            hist_zero_index,
+            hist_shm,
+            rt_zero_index,
+            rt_shm,
+        ))
+        some_data_ready.set()
+        await trio.sleep_forever()
+

 async def allocate_persistent_feed(
     bus: _FeedsBus,
@@ -750,7 +847,12 @@ async def allocate_persistent_feed(
     # https://github.com/python-trio/trio/issues/2258
     # bus.nursery.start_soon(
     # await bus.start_task(
-    hist_shm, rt_shm = await bus.nursery.start(
+    (
+        izero_hist,
+        hist_shm,
+        izero_rt,
+        rt_shm,
+    ) = await bus.nursery.start(
         manage_history,
         mod,
         bus,
@@ -764,7 +866,9 @@ async def allocate_persistent_feed(
     # this task.
     msg = init_msg[symbol]
     msg['hist_shm_token'] = hist_shm.token
-    msg['startup_hist_index'] = hist_shm.index - 1
+    # msg['startup_hist_index'] = hist_shm.index - 1
+    msg['izero_hist'] = izero_hist
+    msg['izero_rt'] = izero_rt
     msg['rt_shm_token'] = rt_shm.token

     # true fqsn
@@ -794,31 +898,19 @@ async def allocate_persistent_feed(
         fqsn: first_quote,
     }

-    # for ambiguous names we simply apply the retreived
-    # feed to that name (for now).
     bus.feeds[symbol] = bus.feeds[bfqsn] = (
         init_msg,
         generic_first_quotes,
     )

+    # for ambiguous names we simply apply the retreived
+    # feed to that name (for now).
+
+    # insert 1s ohlc into the increment buffer set
+    # to update and shift every second
     sampler.ohlcv_shms.setdefault(
         1,
         []
     ).append(rt_shm)
-    ohlckeys = ['open', 'high', 'low', 'close']
-
-    # set the rt (hft) shm array as append only
-    # (for now).
-    rt_shm._first.value = 0
-    rt_shm._last.value = 0
-
-    # push last sample from history to rt buffer just as a filler datum
-    # but we don't want a history sized datum outlier so set vlm to zero
-    # and ohlc to the close value.
-    rt_shm.push(hist_shm.array[-2:-1])
-    rt_shm.array[ohlckeys] = hist_shm.array['close'][-1]
-    rt_shm._array['volume'] = 0

     task_status.started()
@@ -829,16 +921,12 @@ async def allocate_persistent_feed(
     # the backend will indicate when real-time quotes have begun.
     await feed_is_live.wait()

-    # start shm incrementer task for OHLC style sampling
-    # at the current detected step period.
-    times = hist_shm.array['time']
-    delay_s = times[-1] - times[times != times[-1]][-1]
-    sampler.ohlcv_shms.setdefault(delay_s, []).append(hist_shm)
+    # insert 1m ohlc into the increment buffer set
+    # to shift every 60s.
+    sampler.ohlcv_shms.setdefault(60, []).append(hist_shm)

     # create buffer a single incrementer task broker backend
     # (aka `brokerd`) using the lowest sampler period.
-    # await tractor.breakpoint()
-    # for delay_s in sampler.ohlcv_shms:
     if sampler.incrementers.get(_default_delay_s) is None:
         await bus.start_task(
             increment_ohlc_buffer,
@@ -849,7 +937,8 @@ async def allocate_persistent_feed(
         'shm_write_opts', {}
     ).get('sum_tick_vlm', True)

-    # start sample loop
+    # start sample loop and shm incrementer task for OHLC style sampling
+    # at the above registered step periods.
     try:
         await sample_and_broadcast(
             bus,
@@ -1037,7 +1126,8 @@ class Feed:
     stream: trio.abc.ReceiveChannel[dict[str, Any]]
     status: dict[str, Any]

-    startup_hist_index: int = 0
+    izero_hist: int = 0
+    izero_rt: int = 0

     throttle_rate: Optional[int] = None
@@ -1055,7 +1145,7 @@ class Feed:
     async def receive(self) -> dict:
         return await self.stream.receive()

-    @asynccontextmanager
+    @acm
     async def index_stream(
         self,
         delay_s: int = 1,
@@ -1116,7 +1206,7 @@ class Feed:
         )


-@asynccontextmanager
+@acm
 async def install_brokerd_search(
     portal: tractor.Portal,
@@ -1150,7 +1240,7 @@ async def install_brokerd_search(
     yield


-@asynccontextmanager
+@acm
 async def open_feed(
     fqsns: list[str],
@@ -1226,7 +1316,8 @@ async def open_feed(
             stream=stream,
             _portal=portal,
             status={},
-            startup_hist_index=init['startup_hist_index'],
+            izero_hist=init['izero_hist'],
+            izero_rt=init['izero_rt'],
             throttle_rate=tick_throttle,
         )
@@ -1278,7 +1369,7 @@ async def open_feed(
         await ctx.cancel()


-@asynccontextmanager
+@acm
 async def maybe_open_feed(
     fqsns: list[str],