diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 18981f60..e0da8d1c 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -114,12 +114,18 @@ async def open_history_client( async with open_data_client() as proxy: async def get_hist( + timeframe: float, end_dt: Optional[datetime] = None, start_dt: Optional[datetime] = None, ) -> tuple[np.ndarray, str]: - out, fails = await get_bars(proxy, symbol, end_dt=end_dt) + out, fails = await get_bars( + proxy, + symbol, + timeframe, + end_dt=end_dt, + ) # TODO: add logic here to handle tradable hours and only grab # valid bars in the range @@ -145,7 +151,7 @@ async def open_history_client( # quite sure why.. needs some tinkering and probably # a lookthrough of the ``ib_insync`` machinery, for eg. maybe # we have to do the batch queries on the `asyncio` side? - yield get_hist, {'erlangs': 1, 'rate': 6} + yield get_hist, {'erlangs': 1, 'rate': 3} _pacing: str = ( @@ -154,14 +160,99 @@ _pacing: str = ( ) +async def wait_on_data_reset( + proxy: MethodProxy, + tries: int = 2, + timeout: float = 16, + + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, +): + + # TODO: we might have to put a task lock around this + # method.. + hist_ev = proxy.status_event( + 'HMDS data farm connection is OK:ushmds' + ) + + # XXX: other event messages we might want to try and + # wait for but i wasn't able to get any of this + # reliable.. + # reconnect_start = proxy.status_event( + # 'Market data farm is connecting:usfuture' + # ) + # live_ev = proxy.status_event( + # 'Market data farm connection is OK:usfuture' + # ) + # try to wait on the reset event(s) to arrive, a timeout + # triggers another attempt, `tries - 1` attempts in total. + + # after `tries - 1` data resets time out, fail over to + # a connection reset.
+ for i in range(1, tries): + + log.warning('Sending DATA RESET request') + await data_reset_hack(reset_type='data') + task_status.started() + + with trio.move_on_after(timeout) as cs: + for name, ev in [ + # TODO: not sure if waiting on other events + # is all that useful here or not. in theory + # you could wait on one of the ones above + # first to verify the reset request was + # sent? + ('history', hist_ev), + ]: + await ev.wait() + log.info(f"{name} DATA RESET") + break + + if cs.cancelled_caught: + # fails += 1 + log.warning( + f'Data reset {name} timeout, retrying {i}.' + ) + + continue + else: + + log.warning('Sending CONNECTION RESET') + res = await data_reset_hack(reset_type='connection') + if not res: + log.warning( + 'NO VNC DETECTED!\n' + 'Manually press ctrl-alt-f on your IB java app' + ) + # break + + with trio.move_on_after(timeout) as cs: + for name, ev in [ + # TODO: not sure if waiting on other events + # is all that useful here or not. in theory + # you could wait on one of the ones above + # first to verify the reset request was + # sent? 
+ ('history', hist_ev), + ]: + await ev.wait() + log.info(f"{name} DATA RESET") + + if cs.cancelled_caught: + # fails += 1 + log.warning('Data CONNECTION RESET timeout!?') + + async def get_bars( proxy: MethodProxy, fqsn: str, + period: float, # blank to start which tells ib to look up the latest datum end_dt: str = '', + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + ) -> (dict, np.ndarray): ''' Retrieve historical data from a ``trio``-side task using @@ -176,157 +267,111 @@ async def get_bars( if end_dt: last_dt = pendulum.from_timestamp(end_dt.timestamp()) - for _ in range(10): - try: - out = await proxy.bars( - fqsn=fqsn, - end_dt=end_dt, - ) - if out: - bars, bars_array = out + timeout: float = float('inf') + async with trio.open_nursery() as nurse: + for _ in range(10): + try: + out = None + with trio.move_on_after(timeout) as cs: + out = await proxy.bars( + fqsn=fqsn, + end_dt=end_dt, + sample_period_s=period, + ) + timeout = 3 - else: - await tractor.breakpoint() + if ( + cs.cancelled_caught + and out is None + ): + print(f"RESETTING DATA after {timeout}") + await nurse.start( + wait_on_data_reset, + proxy, + timeout=float('inf'), + tries=100, + ) + # scale timeout up exponentially to avoid + # request-overrunning the history farm.
+ # timeout *= 2 + continue - if bars_array is None: - raise SymbolNotFound(fqsn) + if out: + bars, bars_array = out - first_dt = pendulum.from_timestamp( - bars[0].date.timestamp()) - - last_dt = pendulum.from_timestamp( - bars[-1].date.timestamp()) - - time = bars_array['time'] - assert time[-1] == last_dt.timestamp() - assert time[0] == first_dt.timestamp() - log.info( - f'{len(bars)} bars retreived for {first_dt} -> {last_dt}' - ) - - return (bars, bars_array, first_dt, last_dt), fails - - except RequestError as err: - msg = err.message - - if 'No market data permissions for' in msg: - # TODO: signalling for no permissions searches - raise NoData( - f'Symbol: {fqsn}', - ) - - elif ( - err.code == 162 and - 'HMDS query returned no data' in err.message - ): - # XXX: this is now done in the storage mgmt layer - # and we shouldn't implicitly decrement the frame dt - # index since the upper layer may be doing so - # concurrently and we don't want to be delivering frames - # that weren't asked for. - log.warning( - f'NO DATA found ending @ {end_dt}\n' - ) - - # try to decrement start point and look further back - # end_dt = last_dt = last_dt.subtract(seconds=2000) - - raise NoData( - f'Symbol: {fqsn}', - frame_size=2000, - ) - - # elif ( - # err.code == 162 and - # 'Trading TWS session is connected from a different IP - # address' in err.message - # ): - # log.warning("ignoring ip address warning") - # continue - - elif _pacing in msg: - - log.warning( - 'History throttle rate reached!\n' - 'Resetting farms with `ctrl-alt-f` hack\n' - ) - # TODO: we might have to put a task lock around this - # method.. - hist_ev = proxy.status_event( - 'HMDS data farm connection is OK:ushmds' - ) - - # XXX: other event messages we might want to try and - # wait for but i wasn't able to get any of this - # reliable.. 
- # reconnect_start = proxy.status_event( - # 'Market data farm is connecting:usfuture' - # ) - # live_ev = proxy.status_event( - # 'Market data farm connection is OK:usfuture' - # ) - - # try to wait on the reset event(s) to arrive, a timeout - # will trigger a retry up to 6 times (for now). - tries: int = 2 - timeout: float = 10 - - # try 3 time with a data reset then fail over to - # a connection reset. - for i in range(1, tries): - - log.warning('Sending DATA RESET request') - await data_reset_hack(reset_type='data') - - with trio.move_on_after(timeout) as cs: - for name, ev in [ - # TODO: not sure if waiting on other events - # is all that useful here or not. in theory - # you could wait on one of the ones above - # first to verify the reset request was - # sent? - ('history', hist_ev), - ]: - await ev.wait() - log.info(f"{name} DATA RESET") - break - - if cs.cancelled_caught: - fails += 1 - log.warning( - f'Data reset {name} timeout, retrying {i}.' - ) - - continue else: + raise NoData( + f'{end_dt}', + # frame_size=2000, + ) - log.warning('Sending CONNECTION RESET') - res = await data_reset_hack(reset_type='connection') - if not res: - log.warning( - 'NO VNC DETECTED!\n' - 'Manually press ctrl-alt-f on your IB java app' - ) - # break + if bars_array is None: + raise SymbolNotFound(fqsn) - with trio.move_on_after(timeout) as cs: - for name, ev in [ - # TODO: not sure if waiting on other events - # is all that useful here or not. in theory - # you could wait on one of the ones above - # first to verify the reset request was - # sent? 
- ('history', hist_ev), - ]: - await ev.wait() - log.info(f"{name} DATA RESET") + first_dt = pendulum.from_timestamp( + bars[0].date.timestamp()) - if cs.cancelled_caught: - fails += 1 - log.warning('Data CONNECTION RESET timeout!?') + last_dt = pendulum.from_timestamp( + bars[-1].date.timestamp()) - else: - raise + time = bars_array['time'] + assert time[-1] == last_dt.timestamp() + assert time[0] == first_dt.timestamp() + log.info( + f'{len(bars)} bars retreived for {first_dt} -> {last_dt}' + ) + + return (bars, bars_array, first_dt, last_dt), fails + + except RequestError as err: + msg = err.message + + if 'No market data permissions for' in msg: + # TODO: signalling for no permissions searches + raise NoData( + f'Symbol: {fqsn}', + ) + + elif ( + err.code == 162 and + 'HMDS query returned no data' in err.message + ): + # XXX: this is now done in the storage mgmt layer + # and we shouldn't implicitly decrement the frame dt + # index since the upper layer may be doing so + # concurrently and we don't want to be delivering frames + # that weren't asked for. 
+ log.warning( + f'NO DATA found ending @ {end_dt}\n' + ) + + # try to decrement start point and look further back + # end_dt = last_dt = last_dt.subtract(seconds=2000) + + raise NoData( + f'Symbol: {fqsn}', + # TODO: fix this since we don't strictly use 1s + # ohlc any more XD + frame_size=2000, + ) + + # elif ( + # err.code == 162 and + # 'Trading TWS session is connected from a different IP + # address' in err.message + # ): + # log.warning("ignoring ip address warning") + # continue + + elif _pacing in msg: + log.warning( + 'History throttle rate reached!\n' + 'Resetting farms with `ctrl-alt-f` hack\n' + ) + await wait_on_data_reset(proxy) + + else: + raise return None, None # else: # throttle wasn't fixed so error out immediately @@ -337,6 +382,7 @@ async def backfill_bars( fqsn: str, shm: ShmArray, # type: ignore # noqa + timeframe: float = 1, # in seconds # TODO: we want to avoid overrunning the underlying shm array buffer # and we should probably calc the number of calls to make depending @@ -362,7 +408,7 @@ async def backfill_bars( async with open_data_client() as proxy: - out, fails = await get_bars(proxy, fqsn) + out, fails = await get_bars(proxy, fqsn, timeframe) if out is None: raise RuntimeError("Could not pull currrent history?!") @@ -380,7 +426,12 @@ async def backfill_bars( i = 0 while i < count: - out, fails = await get_bars(proxy, fqsn, end_dt=first_dt) + out, fails = await get_bars( + proxy, + fqsn, + timeframe, + end_dt=first_dt, + ) if out is None: # could be trying to retreive bars over weekend