diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 2d13541a..cb934187 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -49,7 +49,12 @@ from bidict import bidict import trio import tractor from tractor import to_asyncio -import pendulum +from pendulum import ( + from_timestamp, + DateTime, + Duration, + duration as mk_duration, +) from eventkit import Event from ib_insync import ( client as ib_client, @@ -221,16 +226,20 @@ def bars_to_np(bars: list) -> np.ndarray: # https://interactivebrokers.github.io/tws-api/historical_limitations.html#non-available_hd _samplings: dict[int, tuple[str, str]] = { 1: ( + # ib strs '1 secs', f'{int(2e3)} S', - pendulum.duration(seconds=2e3), + + mk_duration(seconds=2e3), ), # TODO: benchmark >1 D duration on query to see if # throughput can be made faster during backfilling. 60: ( + # ib strs '1 min', '2 D', - pendulum.duration(days=2), + + mk_duration(days=2), ), } @@ -315,7 +324,7 @@ class Client: **kwargs, - ) -> tuple[BarDataList, np.ndarray, pendulum.Duration]: + ) -> tuple[BarDataList, np.ndarray, Duration]: ''' Retreive OHLCV bars for a fqme over a range to the present. @@ -324,11 +333,20 @@ class Client: # https://interactivebrokers.github.io/tws-api/historical_data.html bars_kwargs = {'whatToShow': 'TRADES'} bars_kwargs.update(kwargs) - bar_size, duration, dt_duration = _samplings[sample_period_s] + ( + bar_size, + ib_duration_str, + default_dt_duration, + ) = _samplings[sample_period_s] + + dt_duration: DateTime = ( + duration + or default_dt_duration + ) global _enters log.info( - f"REQUESTING {duration}'s worth {bar_size} BARS\n" + f"REQUESTING {ib_duration_str}'s worth {bar_size} BARS\n" f'{_enters} @ end={end_dt}"' ) @@ -353,7 +371,7 @@ class Client: # time history length values format: # ``durationStr=integer{SPACE}unit (S|D|W|M|Y)`` - durationStr=duration, + durationStr=ib_duration_str, # always use extended hours useRTH=False, @@ -383,29 +401,55 @@ class Client: # => we recursively call this method until we get at least # as many bars such that they sum in aggregate to the the # desired total time (duration) at most. - elif ( - end_dt - and ( - (len(bars) * sample_period_s) < dt_duration.in_seconds() - ) - ): - log.warning( - f'Recursing to get more bars from {end_dt} for {dt_duration}' - ) - end_dt -= dt_duration - ( - r_bars, - r_arr, - r_duration, - ) = await self.bars( - fqme, - start_dt=start_dt, - end_dt=end_dt, - ) - r_bars.extend(bars) - bars = r_bars + if end_dt: + nparr: np.ndarray = bars_to_np(bars) + times: np.ndarray = nparr['time'] + first: float = times[0] + tdiff: float = times[-1] - first + + if ( + # len(bars) * sample_period_s) < dt_duration.in_seconds() + tdiff < dt_duration.in_seconds() + ): + end_dt: DateTime = from_timestamp(first) + log.warning( + f'Frame result was shorter then {dt_duration}!?\n' + 'Recursing for more bars:\n' + f'end_dt: {end_dt}\n' + f'dt_duration: {dt_duration}\n' + ) + ( + r_bars, + r_arr, + r_duration, + ) = await self.bars( + fqme, + start_dt=start_dt, + end_dt=end_dt, + sample_period_s=sample_period_s, + + # TODO: make a table for Duration to + # the ib str values in order to use this? + # duration=duration, + ) + r_bars.extend(bars) + bars = r_bars nparr = bars_to_np(bars) + + # timestep should always be at least as large as the + # period step. + tdiff: np.ndarray = np.diff(nparr['time']) + to_short: np.ndarray = tdiff < sample_period_s + if (to_short).any(): + # raise ValueError( + log.error( + f'OHLC frame for {sample_period_s} has {to_short.size} ' + 'time steps which are shorter then expected?!"' + ) + # OOF: this will break teardown? + # breakpoint() + return bars, nparr, dt_duration async def con_deats( diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 1b144e3c..b317da22 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -20,7 +20,7 @@ Order and trades endpoints for use with ``piker``'s EMS. """ from __future__ import annotations from contextlib import ExitStack -from collections import ChainMap +# from collections import ChainMap from functools import partial from pprint import pformat import time diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index bf84cba4..99b8aef9 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -196,10 +196,8 @@ async def open_history_client( f'mean: {mean}' ) - if ( - out is None - ): - # could be trying to retreive bars over weekend + # could be trying to retreive bars over weekend + if out is None: log.error(f"Can't grab bars starting at {end_dt}!?!?") raise NoData( f'{end_dt}', @@ -213,7 +211,24 @@ async def open_history_client( ): raise DataUnavailable(f'First timestamp is {head_dt}') - bars, bars_array, first_dt, last_dt = out + # also see return type for `get_bars()` + bars: ibis.objects.BarDataList + bars_array: np.ndarray + first_dt: datetime + last_dt: datetime + ( + bars, + bars_array, + first_dt, + last_dt, + ) = out + + # TODO: audit the sampling period here as well? + # timestep should always be at least as large as the + # period step. + # tdiff: np.ndarray = np.diff(bars_array['time']) + # if (tdiff < timeframe).any(): + # await tractor.pause() # volume cleaning since there's -ve entries, # wood luv to know what crookery that is..