From b44786e5b7263bc906a450ae38021e89f0fab1bf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 3 May 2022 13:19:49 -0400 Subject: [PATCH] Support async-batched ohlc queries in all backends Expect each backend to deliver a `config: dict[str, Any]` which provides concurrency controls to `trimeter`'s batch task scheduler such that backends can define their own concurrency limits. The dirty deats in this patch include handling history "gaps" where a query returns a history-frame-result which spans more then the typical frame size (in seconds). In such cases we reset the target frame index (datetime index sequence implemented with a `pendulum.Period`) using a generator protocol `.send()` such that the sequence can be dynamically re-indexed starting at the new (possibly) pre-gap datetime. The new gap logic also allows us to detect out of order frames easier and thus wait for the next-in-order to arrive before making more requests. --- piker/brokers/binance.py | 2 +- piker/brokers/ib.py | 35 +++-- piker/brokers/kraken.py | 2 +- piker/data/feed.py | 271 +++++++++++++++++++++++++++++---------- 4 files changed, 232 insertions(+), 78 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 7678c173..5b6a3da6 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -402,7 +402,7 @@ async def open_history_client( end_dt = pendulum.from_timestamp(array[-1]['time']) return array, start_dt, end_dt - yield get_ohlc + yield get_ohlc, {'erlangs': 4, 'rate': 4} async def backfill_bars( diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index ad752cce..20865e30 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -57,6 +57,8 @@ from ib_insync.wrapper import Wrapper from ib_insync.client import Client as ib_Client from fuzzywuzzy import process as fuzzy import numpy as np +import pendulum + from .. import config from ..log import get_logger, get_console_log @@ -1442,8 +1444,6 @@ async def get_bars( a ``MethoProxy``. ''' - import pendulum - fails = 0 bars: Optional[list] = None first_dt: datetime = None @@ -1471,7 +1471,9 @@ async def get_bars( time = bars_array['time'] assert time[-1] == last_dt.timestamp() assert time[0] == first_dt.timestamp() - log.info(f'bars retreived for dts {first_dt}:{last_dt}') + log.info( + f'{len(bars)} bars retreived for {first_dt} -> {last_dt}' + ) return (bars, bars_array, first_dt, last_dt), fails @@ -1485,20 +1487,27 @@ async def get_bars( raise NoData( f'Symbol: {fqsn}', ) - break elif ( err.code == 162 and 'HMDS query returned no data' in err.message ): - # try to decrement start point and look further back - end_dt = last_dt = last_dt.subtract(seconds=2000) + # XXX: this is now done in the storage mgmt layer + # and we shouldn't implicitly decrement the frame dt + # index since the upper layer may be doing so + # concurrently and we don't want to be delivering frames + # that weren't asked for. log.warning( - f'No data found ending @ {end_dt}\n' - f'Starting another request for {end_dt}' + f'NO DATA found ending @ {end_dt}\n' ) - continue + # try to decrement start point and look further back + # end_dt = last_dt = last_dt.subtract(seconds=2000) + + raise NoData( + f'Symbol: {fqsn}', + frame_size=2000, + ) elif _pacing in msg: @@ -1578,7 +1587,12 @@ async def open_history_client( return bars_array, first_dt, last_dt - yield get_hist + # TODO: it seems like we can do async queries for ohlc + # but getting the order right still isn't working and I'm not + # quite sure why.. needs some tinkering and probably + # a lookthrough of the ``ib_insync`` machinery, for eg. maybe + # we have to do the batch queries on the `asyncio` side? + yield get_hist, {'erlangs': 1, 'rate': 6} async def backfill_bars( @@ -1840,6 +1854,7 @@ async def stream_quotes( symbol=sym, ) first_quote = normalize(first_ticker) + # print(f'first quote: {first_quote}') def mk_init_msgs() -> dict[str, dict]: # pass back some symbol info like min_tick, trading_hours, etc. diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 0f5e2f2a..30e57b9e 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -1066,7 +1066,7 @@ async def open_history_client( end_dt = pendulum.from_timestamp(array[-1]['time']) return array, start_dt, end_dt - yield get_ohlc + yield get_ohlc, {'erlangs': 1, 'rate': 1} async def backfill_bars( diff --git a/piker/data/feed.py b/piker/data/feed.py index eb57e71b..272add05 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -29,6 +29,7 @@ from types import ModuleType from typing import ( Any, AsyncIterator, Optional, + Generator, Awaitable, ) @@ -241,7 +242,7 @@ async def start_backfill( ) -> int: - async with mod.open_history_client(bfqsn) as hist: + async with mod.open_history_client(bfqsn) as (hist, config): # get latest query's worth of history all the way # back to what is recorded in the tsdb @@ -260,7 +261,9 @@ async def start_backfill( for delay_s in sampler.subscribers: await broadcast(delay_s) + # signal that backfilling to tsdb's end datum is complete bf_done = trio.Event() + # let caller unblock and deliver latest history frame task_status.started((shm, start_dt, end_dt, bf_done)) @@ -269,7 +272,7 @@ async def start_backfill( pendulum.from_timestamp(times[-1]) - pendulum.from_timestamp(times[-2]) ).seconds - frame_step_s = (end_dt - start_dt).seconds + frame_size_s = len(to_push) * step_size_s if last_tsdb_dt is None: # maybe a better default (they don't seem to define epoch?!) @@ -277,7 +280,7 @@ async def start_backfill( # based on the sample step size load a certain amount # history if step_size_s == 1: - last_tsdb_dt = pendulum.now().subtract(weeks=2) + last_tsdb_dt = pendulum.now().subtract(days=6) elif step_size_s == 60: last_tsdb_dt = pendulum.now().subtract(years=2) @@ -290,69 +293,159 @@ async def start_backfill( 'do dat bruh.' ) - hist_period = pendulum.period( - start_dt.subtract(seconds=step_size_s), - last_tsdb_dt, - ) - end_dts = list(hist_period.range('seconds', frame_step_s)) + # configure async query throttling + erlangs = config.get('erlangs', 1) + rate = config.get('rate', 1) + frames = {} + + def iter_dts(start: datetime): + while True: + + hist_period = pendulum.period( + start.subtract(seconds=step_size_s), + last_tsdb_dt, + ) + dtrange = hist_period.range('seconds', frame_size_s) + + for end_dt in dtrange: + log.warning(f'Yielding next frame start {end_dt}') + start = yield end_dt + + # if caller sends a new start date, reset to that + if start is not None: + log.warning(f'Resetting date range: {start}') + # import pdbpp + # pdbpp.set_trace() + break + else: + # from while + return # pull new history frames until we hit latest # already in the tsdb or a max count. count = 0 - frames = {} + + # NOTE: when gaps are detected in the retreived history (by + # comparisor of the end - start versus the expected "frame size" + # in seconds) we need a way to alert the async request code not + # to continue to query for data "within the gap". This var is + # set in such cases such that further requests in that period + # are discarded and further we reset the "datetimem query frame + # index" in such cases to avoid needless noop requests. + earliest_end_dt: Optional[datetime] = start_dt async def get_ohlc_frame( input_end_dt: datetime, + iter_dts_gen: Generator[datetime], ) -> np.ndarray: - nonlocal count + nonlocal count, frames, earliest_end_dt, frame_size_s count += 1 + + if input_end_dt > earliest_end_dt: + # if a request comes in for an inter-gap frame we + # discard it since likely this request is still + # lingering from before the reset of ``iter_dts()`` via + # ``.send()`` below. + log.info(f'Discarding request history ending @ {input_end_dt}') + + # signals to ``trimeter`` loop to discard and + # ``continue`` in it's schedule loop. + return None + try: + log.info( + f'Requesting {step_size_s}s frame ending in {input_end_dt}' + ) array, start_dt, end_dt = await hist(end_dt=input_end_dt) - # if input_end_dt.timestamp() == end_dts[0].timestamp(): - # await tractor.breakpoint() + assert array['time'][0] == start_dt.timestamp() except NoData: - # decrement by the diff in time last delivered. - end_dt = start_dt.subtract(seconds=(end_dt - start_dt).seconds) - log.warning('no data for range {(end_dt - start_dt)} ?!?') - # continue + log.warning( + f'NO DATA for {frame_size_s}s frame @ {end_dt} ?!?' + ) + return None # discard signal - except DataUnavailable: + except DataUnavailable as duerr: # broker is being a bish and we can't pull # any more.. log.warning('backend halted on data deliver !?!?') - return input_end_dt, None + + # ugh, what's a better way? + # TODO: fwiw, we probably want a way to signal a throttle + # condition (eg. with ib) so that we can halt the + # request loop until the condition is resolved? + return duerr + + diff = end_dt - start_dt + frame_time_diff_s = diff.seconds + expected_frame_size_s = frame_size_s + step_size_s + + if frame_time_diff_s > expected_frame_size_s: + + # XXX: query result includes a start point prior to our + # expected "frame size" and thus is likely some kind of + # history gap (eg. market closed period, outage, etc.) + # so indicate to the request loop that this gap is + # expected by both, + # - resetting the ``iter_dts()`` generator to start at + # the new start point delivered in this result + # - setting the non-locally scoped ``earliest_end_dt`` + # to this new value so that the request loop doesn't + # get tripped up thinking there's an out of order + # request-result condition. + + log.warning( + f'History frame ending @ {end_dt} appears to have a gap:\n' + f'{diff} ~= {frame_time_diff_s} seconds' + ) + + # reset dtrange gen to new start point + next_end = iter_dts_gen.send(start_dt) + log.info( + f'Reset frame index to start at {start_dt}\n' + f'Was at {next_end}' + ) + + # TODO: can we avoid this? + earliest_end_dt = start_dt to_push = diff_history( array, start_dt, end_dt, - last_tsdb_dt=last_tsdb_dt, - # XXX: hacky, just run indefinitely - # last_tsdb_dt=None, ) - print(f"PULLING {count}") - log.info(f'Pushing {to_push.size} to shm!') + ln = len(to_push) + if ln: + log.info(f'{ln} bars for {start_dt} -> {end_dt}') + frames[input_end_dt.timestamp()] = (to_push, start_dt, end_dt) + return to_push, start_dt, end_dt - frames[input_end_dt.timestamp()] = (to_push, start_dt, end_dt) + else: + log.warning( + f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}' + ) + return None - return to_push, start_dt, end_dt + # initial dt index starts at the start of the first query result + idts = iter_dts(start_dt) - # if to_push.size < 1: - # print('UHHH SIZE <1 BREAKING!?') - # break - - rate = erlangs = 5 async with trimeter.amap( - - get_ohlc_frame, - end_dts, + partial( + get_ohlc_frame, + # we close in the ``iter_dt()`` gen in so we can send + # reset signals as needed for gap dection in the + # history. + iter_dts_gen=idts, + ), + idts, capture_outcome=True, include_value=True, + + # better technical names bruv... max_at_once=erlangs, max_per_second=rate, @@ -362,59 +455,101 @@ async def start_backfill( # (i.e., not necessarily in the original order) async for input_end_dt, outcome in outcomes: - # no data available case.. - if outcome is None: - break - try: out = outcome.unwrap() + + if out is None: + # skip signal + continue + + elif isinstance(out, DataUnavailable): + # no data available case signal.. so just kill + # further requests and basically just stop + # trying... + break + except Exception: log.exception('uhh trimeter bail') raise else: to_push, start_dt, end_dt = out + if not len(to_push): + # diff returned no new data (i.e. we probablyl hit + # the ``last_tsdb_dt`` point). + # TODO: raise instead? + log.warning(f'No history for range {start_dt} -> {end_dt}') + continue + # pipeline-style pull frames until we need to wait for # the next in order to arrive. - i = end_dts.index(input_end_dt) - print(f'latest end_dt {end_dt} found at index {i}') + # i = end_dts.index(input_end_dt) + # print(f'latest end_dt {end_dt} found at index {i}') - for epoch in reversed(sorted(frames)): + epochs = list(reversed(sorted(frames))) + for epoch in epochs: start = shm.array['time'][0] - # we don't yet have the next frame to push - # so break back to the async request loop. diff = epoch - start if abs(diff) > step_size_s: - if len(frames) > 20: + + if earliest_end_dt < end_dt: + # XXX: an expected gap was encountered (see + # logic in ``get_ohlc_frame()``, so allow + # this frame through to the storage layer. log.warning( - f'there appears to be a history gap of {diff}?' + f'there is an expected history gap of {diff}s:' + ) + + elif ( + erlangs > 1 + and len(epochs) < erlangs + ): + # we don't yet have the next frame to push + # so break back to the async request loop + # while we wait for more async frame-results + # to arrive. + expect_end = pendulum.from_timestamp(start) + expect_start = expect_end.subtract( + seconds=frame_size_s) + log.warning( + 'waiting on out-of-order history frame:\n' + f'{expect_end - expect_start}' ) - # from pprint import pprint - # await tractor.breakpoint() - else: break to_push, start_dt, end_dt = frames.pop(epoch) - print(f'pushing frame ending at {end_dt}') - if not len(to_push): - break - - # bail on shm allocation overrun + # bail gracefully on shm allocation overrun/full condition try: shm.push(to_push, prepend=True) except ValueError: - await tractor.breakpoint() + log.info( + f'Shm buffer overrun on: {start_dt} -> {end_dt}?' + ) + # await tractor.breakpoint() break - for delay_s in sampler.subscribers: - await broadcast(delay_s) + log.info( + f'Shm pushed {len(to_push)} frame:\n' + f'{start_dt} -> {end_dt}' + ) + + # keep track of most recent "prepended" ``start_dt`` + # both for detecting gaps and ensuring async + # frame-result order. + earliest_end_dt = start_dt + + # TODO: can we only trigger this if the respective + # history in "in view"?!? + # XXX: extremely important, there can be no checkpoints + # in the block above to avoid entering new ``frames`` + # values while we're pipelining the current ones to + # memory... + for delay_s in sampler.subscribers: + await broadcast(delay_s) bf_done.set() - # update start index to include all tsdb history - # that was pushed in the caller parent task. - # shm._first.value = 0 async def manage_history( @@ -490,6 +625,17 @@ async def manage_history( last_tsdb_dt=last_dt, ) ) + + # if len(shm.array) < 2: + # TODO: there's an edge case here to solve where if the last + # frame before market close (at least on ib) was pushed and + # there was only "1 new" row pushed from the first backfill + # query-iteration, then the sample step sizing calcs will + # break upstream from here since you can't diff on at least + # 2 steps... probably should also add logic to compute from + # the tsdb series and stash that somewhere as meta data on + # the shm buffer?.. no se. + task_status.started(shm) some_data_ready.set() @@ -524,14 +670,7 @@ async def manage_history( prepend=True, # update_first=False, # start=prepend_start, - field_map={ - 'Epoch': 'time', - 'Open': 'open', - 'High': 'high', - 'Low': 'low', - 'Close': 'close', - 'Volume': 'volume', - }, + field_map=marketstore.ohlc_key_map, ) # load as much from storage into shm as spacec will