Support async-batched ohlc queries in all backends
Expect each backend to deliver a `config: dict[str, Any]` which provides concurrency controls to `trimeter`'s batch task scheduler such that backends can define their own concurrency limits.

The dirty deets in this patch include handling history "gaps" where a query returns a history-frame-result which spans more than the typical frame size (in seconds). In such cases we reset the target frame index (a datetime index sequence implemented with a `pendulum.Period`) using the generator protocol's `.send()` such that the sequence can be dynamically re-indexed, starting at the new (possibly) pre-gap datetime. The new gap logic also allows us to detect out-of-order frames more easily and thus wait for the next-in-order frame to arrive before making more requests.
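For reference, here's a minimal sketch of the new backend-side contract described above. It's not lifted from the patch verbatim: the `(callable, config)` yield shape and the `erlangs`/`rate` keys come from the hunks below, while the synthetic array is a stand-in for a real backend query:

```python
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Optional

import numpy as np
import pendulum


@asynccontextmanager
async def open_history_client(fqsn: str):
    '''
    Yield a frame-query callable *plus* a throttling ``config: dict``
    consumed by the ``trimeter`` batch scheduler in the storage layer.

    '''
    async def get_ohlc(
        end_dt: Optional[datetime] = None,
        start_dt: Optional[datetime] = None,
    ) -> tuple[np.ndarray, datetime, datetime]:
        # stand-in for a real backend request (REST/ws query etc.)
        end = end_dt or pendulum.now()
        array = np.zeros(3000, dtype=[('time', 'i8'), ('close', 'f8')])
        array['time'] = np.arange(
            end.int_timestamp - 3000, end.int_timestamp)
        start_dt = pendulum.from_timestamp(int(array['time'][0]))
        end_dt = pendulum.from_timestamp(int(array['time'][-1]))
        return array, start_dt, end_dt

    # each backend declares its own concurrency limits:
    # 'erlangs' -> max concurrent in-flight frame requests,
    # 'rate' -> max new requests per second.
    yield get_ohlc, {'erlangs': 4, 'rate': 4}
```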
parent 7e951f17ca
commit b44786e5b7
@@ -402,7 +402,7 @@ async def open_history_client(
             end_dt = pendulum.from_timestamp(array[-1]['time'])
             return array, start_dt, end_dt

-        yield get_ohlc
+        yield get_ohlc, {'erlangs': 4, 'rate': 4}


 async def backfill_bars(

@@ -57,6 +57,8 @@ from ib_insync.wrapper import Wrapper
 from ib_insync.client import Client as ib_Client
 from fuzzywuzzy import process as fuzzy
 import numpy as np
+import pendulum
+

 from .. import config
 from ..log import get_logger, get_console_log
@@ -1442,8 +1444,6 @@ async def get_bars(
     a ``MethodProxy``.

     '''
-    import pendulum
-
     fails = 0
     bars: Optional[list] = None
     first_dt: datetime = None
@@ -1471,7 +1471,9 @@ async def get_bars(
             time = bars_array['time']
             assert time[-1] == last_dt.timestamp()
             assert time[0] == first_dt.timestamp()
-            log.info(f'bars retreived for dts {first_dt}:{last_dt}')
+            log.info(
+                f'{len(bars)} bars retrieved for {first_dt} -> {last_dt}'
+            )

             return (bars, bars_array, first_dt, last_dt), fails

@@ -1485,20 +1487,27 @@ async def get_bars(
                 raise NoData(
                     f'Symbol: {fqsn}',
                 )
-                break

             elif (
                 err.code == 162
                 and 'HMDS query returned no data' in err.message
             ):
-                # try to decrement start point and look further back
-                end_dt = last_dt = last_dt.subtract(seconds=2000)
+                # XXX: this is now done in the storage mgmt layer
+                # and we shouldn't implicitly decrement the frame dt
+                # index since the upper layer may be doing so
+                # concurrently and we don't want to be delivering frames
+                # that weren't asked for.
                 log.warning(
-                    f'No data found ending @ {end_dt}\n'
-                    f'Starting another request for {end_dt}'
+                    f'NO DATA found ending @ {end_dt}\n'
                 )

-                continue
+                # try to decrement start point and look further back
+                # end_dt = last_dt = last_dt.subtract(seconds=2000)
+
+                raise NoData(
+                    f'Symbol: {fqsn}',
+                    frame_size=2000,
+                )

             elif _pacing in msg:

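The `frame_size=2000` kwarg passed to `NoData` above implies the exception type grew a frame-size field; the real definition lives elsewhere in piker, so treat this as an assumed shape:

```python
class NoData(Exception):
    '''
    Assumed sketch: the new ``frame_size`` kwarg tells the storage
    layer how large a frame (in bars) the backend *would have*
    returned, so it can re-index without guessing.

    '''
    def __init__(
        self,
        *args,
        frame_size: int = 1000,  # in number of bars
    ) -> None:
        self.frame_size = frame_size
        super().__init__(*args)
```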
@@ -1578,7 +1587,12 @@ async def open_history_client(

             return bars_array, first_dt, last_dt

-        yield get_hist
+        # TODO: it seems like we can do async queries for ohlc
+        # but getting the order right still isn't working and I'm not
+        # quite sure why.. needs some tinkering and probably
+        # a look-through of the ``ib_insync`` machinery, eg. maybe
+        # we have to do the batch queries on the `asyncio` side?
+        yield get_hist, {'erlangs': 1, 'rate': 6}


 async def backfill_bars(
@@ -1840,6 +1854,7 @@ async def stream_quotes(
         symbol=sym,
     )
     first_quote = normalize(first_ticker)
+    # print(f'first quote: {first_quote}')

     def mk_init_msgs() -> dict[str, dict]:
         # pass back some symbol info like min_tick, trading_hours, etc.

@@ -1066,7 +1066,7 @@ async def open_history_client(
             end_dt = pendulum.from_timestamp(array[-1]['time'])
             return array, start_dt, end_dt

-        yield get_ohlc
+        yield get_ohlc, {'erlangs': 1, 'rate': 1}


 async def backfill_bars(

@@ -29,6 +29,7 @@ from types import ModuleType
 from typing import (
     Any,
     AsyncIterator, Optional,
+    Generator,
     Awaitable,
 )

@@ -241,7 +242,7 @@ async def start_backfill(

 ) -> int:

-    async with mod.open_history_client(bfqsn) as hist:
+    async with mod.open_history_client(bfqsn) as (hist, config):

         # get latest query's worth of history all the way
         # back to what is recorded in the tsdb
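A runnable toy of the consumer-side contract (fake backend and symbol name; the `.get()` defaults match the `config.get()` calls further down):

```python
from contextlib import asynccontextmanager

import trio


@asynccontextmanager
async def open_history_client(fqsn: str):
    async def hist(end_dt=None):
        return [], None, None  # toy frame result
    yield hist, {'erlangs': 4, 'rate': 4}


async def main():
    # consumer side: unpack the (callable, config) pair just like
    # ``start_backfill()`` above now does.
    async with open_history_client('btcusdt.binance') as (hist, config):
        erlangs = config.get('erlangs', 1)  # max in-flight requests
        rate = config.get('rate', 1)        # requests per second
        print(erlangs, rate, await hist())

trio.run(main)
```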
@@ -260,7 +261,9 @@ async def start_backfill(
         for delay_s in sampler.subscribers:
             await broadcast(delay_s)

+        # signal that backfilling to tsdb's end datum is complete
         bf_done = trio.Event()

         # let caller unblock and deliver latest history frame
         task_status.started((shm, start_dt, end_dt, bf_done))

@@ -269,7 +272,7 @@ async def start_backfill(
             pendulum.from_timestamp(times[-1]) -
             pendulum.from_timestamp(times[-2])
         ).seconds
-        frame_step_s = (end_dt - start_dt).seconds
+        frame_size_s = len(to_push) * step_size_s

         if last_tsdb_dt is None:
             # maybe a better default (they don't seem to define epoch?!)
@@ -277,7 +280,7 @@ async def start_backfill(
             # based on the sample step size load a certain amount
             # history
             if step_size_s == 1:
-                last_tsdb_dt = pendulum.now().subtract(weeks=2)
+                last_tsdb_dt = pendulum.now().subtract(days=6)

             elif step_size_s == 60:
                 last_tsdb_dt = pendulum.now().subtract(years=2)
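Worked numbers for the frame sizing above (made-up timestamps): two consecutive bar times 1s apart and a 3000-bar frame give a 3000s frame "size", which becomes the stride of the datetime query index built below.

```python
import pendulum

times = [1_650_000_000, 1_650_000_001]
step_size_s = (
    pendulum.from_timestamp(times[-1]) -
    pendulum.from_timestamp(times[-2])
).seconds  # -> 1

frame_size_s = 3000 * step_size_s  # len(to_push) == 3000 bars -> 3000s
assert (step_size_s, frame_size_s) == (1, 3000)
```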
@@ -290,69 +293,159 @@ async def start_backfill(
                     'do dat bruh.'
                 )

-        hist_period = pendulum.period(
-            start_dt.subtract(seconds=step_size_s),
-            last_tsdb_dt,
-        )
-        end_dts = list(hist_period.range('seconds', frame_step_s))
+        # configure async query throttling
+        erlangs = config.get('erlangs', 1)
+        rate = config.get('rate', 1)
+        frames = {}

+        def iter_dts(start: datetime):
+            while True:
+
+                hist_period = pendulum.period(
+                    start.subtract(seconds=step_size_s),
+                    last_tsdb_dt,
+                )
+                dtrange = hist_period.range('seconds', frame_size_s)
+
+                for end_dt in dtrange:
+                    log.warning(f'Yielding next frame start {end_dt}')
+                    start = yield end_dt
+
+                    # if the caller sends a new start date, reset to it
+                    if start is not None:
+                        log.warning(f'Resetting date range: {start}')
+                        break
+                else:
+                    # from while
+                    return

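The `.send()`-driven re-indexing can be demonstrated standalone; this mirrors the shape of `iter_dts()` above with made-up constants:

```python
import pendulum

last_tsdb_dt = pendulum.now().subtract(days=6)
step_size_s, frame_size_s = 1, 3000


def iter_dts(start: pendulum.DateTime):
    # walk end-dates back toward the tsdb's last datum, restarting
    # the range whenever a new start is ``.send()``-ed in.
    while True:
        period = pendulum.period(
            start.subtract(seconds=step_size_s),
            last_tsdb_dt,
        )
        for end_dt in period.range('seconds', frame_size_s):
            start = yield end_dt
            if start is not None:  # caller requested a re-index
                break
        else:
            return


gen = iter_dts(pendulum.now())
first = next(gen)
# a "gap" result came back starting a day earlier: re-index there.
re_indexed = gen.send(first.subtract(days=1))
assert re_indexed < first
```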
         # pull new history frames until we hit latest
         # already in the tsdb or a max count.
         count = 0
-        frames = {}

+        # NOTE: when gaps are detected in the retrieved history (by
+        # comparison of the end - start delta versus the expected
+        # "frame size" in seconds) we need a way to alert the async
+        # request code not to continue to query for data "within the
+        # gap". This var is set in such cases such that further
+        # requests in that period are discarded and further we reset
+        # the "datetime query frame index" in such cases to avoid
+        # needless noop requests.
+        earliest_end_dt: Optional[datetime] = start_dt

         async def get_ohlc_frame(
             input_end_dt: datetime,
+            iter_dts_gen: Generator[datetime],

         ) -> np.ndarray:

-            nonlocal count
+            nonlocal count, frames, earliest_end_dt, frame_size_s
             count += 1

+            if input_end_dt > earliest_end_dt:
+                # if a request comes in for an inter-gap frame we
+                # discard it since likely this request is still
+                # lingering from before the reset of ``iter_dts()`` via
+                # ``.send()`` below.
+                log.info(f'Discarding request history ending @ {input_end_dt}')
+
+                # signals to the ``trimeter`` loop to discard and
+                # ``continue`` in its schedule loop.
+                return None

             try:
+                log.info(
+                    f'Requesting {step_size_s}s frame ending in {input_end_dt}'
+                )
                 array, start_dt, end_dt = await hist(end_dt=input_end_dt)
-                # if input_end_dt.timestamp() == end_dts[0].timestamp():
-                #     await tractor.breakpoint()
+                assert array['time'][0] == start_dt.timestamp()

             except NoData:
-                # decrement by the diff in time last delivered.
-                end_dt = start_dt.subtract(seconds=(end_dt - start_dt).seconds)
-                log.warning('no data for range {(end_dt - start_dt)} ?!?')
-                # continue
+                log.warning(
+                    f'NO DATA for {frame_size_s}s frame @ {end_dt} ?!?'
+                )
+                return None  # discard signal

-            except DataUnavailable:
+            except DataUnavailable as duerr:
                 # broker is being a bish and we can't pull
                 # any more..
                 log.warning('backend halted on data deliver !?!?')
-                return input_end_dt, None

+                # ugh, what's a better way?
+                # TODO: fwiw, we probably want a way to signal a throttle
+                # condition (eg. with ib) so that we can halt the
+                # request loop until the condition is resolved?
+                return duerr

+            diff = end_dt - start_dt
+            frame_time_diff_s = diff.seconds
+            expected_frame_size_s = frame_size_s + step_size_s
+
+            if frame_time_diff_s > expected_frame_size_s:
+
+                # XXX: query result includes a start point prior to our
+                # expected "frame size" and thus is likely some kind of
+                # history gap (eg. market closed period, outage, etc.)
+                # so indicate to the request loop that this gap is
+                # expected by both,
+                # - resetting the ``iter_dts()`` generator to start at
+                #   the new start point delivered in this result
+                # - setting the non-locally scoped ``earliest_end_dt``
+                #   to this new value so that the request loop doesn't
+                #   get tripped up thinking there's an out of order
+                #   request-result condition.
+
+                log.warning(
+                    f'History frame ending @ {end_dt} appears to have a gap:\n'
+                    f'{diff} ~= {frame_time_diff_s} seconds'
+                )
+
+                # reset dtrange gen to new start point
+                next_end = iter_dts_gen.send(start_dt)
+                log.info(
+                    f'Reset frame index to start at {start_dt}\n'
+                    f'Was at {next_end}'
+                )
+
+                # TODO: can we avoid this?
+                earliest_end_dt = start_dt

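Toy numbers for the gap check above (the weekend-sized span is invented): a 3000s frame of 1s bars that comes back spanning roughly two days gets flagged as a gap, which triggers the `.send()` reset.

```python
step_size_s = 1
frame_size_s = 3000
expected_frame_size_s = frame_size_s + step_size_s  # 3001s

frame_time_diff_s = 2 * 24 * 60 * 60 + 3000  # result spans ~2 days
assert frame_time_diff_s > expected_frame_size_s  # -> gap: reset index
```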
             to_push = diff_history(
                 array,
                 start_dt,
                 end_dt,

                 last_tsdb_dt=last_tsdb_dt,
-                # XXX: hacky, just run indefinitely
-                # last_tsdb_dt=None,
             )
-            print(f"PULLING {count}")
-            log.info(f'Pushing {to_push.size} to shm!')
+            ln = len(to_push)
+            if ln:
+                log.info(f'{ln} bars for {start_dt} -> {end_dt}')
+                frames[input_end_dt.timestamp()] = (to_push, start_dt, end_dt)
+                return to_push, start_dt, end_dt

-            frames[input_end_dt.timestamp()] = (to_push, start_dt, end_dt)
+            else:
+                log.warning(
+                    f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}'
+                )
+                return None

-            return to_push, start_dt, end_dt
+        # initial dt index starts at the start of the first query result
+        idts = iter_dts(start_dt)

-            # if to_push.size < 1:
-            #     print('UHHH SIZE <1 BREAKING!?')
-            #     break

-        rate = erlangs = 5
         async with trimeter.amap(

-            get_ohlc_frame,
-            end_dts,
+            partial(
+                get_ohlc_frame,
+                # we close in the ``iter_dts()`` gen so we can send
+                # reset signals as needed for gap detection in the
+                # history.
+                iter_dts_gen=idts,
+            ),
+            idts,

             capture_outcome=True,
             include_value=True,

             # better technical names bruv...
             max_at_once=erlangs,
             max_per_second=rate,

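Isolated from piker, the `trimeter` scheduling used above looks roughly like this (a sketch assuming `trimeter`'s published `amap` keyword API as exercised in the hunk; `fetch` is a stand-in for `get_ohlc_frame()`):

```python
from functools import partial

import trimeter
import trio


async def fetch(i: int, tag: str = '') -> int:
    await trio.sleep(0.1)  # stand-in for a history frame request
    return i * 10


async def main():
    async with trimeter.amap(
        partial(fetch, tag='ohlc'),
        range(8),
        capture_outcome=True,  # wrap results/errors as ``outcome``s
        include_value=True,    # also deliver the input value
        max_at_once=4,         # "erlangs": concurrent in-flight tasks
        max_per_second=4,      # "rate": request throttle
    ) as outcomes:
        # results arrive in *completion* order, not input order,
        # hence the reorder buffering in the loop below.
        async for value, outcome in outcomes:
            print(value, outcome.unwrap())

trio.run(main)
```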
@@ -362,59 +455,101 @@ async def start_backfill(
             # (i.e., not necessarily in the original order)
             async for input_end_dt, outcome in outcomes:

-                # no data available case..
-                if outcome is None:
-                    break

                 try:
                     out = outcome.unwrap()

+                    if out is None:
+                        # skip signal
+                        continue
+
+                    elif isinstance(out, DataUnavailable):
+                        # no data available case signal.. so just kill
+                        # further requests and basically just stop
+                        # trying...
+                        break

                 except Exception:
                     log.exception('uhh trimeter bail')
                     raise
                 else:
                     to_push, start_dt, end_dt = out

                 if not len(to_push):
                     # diff returned no new data (i.e. we probably hit
                     # the ``last_tsdb_dt`` point).
                     # TODO: raise instead?
                     log.warning(f'No history for range {start_dt} -> {end_dt}')
                     continue

                 # pipeline-style pull frames until we need to wait for
                 # the next in order to arrive.
-                i = end_dts.index(input_end_dt)
-                print(f'latest end_dt {end_dt} found at index {i}')
+                # i = end_dts.index(input_end_dt)
+                # print(f'latest end_dt {end_dt} found at index {i}')

-                for epoch in reversed(sorted(frames)):
+                epochs = list(reversed(sorted(frames)))
+                for epoch in epochs:
                     start = shm.array['time'][0]

-                    # we don't yet have the next frame to push
-                    # so break back to the async request loop.
                     diff = epoch - start
                     if abs(diff) > step_size_s:
-                        if len(frames) > 20:

+                        if earliest_end_dt < end_dt:
+                            # XXX: an expected gap was encountered (see
+                            # logic in ``get_ohlc_frame()``), so allow
+                            # this frame through to the storage layer.
                             log.warning(
-                                f'there appears to be a history gap of {diff}?'
+                                f'there is an expected history gap of {diff}s:'
                             )

+                        elif (
+                            erlangs > 1
+                            and len(epochs) < erlangs
+                        ):
+                            # we don't yet have the next frame to push
+                            # so break back to the async request loop
+                            # while we wait for more async frame-results
+                            # to arrive.
+                            expect_end = pendulum.from_timestamp(start)
+                            expect_start = expect_end.subtract(
+                                seconds=frame_size_s)
+                            log.warning(
+                                'waiting on out-of-order history frame:\n'
+                                f'{expect_end - expect_start}'
+                            )
                         else:
                             break

                     to_push, start_dt, end_dt = frames.pop(epoch)
-                    print(f'pushing frame ending at {end_dt}')

                     if not len(to_push):
                         break

-                    # bail on shm allocation overrun
+                    # bail gracefully on shm allocation overrun/full condition
                     try:
                         shm.push(to_push, prepend=True)
                     except ValueError:
-                        await tractor.breakpoint()
+                        log.info(
+                            f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
+                        )
+                        # await tractor.breakpoint()
+                        break

-                    for delay_s in sampler.subscribers:
-                        await broadcast(delay_s)
+                    log.info(
+                        f'Shm pushed {len(to_push)} frame:\n'
+                        f'{start_dt} -> {end_dt}'
+                    )
+
+                    # keep track of the most recent "prepended" ``start_dt``
+                    # both for detecting gaps and ensuring async
+                    # frame-result order.
+                    earliest_end_dt = start_dt

                 # TODO: can we only trigger this if the respective
                 # history is "in view"?!?
+                # XXX: extremely important, there can be no checkpoints
+                # in the block above to avoid entering new ``frames``
+                # values while we're pipelining the current ones to
+                # memory...
                 for delay_s in sampler.subscribers:
                     await broadcast(delay_s)

         bf_done.set()
         # update start index to include all tsdb history
         # that was pushed in the caller parent task.
         # shm._first.value = 0

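The reorder-buffer logic above reduces to a small toy (epochs and sizes invented): frames keyed by their end-epoch are flushed newest-first, but only while each one is contiguous with the current buffer head; out-of-order arrivals wait in `frames`.

```python
step_size_s = 1
frame_size_s = 3
buffer_head = 100  # oldest ``time`` currently in shm

frames = {
    99: 'frame 97->99',  # end-epoch -> frame payload
    93: 'frame 91->93',  # arrived early: 94->96 still in flight
}

for epoch in sorted(frames, reverse=True):
    # contiguous iff this frame *ends* right at the buffer head
    if abs(epoch - buffer_head) > step_size_s:
        break  # out-of-order: wait for the next-in-order frame
    print('pushing', frames.pop(epoch))
    buffer_head = epoch - frame_size_s + step_size_s  # new head

assert 93 in frames  # still waiting on the 94->96 frame
```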
@@ -490,6 +625,17 @@ async def manage_history(
                     last_tsdb_dt=last_dt,
                 )
             )

+            # if len(shm.array) < 2:
+            # TODO: there's an edge case here to solve where if the last
+            # frame before market close (at least on ib) was pushed and
+            # there was only "1 new" row pushed from the first backfill
+            # query-iteration, then the sample step sizing calcs will
+            # break upstream from here since you can't diff on less
+            # than 2 steps... probably should also add logic to compute
+            # from the tsdb series and stash that somewhere as meta
+            # data on the shm buffer?.. not sure.

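That TODO's edge case boils down to needing at least 2 rows to infer a sample step; a hypothetical guard (not part of this patch) might look like:

```python
import numpy as np

times = np.array([1_650_000_000])  # only "1 new" row after backfill
if len(times) < 2:
    step_size_s = None  # can't diff; upstream sizing calcs would break
else:
    step_size_s = int(times[-1] - times[-2])
```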
             task_status.started(shm)
             some_data_ready.set()

@@ -524,14 +670,7 @@ async def manage_history(
                     prepend=True,
                     # update_first=False,
                     # start=prepend_start,
-                    field_map={
-                        'Epoch': 'time',
-                        'Open': 'open',
-                        'High': 'high',
-                        'Low': 'low',
-                        'Close': 'close',
-                        'Volume': 'volume',
-                    },
+                    field_map=marketstore.ohlc_key_map,
                 )

                 # load as much from storage into shm as space will
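Presumably `marketstore.ohlc_key_map` mirrors the inline dict it replaces here; a sketch under that assumption, mapping marketstore's capitalized column names to piker's lowercase ohlc field names:

```python
ohlc_key_map: dict[str, str] = {
    'Epoch': 'time',
    'Open': 'open',
    'High': 'high',
    'Low': 'low',
    'Close': 'close',
    'Volume': 'volume',
}
```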