diff --git a/piker/data/feed.py b/piker/data/feed.py
index 2400f39d..c7041135 100644
--- a/piker/data/feed.py
+++ b/piker/data/feed.py
@@ -21,18 +21,19 @@ This module is enabled for ``brokerd`` daemons.
 
 """
 from __future__ import annotations
-from dataclasses import dataclass, field
-from datetime import datetime
 from contextlib import asynccontextmanager
+from dataclasses import (
+    dataclass,
+    field,
+)
+from datetime import datetime
 from functools import partial
-from pprint import pformat
 from types import ModuleType
 from typing import (
     Any,
     AsyncIterator,
     Callable,
     Optional,
-    Generator,
     Awaitable,
     TYPE_CHECKING,
     Union,
@@ -41,7 +42,6 @@ from typing import (
 import trio
 from trio.abc import ReceiveChannel
 from trio_typing import TaskStatus
-import trimeter
 import tractor
 from tractor.trionics import maybe_open_context
 import pendulum
@@ -300,6 +300,9 @@ async def start_backfill(
         log.info(f'Pushing {to_push.size} to shm!')
         shm.push(to_push)
 
+        # TODO: *** THIS IS A BUG ***
+        # we need to only broadcast to subscribers for this fqsn..
+        # otherwise all fsps get reset on every chart..
         for delay_s in sampler.subscribers:
             await broadcast(delay_s)
 
@@ -337,79 +340,31 @@ async def start_backfill(
             last_tsdb_dt = start_dt.subtract(**kwargs)
 
         # configure async query throttling
-        erlangs = config.get('erlangs', 1)
-        rate = config.get('rate', 1)
-        frames = {}
+        # rate = config.get('rate', 1)
+        # XXX: legacy from ``trimeter`` code but unsupported now.
+        # erlangs = config.get('erlangs', 1)
 
-        def iter_dts(start: datetime):
+        # inline sequential loop where we simply pass the
+        # last retrieved start dt to the next request as
+        # its end dt.
+        starts: set[datetime] = set()
 
-            while True:
-
-                hist_period = pendulum.period(
-                    start,
-                    last_tsdb_dt,
-                )
-                dtrange = list(hist_period.range('seconds', frame_size_s))
-                log.debug(f'New datetime index:\n{pformat(dtrange)}')
-
-                for end_dt in dtrange:
-                    log.info(f'Yielding next frame start {end_dt}')
-                    start = yield end_dt
-
-                    # if caller sends a new start date, reset to that
-                    if start is not None:
-                        log.warning(f'Resetting date range: {start}')
-                        break
-                else:
-                    # from while
-                    return
-
-        # pull new history frames until we hit latest
-        # already in the tsdb or a max count.
-        count = 0
-
-        # NOTE: when gaps are detected in the retreived history (by
-        # comparisor of the end - start versus the expected "frame size"
-        # in seconds) we need a way to alert the async request code not
-        # to continue to query for data "within the gap". This var is
-        # set in such cases such that further requests in that period
-        # are discarded and further we reset the "datetimem query frame
-        # index" in such cases to avoid needless noop requests.
-        earliest_end_dt: Optional[datetime] = start_dt
-
-        async def get_ohlc_frame(
-            input_end_dt: datetime,
-            iter_dts_gen: Generator[datetime],
-
-        ) -> np.ndarray:
-
-            nonlocal count, frames, earliest_end_dt, frame_size_s
-            count += 1
-
-            if input_end_dt > earliest_end_dt:
-                # if a request comes in for an inter-gap frame we
-                # discard it since likely this request is still
-                # lingering from before the reset of ``iter_dts()`` via
-                # ``.send()`` below.
-                log.info(f'Discarding request history ending @ {input_end_dt}')
-
-                # signals to ``trimeter`` loop to discard and
-                # ``continue`` in it's schedule loop.
-                return None
 
+        while start_dt > last_tsdb_dt:
+            log.debug(f'QUERY end_dt={start_dt}')
             try:
                 log.info(
-                    f'Requesting {step_size_s}s frame ending in {input_end_dt}'
+                    f'Requesting {step_size_s}s frame ending in {start_dt}'
                 )
                 array, start_dt, end_dt = await hist(
                     timeframe,
-                    end_dt=input_end_dt,
+                    end_dt=start_dt,
                )
                 assert array['time'][0] == start_dt.timestamp()
             except NoData:
                 log.warning(
-                    f'NO DATA for {frame_size_s}s frame @ {input_end_dt} ?!?'
+                    f'NO DATA for {frame_size_s}s frame @ {start_dt} ?!?'
                 )
                 return None  # discard signal
@@ -433,41 +388,27 @@ async def start_backfill(
                 # XXX: query result includes a start point prior to our
                 # expected "frame size" and thus is likely some kind of
                 # history gap (eg. market closed period, outage, etc.)
-                # so indicate to the request loop that this gap is
-                # expected by both,
-                # - resetting the ``iter_dts()`` generator to start at
-                #   the new start point delivered in this result
-                # - setting the non-locally scoped ``earliest_end_dt``
-                #   to this new value so that the request loop doesn't
-                #   get tripped up thinking there's an out of order
-                #   request-result condition.
-
+                # so just report it to console for now.
                 log.warning(
                     f'History frame ending @ {end_dt} appears to have a gap:\n'
                     f'{diff} ~= {frame_time_diff_s} seconds'
                 )
 
-                # reset dtrange gen to new start point
-                try:
-                    next_end = iter_dts_gen.send(start_dt)
-                    log.info(
-                        f'Reset frame index to start at {start_dt}\n'
-                        f'Was at {next_end}'
-                    )
+            array, _start_dt, end_dt = await hist(
+                timeframe,
+                end_dt=start_dt,
+            )
 
-                    # NOTE: manually set "earliest end datetime" index-value
-                    # to avoid the request loop getting confused about
-                    # new frames that are earlier in history - i.e. this
-                    # **is not** the case of out-of-order frames from
-                    # an async batch request.
-                    earliest_end_dt = start_dt
+            if (
+                _start_dt in starts
+            ):
+                log.warning(f'SKIPPING DUPLICATE FRAME @ {_start_dt}')
+                start_dt = min(starts)
+                continue
 
-                except StopIteration:
-                    # gen already terminated meaning we probably already
-                    # exhausted it via frame requests.
-                    log.info(
-                        "Datetime index already exhausted, can't reset.."
-                    )
+            # only update the start point if it's a new one
+            start_dt = _start_dt
+            starts.add(start_dt)
 
             to_push = diff_history(
                 array,
@@ -478,195 +419,53 @@ async def start_backfill(
             ln = len(to_push)
             if ln:
                 log.info(f'{ln} bars for {start_dt} -> {end_dt}')
-                frames[input_end_dt.timestamp()] = (to_push, start_dt, end_dt)
-                return to_push, start_dt, end_dt
 
             else:
                 log.warning(
                     f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}'
                 )
-                return None
 
-        # initial dt index starts at the start of the first query result
-        idts = iter_dts(start_dt)
+            # bail gracefully on shm allocation overrun/full condition
+            try:
+                shm.push(to_push, prepend=True)
+            except ValueError:
+                log.info(
                    f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
                )
+                break
 
-        async with trimeter.amap(
-            partial(
-                get_ohlc_frame,
-                # we close in the ``iter_dt()`` gen in so we can send
-                # reset signals as needed for gap dection in the
-                # history.
-                iter_dts_gen=idts,
-            ),
-            idts,
+            log.info(
+                f'Shm pushed {ln} frame:\n'
+                f'{start_dt} -> {end_dt}'
+            )
 
-            capture_outcome=True,
-            include_value=True,
+            if (
+                storage is not None
+                and write_tsdb
+            ):
+                log.info(
                    f'Writing {ln} frame to storage:\n'
                    f'{start_dt} -> {end_dt}'
                )
+                await storage.write_ohlcv(
+                    f'{bfqsn}.{mod.name}',  # lul..
+                    to_push,
+                    timeframe,
+                )
 
-            # better technical names bruv...
-            max_at_once=erlangs,
-            max_per_second=rate,
+            # TODO: can we only trigger this if the respective
+            # history is "in view"?!?
 
-        ) as outcomes:
-
-            # Then iterate over the return values, as they become available
-            # (i.e., not necessarily in the original order)
-            async for input_end_dt, outcome in outcomes:
-
-                try:
-                    out = outcome.unwrap()
-
-                    if out is None:
-                        # skip signal
-                        continue
-
-                    elif isinstance(out, DataUnavailable):
-                        # no data available case signal.. so just kill
-                        # further requests and basically just stop
-                        # trying...
-                        break
-
-                except Exception:
-                    log.exception('uhh trimeter bail')
-                    raise
-                else:
-                    to_push, start_dt, end_dt = out
-
-                    if not len(to_push):
-                        # diff returned no new data (i.e. we probablyl hit
-                        # the ``last_tsdb_dt`` point).
-                        # TODO: raise instead?
-                        log.warning(f'No history for range {start_dt} -> {end_dt}')
-                        continue
-
-                    # pipeline-style pull frames until we need to wait for
-                    # the next in order to arrive.
-                    # i = end_dts.index(input_end_dt)
-                    # print(f'latest end_dt {end_dt} found at index {i}')
-
-                    epochs = list(reversed(sorted(frames)))
-                    for epoch in epochs:
-
-                        start = shm.array['time'][0]
-                        last_shm_prepend_dt = pendulum.from_timestamp(start)
-                        earliest_frame_queue_dt = pendulum.from_timestamp(epoch)
-
-                        diff = start - epoch
-
-                        if diff < 0:
-                            log.warning(
-                                'Discarding out of order frame:\n'
-                                f'{earliest_frame_queue_dt}'
-                            )
-                            frames.pop(epoch)
-                            continue
-
-                        if diff > step_size_s:
-
-                            if earliest_end_dt < earliest_frame_queue_dt:
-                                # XXX: an expected gap was encountered (see
-                                # logic in ``get_ohlc_frame()``, so allow
-                                # this frame through to the storage layer.
-                                log.warning(
-                                    f'Expected history gap of {diff}s:\n'
-                                    f'{earliest_frame_queue_dt} <- '
-                                    f'{earliest_end_dt}'
-                                )
-
-                            elif (
-                                erlangs > 1
-                            ):
-                                # we don't yet have the next frame to push
-                                # so break back to the async request loop
-                                # while we wait for more async frame-results
-                                # to arrive.
-                                if len(frames) >= erlangs:
-                                    log.warning(
-                                        'Frame count in async-queue is greater '
-                                        'then erlangs?\n'
-                                        'There seems to be a gap between:\n'
-                                        f'{earliest_frame_queue_dt} <- '
-                                        f'{last_shm_prepend_dt}\n'
-                                        'Conducting manual call for frame ending: '
-                                        f'{last_shm_prepend_dt}'
-                                    )
-                                    (
-                                        to_push,
-                                        start_dt,
-                                        end_dt,
-                                    ) = await get_ohlc_frame(
-                                        input_end_dt=last_shm_prepend_dt,
-                                        iter_dts_gen=idts,
-                                    )
-                                    last_epoch = to_push['time'][-1]
-                                    diff = start - last_epoch
-
-                                    if diff > step_size_s:
-                                        await tractor.breakpoint()
-                                        raise DataUnavailable(
-                                            'An awkward frame was found:\n'
-                                            f'{start_dt} -> {end_dt}:\n{to_push}'
-                                        )
-
-                                    else:
-                                        frames[last_epoch] = (
-                                            to_push, start_dt, end_dt)
-                                        break
-
-                            expect_end = pendulum.from_timestamp(start)
-                            expect_start = expect_end.subtract(
-                                seconds=frame_size_s)
-                            log.warning(
-                                'waiting on out-of-order history frame:\n'
-                                f'{expect_end - expect_start}'
-                            )
-                            break
-
-                        to_push, start_dt, end_dt = frames.pop(epoch)
-                        ln = len(to_push)
-
-                        # bail gracefully on shm allocation overrun/full condition
-                        try:
-                            shm.push(to_push, prepend=True)
-                        except ValueError:
-                            log.info(
-                                f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
-                            )
-                            break
-
-                        log.info(
-                            f'Shm pushed {ln} frame:\n'
-                            f'{start_dt} -> {end_dt}'
-                        )
-                        # keep track of most recent "prepended" ``start_dt``
-                        # both for detecting gaps and ensuring async
-                        # frame-result order.
-                        earliest_end_dt = start_dt
-
-                        if (
-                            storage is not None
-                            and write_tsdb
-                        ):
-                            log.info(
-                                f'Writing {ln} frame to storage:\n'
-                                f'{start_dt} -> {end_dt}'
-                            )
-                            await storage.write_ohlcv(
-                                f'{bfqsn}.{mod.name}',  # lul..
-                                to_push,
-                                timeframe,
-                            )
-
-                # TODO: can we only trigger this if the respective
-                # history in "in view"?!?
-                # XXX: extremely important, there can be no checkpoints
-                # in the block above to avoid entering new ``frames``
-                # values while we're pipelining the current ones to
-                # memory...
-                for delay_s in sampler.subscribers:
-                    await broadcast(delay_s)
+            # XXX: extremely important, there can be no checkpoints
+            # in the block above to avoid entering new frame values
+            # while we're pipelining the current ones to memory...
+            for delay_s in sampler.subscribers:
+                await broadcast(delay_s)
 
+        # short-circuit (for now)
         bf_done.set()
+        return
 
 
 async def manage_history(
@@ -789,7 +588,6 @@ async def manage_history(
         else:
             dt_diff_s = 0
 
-        # await trio.sleep_forever()
         # TODO: see if there's faster multi-field reads:
         # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
         # re-index with a `time` and index field
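
The core of this patch: the ``trimeter.amap()`` batch-request machinery (with its ``iter_dts()`` generator and ``earliest_end_dt`` bookkeeping) is replaced by a sequential walk where each request ends at the previously returned start dt, so frames always arrive in order and can be ``shm.push(.., prepend=True)``-ed inline. Below is a standalone sketch of just that control flow with the shm/tsdb plumbing stripped out; ``fetch_frame`` is a hypothetical stand-in for a backend's ``hist()`` endpoint and the sketch assumes every returned frame starts strictly before its requested end::

    from datetime import datetime
    from typing import Callable


    def backfill_sequential(
        fetch_frame: Callable[[datetime], tuple[list, datetime, datetime]],
        start_dt: datetime,
        last_tsdb_dt: datetime,
    ) -> list:
        '''
        Walk history backwards, one frame per request, until
        hitting the most recent datum already in the tsdb.

        '''
        frames: list = []
        starts: set[datetime] = set()

        while start_dt > last_tsdb_dt:
            # each request "ends" at the prior frame's start dt
            frame, _start_dt, _end_dt = fetch_frame(start_dt)

            if _start_dt in starts:
                # duplicate frame from the backend: rewind to the
                # earliest known start instead of spinning in place.
                start_dt = min(starts)
                continue

            # only advance the cursor when the start point is new
            start_dt = _start_dt
            starts.add(start_dt)
            frames.append(frame)

        return frames

Since ordering is guaranteed by construction, the old out-of-order frame queue collapses to the ``starts`` set check above.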
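On the ``*** THIS IS A BUG ***`` TODO near the top: ``broadcast(delay_s)`` fans out to every sampler subscriber, so a backfill push for one fqsn resets fsps on every chart. A minimal sketch of a per-fqsn keyed fan-out follows; the ``subscribers_by_fqsn`` registry and ``broadcast_for_fqsn()`` helper are illustrative only (the real ``sampler.subscribers`` is keyed by ``delay_s`` alone) and just pin down the shape of a possible fix::

    from collections import defaultdict
    from typing import Awaitable, Callable

    # hypothetical registry keyed by (fqsn, delay_s) rather than
    # just delay_s, so a backfill for one symbol can't reset
    # fsps subscribed to any other feed.
    subscribers_by_fqsn: defaultdict[
        tuple[str, int],
        set[Callable[[], Awaitable[None]]],
    ] = defaultdict(set)


    async def broadcast_for_fqsn(
        fqsn: str,
        delay_s: int,
    ) -> None:
        # notify only subscribers registered for this fqsn and
        # sample period; all other feeds are left untouched.
        for notify in subscribers_by_fqsn[fqsn, delay_s]:
            await notify()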