diff --git a/piker/data/feed.py b/piker/data/feed.py index b00cf70e..88bf8810 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -35,6 +35,7 @@ from typing import ( import trio from trio.abc import ReceiveChannel from trio_typing import TaskStatus +import trimeter import tractor from pydantic import BaseModel import pendulum @@ -263,32 +264,66 @@ async def start_backfill( # let caller unblock and deliver latest history frame task_status.started((shm, start_dt, end_dt, bf_done)) + times = array['time'] + step_size_s = ( + pendulum.from_timestamp(times[-1]) - + pendulum.from_timestamp(times[-2]) + ).seconds + frame_step_s = (end_dt - start_dt).seconds + if last_tsdb_dt is None: # maybe a better default (they don't seem to define epoch?!) - last_tsdb_dt = pendulum.now().subtract(days=1) + + # based on the sample step size load a certain amount + # history + if step_size_s == 1: + last_tsdb_dt = pendulum.now().subtract(weeks=2) + + elif step_size_s == 60: + last_tsdb_dt = pendulum.now().subtract(years=2) + + else: + raise ValueError( + '`piker` only needs to support 1m and 1s sampling ' + 'but ur api is trying to deliver a longer ' + f'timeframe of {step_size_s} ' 'seconds.. so ye, dun ' + 'do dat bruh.' + ) + + hist_period = pendulum.period( + start_dt.subtract(seconds=step_size_s), + last_tsdb_dt, + ) + end_dts = list(hist_period.range('seconds', frame_step_s)) # pull new history frames until we hit latest # already in the tsdb or a max count. - # mx_fills = 16 count = 0 - # while True: - while ( - end_dt > last_tsdb_dt - # and count < mx_fills - ): + frames = {} + + async def get_ohlc_frame( + input_end_dt: datetime, + + ) -> np.ndarray: + + nonlocal count count += 1 try: - array, start_dt, end_dt = await hist(end_dt=start_dt) + array, start_dt, end_dt = await hist(end_dt=input_end_dt) + # if input_end_dt.timestamp() == end_dts[0].timestamp(): + # await tractor.breakpoint() except NoData: # decrement by the diff in time last delivered. end_dt = start_dt.subtract(seconds=(end_dt - start_dt).seconds) - continue + log.warning('no data for range {(end_dt - start_dt)} ?!?') + # continue except DataUnavailable: # broker is being a bish and we can't pull # any more.. - break + log.warning('backend halted on data deliver !?!?') + # break to_push = diff_history( array, @@ -302,18 +337,74 @@ async def start_backfill( print(f"PULLING {count}") log.info(f'Pushing {to_push.size} to shm!') - if to_push.size < 1: - break + frames[input_end_dt.timestamp()] = (to_push, start_dt, end_dt) - # bail on shm allocation overrun - try: - shm.push(to_push, prepend=True) - except ValueError: - await tractor.breakpoint() - break + return to_push, start_dt, end_dt - for delay_s in sampler.subscribers: - await broadcast(delay_s) + # if to_push.size < 1: + # print('UHHH SIZE <1 BREAKING!?') + # break + + rate = erlangs = 5 + async with trimeter.amap( + + get_ohlc_frame, + end_dts, + + capture_outcome=True, + include_value=True, + max_at_once=erlangs, + max_per_second=rate, + + ) as outcomes: + + # Then iterate over the return values, as they become available + # (i.e., not necessarily in the original order) + async for input_end_dt, outcome in outcomes: + try: + out = outcome.unwrap() + except Exception: + log.exception('uhh trimeter bail') + raise + else: + to_push, start_dt, end_dt = out + + # pipeline-style pull frames until we need to wait for + # the next in order to arrive. + i = end_dts.index(input_end_dt) + print(f'latest end_dt {end_dt} found at index {i}') + + for epoch in reversed(sorted(frames)): + start = shm.array['time'][0] + + # we don't yet have the next frame to push + # so break back to the async request loop. + diff = epoch - start + if abs(diff) > step_size_s: + if len(frames) > 20: + log.warning( + f'there appears to be a history gap of {diff}?' + ) + # from pprint import pprint + # await tractor.breakpoint() + else: + break + + to_push, start_dt, end_dt = frames.pop(epoch) + print(f'pushing frame ending at {end_dt}') + + if not len(to_push): + break + + # bail on shm allocation overrun + try: + shm.push(to_push, prepend=True) + except ValueError: + await tractor.breakpoint() + break + + for delay_s in sampler.subscribers: + await broadcast(delay_s) bf_done.set() # update start index to include all tsdb history