From 324dcbbfb00fa61dad7a48bcb3432c918d89ff4c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 3 May 2022 16:22:01 -0400 Subject: [PATCH] Always write newly pulled frames to tsdb --- piker/data/feed.py | 44 ++++++++++++++++++++++++++++---------------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 8c88f04b..a95b87b4 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -31,6 +31,7 @@ from typing import ( AsyncIterator, Optional, Generator, Awaitable, + TYPE_CHECKING, ) import trio @@ -74,6 +75,8 @@ from ..brokers._util import ( DataUnavailable, ) +if TYPE_CHECKING: + from .marketstore import Storage log = get_logger(__name__) @@ -234,6 +237,7 @@ async def start_backfill( shm: ShmArray, last_tsdb_dt: Optional[datetime] = None, + storage: Optional[Storage] = None, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, @@ -520,6 +524,7 @@ async def start_backfill( break to_push, start_dt, end_dt = frames.pop(epoch) + ln = len(to_push) # bail gracefully on shm allocation overrun/full condition try: @@ -528,19 +533,27 @@ async def start_backfill( log.info( f'Shm buffer overrun on: {start_dt} -> {end_dt}?' ) - # await tractor.breakpoint() break log.info( - f'Shm pushed {len(to_push)} frame:\n' + f'Shm pushed {ln} frame:\n' f'{start_dt} -> {end_dt}' ) - # keep track of most recent "prepended" ``start_dt`` # both for detecting gaps and ensuring async # frame-result order. earliest_end_dt = start_dt + if storage is not None: + log.info( + f'Writing {ln} frame to storage:\n' + f'{start_dt} -> {end_dt}' + ) + await storage.write_ohlcv( + f'{bfqsn}.{mod.name}', # lul.. + to_push, + ) + # TODO: can we only trigger this if the respective # history in "in view"?!? # XXX: extremely important, there can be no checkpoints @@ -609,7 +622,7 @@ async def manage_history( # shm backfiller approach below. # start history anal and load missing new data via backend. - series, first_dt, last_dt = await storage.load(fqsn) + series, _, last_tsdb_dt = await storage.load(fqsn) broker, symbol, expiry = unpack_fqsn(fqsn) ( @@ -623,7 +636,8 @@ async def manage_history( mod, bfqsn, shm, - last_tsdb_dt=last_dt, + last_tsdb_dt=last_tsdb_dt, + storage=storage, ) ) @@ -644,8 +658,10 @@ async def manage_history( # do diff against last start frame of history and only fill # in from the tsdb an allotment that allows for most recent # to be loaded into mem *before* tsdb data. - if last_dt: - dt_diff_s = (latest_start_dt - last_dt).seconds + if last_tsdb_dt: + dt_diff_s = ( + latest_start_dt - last_tsdb_dt + ).seconds else: dt_diff_s = 0 @@ -674,7 +690,7 @@ async def manage_history( field_map=marketstore.ohlc_key_map, ) - # load as much from storage into shm as spacec will + # load as much from storage into shm as space will # allow according to user's shm size settings. count = 0 end = fastest['Epoch'][0] @@ -699,15 +715,11 @@ async def manage_history( prepend=True, # update_first=False, # start=prepend_start, - field_map={ - 'Epoch': 'time', - 'Open': 'open', - 'High': 'high', - 'Low': 'low', - 'Close': 'close', - 'Volume': 'volume', - }, + field_map=marketstore.ohlc_key_map, ) + + # manually trigger step update to update charts/fsps + # which need an incremental update. for delay_s in sampler.subscribers: await broadcast(delay_s)