diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 3eb4b8df..53910f38 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -107,7 +107,7 @@ async def open_data_client() -> MethodProxy: @acm async def open_history_client( - symbol: str, + fqsn: str, ) -> tuple[Callable, int]: ''' @@ -130,7 +130,7 @@ async def open_history_client( mean: float = 0 count: int = 0 - head_dt = await proxy.get_head_time(fqsn=symbol) + head_dt = await proxy.get_head_time(fqsn=fqsn) async def get_hist( timeframe: float, @@ -143,7 +143,7 @@ async def open_history_client( query_start = time.time() out, timedout = await get_bars( proxy, - symbol, + fqsn, timeframe, end_dt=end_dt, ) @@ -169,7 +169,9 @@ async def open_history_client( # frame_size=2000, ) - if end_dt and end_dt <= head_dt: + if ( + end_dt and end_dt <= head_dt + ): raise DataUnavailable(f'First timestamp is {head_dt}') bars, bars_array, first_dt, last_dt = out @@ -277,8 +279,14 @@ async def get_bars( # blank to start which tells ib to look up the latest datum end_dt: str = '', - # TODO: make this more dynamic based on measured frame rx latency.. - timeout: float = 3, # how long before we trigger a feed reset + # TODO: make this more dynamic based on measured frame rx latency? + # how long before we trigger a feed reset (seconds) + feed_reset_timeout: float = 3, + + # how many days to subtract before giving up on further + # history queries for instrument, presuming that most don't + # not trade for a week XD + max_nodatas: int = 6, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, @@ -289,6 +297,7 @@ async def get_bars( ''' global _data_resetter_task + nodatas_count: int = 0 data_cs: trio.CancelScope | None = None result: tuple[ @@ -300,7 +309,7 @@ async def get_bars( result_ready = trio.Event() async def query(): - nonlocal result, data_cs, end_dt + nonlocal result, data_cs, end_dt, nodatas_count while True: try: out = await proxy.bars( @@ -362,24 +371,34 @@ async def get_bars( ) elif err.code == 162: - if 'HMDS query returned no data' in err.message: + if ( + 'HMDS query returned no data' in msg + ): # XXX: this is now done in the storage mgmt # layer and we shouldn't implicitly decrement # the frame dt index since the upper layer may # be doing so concurrently and we don't want to # be delivering frames that weren't asked for. - log.warning( - f'NO DATA found ending @ {end_dt}\n' - ) - # try to decrement start point and look further back # end_dt = end_dt.subtract(seconds=2000) + logmsg = "SUBTRACTING DAY from DT index" if end_dt is not None: end_dt = end_dt.subtract(days=1) - print("SUBTRACTING DAY") - else: + elif end_dt is None: end_dt = pendulum.now().subtract(days=1) + log.warning( + f'NO DATA found ending @ {end_dt}\n' + + logmsg + ) + + if nodatas_count >= max_nodatas: + raise DataUnavailable( + f'Presuming {fqsn} has no further history ' + f'after {max_nodatas} tries..' + ) + + nodatas_count += 1 continue elif 'API historical data query cancelled' in err.message: @@ -434,7 +453,7 @@ async def get_bars( # for a result before triggering a data feed reset. while not result_ready.is_set(): - with trio.move_on_after(timeout): + with trio.move_on_after(feed_reset_timeout): await result_ready.wait() break