diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py
index e23ad50f..05fd47f4 100644
--- a/piker/brokers/ib/api.py
+++ b/piker/brokers/ib/api.py
@@ -43,6 +43,7 @@ from bidict import bidict
 import trio
 import tractor
 from tractor import to_asyncio
+import pendulum
 import ib_insync as ibis
 from ib_insync.contract import (
     Contract,
@@ -52,6 +53,7 @@ from ib_insync.contract import (
 from ib_insync.order import Order
 from ib_insync.ticker import Ticker
 from ib_insync.objects import (
+    BarDataList,
     Position,
     Fill,
     Execution,
@@ -248,7 +250,7 @@ _enters = 0

 def bars_to_np(bars: list) -> np.ndarray:
     '''
-    Convert a "bars list thing" (``BarsList`` type from ibis)
+    Convert a "bars list thing" (``BarDataList`` type from ibis)
     into a numpy struct array.

     '''
@@ -274,10 +276,18 @@ def bars_to_np(bars: list) -> np.ndarray:
 # but they say "use with discretion":
 # https://interactivebrokers.github.io/tws-api/historical_limitations.html#non-available_hd
-_samplings: dict[int, tuple[str, str]] = {
-    1: ('1 secs', f'{int(2e3)} S'),
+_samplings: dict[int, tuple[str, str, pendulum.Duration]] = {
+    1: (
+        '1 secs',
+        f'{int(2e3)} S',
+        pendulum.duration(seconds=2e3),
+    ),
     # TODO: benchmark >1 D duration on query to see if
     # throughput can be made faster during backfilling.
-    60: ('1 min', '1 D'),
+    60: (
+        '1 min',
+        '1 D',
+        pendulum.duration(days=1),
+    ),
 }


@@ -344,7 +354,7 @@ class Client:

         **kwargs,

-    ) -> list[dict[str, Any]]:
+    ) -> tuple[BarDataList, np.ndarray, pendulum.Duration]:
         '''
         Retreive OHLCV bars for a fqsn over a range to the present.

@@ -353,7 +363,7 @@ class Client:
         # https://interactivebrokers.github.io/tws-api/historical_data.html
         bars_kwargs = {'whatToShow': 'TRADES'}
         bars_kwargs.update(kwargs)
-        bar_size, duration = _samplings[sample_period_s]
+        bar_size, duration, dt_duration = _samplings[sample_period_s]

         global _enters
         # log.info(f'REQUESTING BARS {_enters} @ end={end_dt}')
@@ -408,7 +418,7 @@ class Client:

             # way to detect a timeout.
             nparr = bars_to_np(bars)
-        return bars, nparr
+        return bars, nparr, dt_duration

     async def con_deats(
         self,
diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py
index 6bef877c..614c7f4b 100644
--- a/piker/brokers/ib/feed.py
+++ b/piker/brokers/ib/feed.py
@@ -122,19 +122,36 @@ async def open_history_client(
     async with open_data_client() as proxy:

+        max_timeout: float = 2.
+        mean: float = 0
+        count: int = 0
+
         async def get_hist(
             timeframe: float,
             end_dt: Optional[datetime] = None,
             start_dt: Optional[datetime] = None,

         ) -> tuple[np.ndarray, str]:

+            nonlocal max_timeout, mean, count

-            out = await get_bars(
+            query_start = time.time()
+            out, timedout = await get_bars(
                 proxy,
                 symbol,
                 timeframe,
                 end_dt=end_dt,
             )
+            latency = time.time() - query_start
+            if (
+                not timedout
+                # and latency <= max_timeout
+            ):
+                count += 1
+                mean += (latency - mean) / count
+                print(
+                    f'HISTORY FRAME QUERY LATENCY: {latency}\n'
+                    f'mean: {mean}'
+                )

             if out is None:
                 # could be trying to retreive bars over weekend
@@ -245,6 +262,8 @@ async def get_bars(

     # blank to start which tells ib to look up the latest datum
     end_dt: str = '',
+
+    # TODO: make this more dynamic based on measured frame rx latency..
     timeout: float = 1.5,  # how long before we trigger a feed reset

     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@@ -280,17 +299,15 @@ async def get_bars(
                 # timeout=timeout,
             )
             if out is None:
-                raise NoData(
-                    f'{end_dt}',
-                    # frame_size=2000,
-                )
+                raise NoData(f'{end_dt}')

-            bars, bars_array = out
+            bars, bars_array, dt_duration = out

             if not bars:
-                # TODO: duration lookup for this
-                end_dt = end_dt.subtract(days=1)
-                print("SUBTRACTING DAY")
+                log.warning(
+                    f'History is blank for {dt_duration} from {end_dt}'
+                )
+                end_dt = end_dt - dt_duration
                 continue

             if bars_array is None:
@@ -328,42 +345,35 @@ async def get_bars(
                     f'Symbol: {fqsn}',
                 )

-            elif (
-                err.code == 162 and
-                'HMDS query returned no data' in err.message
-            ):
-                # XXX: this is now done in the storage mgmt layer
-                # and we shouldn't implicitly decrement the frame dt
-                # index since the upper layer may be doing so
-                # concurrently and we don't want to be delivering frames
-                # that weren't asked for.
-                log.warning(
-                    f'NO DATA found ending @ {end_dt}\n'
-                )
+            elif err.code == 162:
+                if 'HMDS query returned no data' in err.message:
+                    # XXX: this is now done in the storage mgmt
+                    # layer and we shouldn't implicitly decrement
+                    # the frame dt index since the upper layer may
+                    # be doing so concurrently and we don't want to
+                    # be delivering frames that weren't asked for.
+                    log.warning(
+                        f'NO DATA found ending @ {end_dt}\n'
+                    )

-                # try to decrement start point and look further back
-                # end_dt = end_dt.subtract(seconds=2000)
-                end_dt = end_dt.subtract(days=1)
-                print("SUBTRACTING DAY")
-                continue
+                    # try to decrement start point and look further back
+                    # end_dt = end_dt.subtract(seconds=2000)
+                    end_dt = end_dt.subtract(days=1)
+                    print("SUBTRACTING DAY")
+                    continue

-            elif (
-                err.code == 162 and
-                'API historical data query cancelled' in err.message
-            ):
-                log.warning(
-                    'Query cancelled by IB (:eyeroll:):\n'
-                    f'{err.message}'
-                )
-                continue
-
-            # elif (
-            #     err.code == 162 and
-            #     'Trading TWS session is connected from a different IP
-            #     address' in err.message
-            # ):
-            #     log.warning("ignoring ip address warning")
-            #     continue
+                elif 'API historical data query cancelled' in err.message:
+                    log.warning(
+                        'Query cancelled by IB (:eyeroll:):\n'
+                        f'{err.message}'
+                    )
+                    continue
+                elif (
+                    'Trading TWS session is connected from a different IP'
+                    in err.message
+                ):
+                    log.warning("ignoring ip address warning")
+                    continue

             # XXX: more or less same as above timeout case
             elif _pacing in msg:
@@ -402,7 +412,7 @@ async def get_bars(

                 with trio.move_on_after(timeout):
                     await result_ready.wait()
-                    continue
+                    break

                 # spawn new data reset task
                 data_cs, reset_done = await nurse.start(
@@ -410,13 +420,12 @@ async def get_bars(
                     partial(
                         wait_on_data_reset,
                         proxy,
                         timeout=float('inf'),
-                        # timeout=timeout,
                    )
                )
                # sync wait on reset to complete
                await reset_done.wait()

-    return result
+    return result, data_cs is not None

 asset_type_map = {
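For reference, the feed.py hunks combine three behaviors: a per-timeframe step-back duration carried in the extended `_samplings` table, a running mean over history-frame query latency, and a `(result, timedout)` style return from `get_bars()`. The following is a minimal, self-contained sketch of that retry/step-back loop, assuming only `pendulum` and `numpy`; `backfill_frames`, `fetch_frame`, and `max_blank_steps` are hypothetical stand-ins for illustration, not piker or ib_insync APIs.

```python
# Hedged sketch (not piker code): `backfill_frames`, `fetch_frame` and
# `max_blank_steps` are hypothetical names; only the `pendulum` calls
# are real library API.
import time
from typing import Callable, Optional

import numpy as np
import pendulum

# mirrors the patched `_samplings` table: IB bar size, query duration
# string, and the equivalent `pendulum.Duration` used to step the
# query window back when a frame comes up blank.
_samplings: dict[int, tuple[str, str, pendulum.Duration]] = {
    1: ('1 secs', f'{int(2e3)} S', pendulum.duration(seconds=2e3)),
    60: ('1 min', '1 D', pendulum.duration(days=1)),
}


def backfill_frames(
    fetch_frame: Callable[
        [str, str, pendulum.DateTime],
        Optional[np.ndarray],
    ],
    sample_period_s: int,
    end_dt: pendulum.DateTime,
    max_blank_steps: int = 10,
) -> tuple[Optional[np.ndarray], pendulum.DateTime]:
    '''
    Query history frames ending at ``end_dt``, stepping the window
    back by the sampling-specific duration whenever a frame comes
    back blank (eg. over a weekend or market holiday).

    '''
    bar_size, duration, dt_duration = _samplings[sample_period_s]

    mean: float = 0
    count: int = 0

    for _ in range(max_blank_steps):
        query_start = time.time()
        frame = fetch_frame(bar_size, duration, end_dt)
        latency = time.time() - query_start

        # incremental running mean: mean_n = mean_{n-1} + (x - mean_{n-1}) / n
        count += 1
        mean += (latency - mean) / count
        print(f'frame latency: {latency:.3f}s, mean: {mean:.3f}s')

        if frame is None or not frame.size:
            # blank frame: decrement the end time by one full query
            # duration and look further back
            end_dt = end_dt - dt_duration
            continue

        return frame, end_dt

    return None, end_dt
```

Two details worth noting: the mean update uses the standard incremental form `mean += (latency - mean) / count`, matching the `get_hist()` hunk above, and the window is stepped back with plain datetime arithmetic (`end_dt - dt_duration`), which pendulum supports directly since `Duration` subclasses `timedelta` while `DateTime.subtract()` only accepts numeric keyword arguments.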