Always write newly pulled frames to tsdb

incr_update_backup
Tyler Goodlet 2022-05-03 16:22:01 -04:00
parent 112cba43e5
commit 324dcbbfb0
1 changed files with 28 additions and 16 deletions

View File

@ -31,6 +31,7 @@ from typing import (
AsyncIterator, Optional, AsyncIterator, Optional,
Generator, Generator,
Awaitable, Awaitable,
TYPE_CHECKING,
) )
import trio import trio
@ -74,6 +75,8 @@ from ..brokers._util import (
DataUnavailable, DataUnavailable,
) )
if TYPE_CHECKING:
from .marketstore import Storage
log = get_logger(__name__) log = get_logger(__name__)
@ -234,6 +237,7 @@ async def start_backfill(
shm: ShmArray, shm: ShmArray,
last_tsdb_dt: Optional[datetime] = None, last_tsdb_dt: Optional[datetime] = None,
storage: Optional[Storage] = None,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@ -520,6 +524,7 @@ async def start_backfill(
break break
to_push, start_dt, end_dt = frames.pop(epoch) to_push, start_dt, end_dt = frames.pop(epoch)
ln = len(to_push)
# bail gracefully on shm allocation overrun/full condition # bail gracefully on shm allocation overrun/full condition
try: try:
@ -528,19 +533,27 @@ async def start_backfill(
log.info( log.info(
f'Shm buffer overrun on: {start_dt} -> {end_dt}?' f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
) )
# await tractor.breakpoint()
break break
log.info( log.info(
f'Shm pushed {len(to_push)} frame:\n' f'Shm pushed {ln} frame:\n'
f'{start_dt} -> {end_dt}' f'{start_dt} -> {end_dt}'
) )
# keep track of most recent "prepended" ``start_dt`` # keep track of most recent "prepended" ``start_dt``
# both for detecting gaps and ensuring async # both for detecting gaps and ensuring async
# frame-result order. # frame-result order.
earliest_end_dt = start_dt earliest_end_dt = start_dt
if storage is not None:
log.info(
f'Writing {ln} frame to storage:\n'
f'{start_dt} -> {end_dt}'
)
await storage.write_ohlcv(
f'{bfqsn}.{mod.name}', # lul..
to_push,
)
# TODO: can we only trigger this if the respective # TODO: can we only trigger this if the respective
# history in "in view"?!? # history in "in view"?!?
# XXX: extremely important, there can be no checkpoints # XXX: extremely important, there can be no checkpoints
@ -609,7 +622,7 @@ async def manage_history(
# shm backfiller approach below. # shm backfiller approach below.
# start history anal and load missing new data via backend. # start history anal and load missing new data via backend.
series, first_dt, last_dt = await storage.load(fqsn) series, _, last_tsdb_dt = await storage.load(fqsn)
broker, symbol, expiry = unpack_fqsn(fqsn) broker, symbol, expiry = unpack_fqsn(fqsn)
( (
@ -623,7 +636,8 @@ async def manage_history(
mod, mod,
bfqsn, bfqsn,
shm, shm,
last_tsdb_dt=last_dt, last_tsdb_dt=last_tsdb_dt,
storage=storage,
) )
) )
@ -644,8 +658,10 @@ async def manage_history(
# do diff against last start frame of history and only fill # do diff against last start frame of history and only fill
# in from the tsdb an allotment that allows for most recent # in from the tsdb an allotment that allows for most recent
# to be loaded into mem *before* tsdb data. # to be loaded into mem *before* tsdb data.
if last_dt: if last_tsdb_dt:
dt_diff_s = (latest_start_dt - last_dt).seconds dt_diff_s = (
latest_start_dt - last_tsdb_dt
).seconds
else: else:
dt_diff_s = 0 dt_diff_s = 0
@ -674,7 +690,7 @@ async def manage_history(
field_map=marketstore.ohlc_key_map, field_map=marketstore.ohlc_key_map,
) )
# load as much from storage into shm as spacec will # load as much from storage into shm as space will
# allow according to user's shm size settings. # allow according to user's shm size settings.
count = 0 count = 0
end = fastest['Epoch'][0] end = fastest['Epoch'][0]
@ -699,15 +715,11 @@ async def manage_history(
prepend=True, prepend=True,
# update_first=False, # update_first=False,
# start=prepend_start, # start=prepend_start,
field_map={ field_map=marketstore.ohlc_key_map,
'Epoch': 'time',
'Open': 'open',
'High': 'high',
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
},
) )
# manually trigger step update to update charts/fsps
# which need an incremental update.
for delay_s in sampler.subscribers: for delay_s in sampler.subscribers:
await broadcast(delay_s) await broadcast(delay_s)