diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 5117962f..6bef877c 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -22,6 +22,7 @@ import asyncio from contextlib import asynccontextmanager as acm from dataclasses import asdict from datetime import datetime +from functools import partial from math import isnan import time from typing import ( @@ -38,7 +39,6 @@ import tractor import trio from trio_typing import TaskStatus -from piker.data._sharedmem import ShmArray from .._util import SymbolNotFound, NoData from .api import ( # _adhoc_futes_set, @@ -111,6 +111,15 @@ async def open_history_client( that takes in ``pendulum.datetime`` and returns ``numpy`` arrays. ''' + # TODO: + # - add logic to handle tradable hours and only grab + # valid bars in the range? + # - we want to avoid overrunning the underlying shm array buffer and + # we should probably calc the number of calls to make depending on + # that until we have the `marketstore` daemon in place in which case + # the shm size will be driven by user config and available sys + # memory. + async with open_data_client() as proxy: async def get_hist( @@ -120,21 +129,19 @@ async def open_history_client( ) -> tuple[np.ndarray, str]: - out, fails = await get_bars( + out = await get_bars( proxy, symbol, timeframe, end_dt=end_dt, ) - # TODO: add logic here to handle tradable hours and only grab - # valid bars in the range if out is None: # could be trying to retreive bars over weekend log.error(f"Can't grab bars starting at {end_dt}!?!?") raise NoData( f'{end_dt}', - frame_size=2000, + # frame_size=2000, ) bars, bars_array, first_dt, last_dt = out @@ -162,11 +169,16 @@ _pacing: str = ( async def wait_on_data_reset( proxy: MethodProxy, - tries: int = 2, + reset_type: str = 'data', timeout: float = 16, - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, -): + task_status: TaskStatus[ + tuple[ + trio.CancelScope, + trio.Event, + ] + ] = trio.TASK_STATUS_IGNORED, +) -> bool: # TODO: we might have to put a task lock around this # method.. @@ -186,59 +198,43 @@ async def wait_on_data_reset( # try to wait on the reset event(s) to arrive, a timeout # will trigger a retry up to 6 times (for now). - # try 3 time with a data reset then fail over to - # a connection reset. - for i in range(1, tries): + done = trio.Event() + with trio.move_on_after(timeout) as cs: + + task_status.started((cs, done)) log.warning('Sending DATA RESET request') - await data_reset_hack(reset_type='data') + res = await data_reset_hack(reset_type=reset_type) - with trio.move_on_after(timeout) as cs: - for name, ev in [ - # TODO: not sure if waiting on other events - # is all that useful here or not. in theory - # you could wait on one of the ones above - # first to verify the reset request was - # sent? - ('history', hist_ev), - ]: - task_status.started(cs) - await ev.wait() - log.info(f"{name} DATA RESET") - break - - if ( - cs.cancelled_caught - and not cs.cancel_called - ): - log.warning( - f'Data reset {name} timeout, retrying {i}.' - ) - continue - else: - - log.warning('Sending CONNECTION RESET') - res = await data_reset_hack(reset_type='connection') if not res: log.warning( 'NO VNC DETECTED!\n' 'Manually press ctrl-alt-f on your IB java app' ) + done.set() + return False - with trio.move_on_after(timeout) as cs: - for name, ev in [ - # TODO: not sure if waiting on other events - # is all that useful here or not. in theory - # you could wait on one of the ones above - # first to verify the reset request was - # sent? - ('history', hist_ev), - ]: - await ev.wait() - log.info(f"{name} DATA RESET") + # TODO: not sure if waiting on other events + # is all that useful here or not. + # - in theory you could wait on one of the ones above first + # to verify the reset request was sent? + # - we need the same for real-time quote feeds which can + # sometimes flake out and stop delivering.. + for name, ev in [ + ('history', hist_ev), + ]: + await ev.wait() + log.info(f"{name} DATA RESET") + done.set() + return True - if cs.cancelled_caught: - log.warning('Data CONNECTION RESET timeout!?') + if cs.cancel_called: + log.warning( + 'Data reset task canceled?' + ) + + done.set() + return False async def get_bars( @@ -249,6 +245,7 @@ async def get_bars( # blank to start which tells ib to look up the latest datum end_dt: str = '', + timeout: float = 1.5, # how long before we trigger a feed reset task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, @@ -258,52 +255,44 @@ async def get_bars( a ``MethoProxy``. ''' - fails = 0 - bars: Optional[list] = None - first_dt: datetime = None - last_dt: datetime = None + data_cs: Optional[trio.CancelScope] = None + result: Optional[tuple[ + ibis.objects.BarDataList, + np.ndarray, + datetime, + datetime, + ]] = None + result_ready = trio.Event() - if end_dt: - last_dt = pendulum.from_timestamp(end_dt.timestamp()) - - timeout: float = float('inf') - async with trio.open_nursery() as nurse: - for _ in range(10): + async def query(): + nonlocal result, data_cs, end_dt + while True: try: - out = None - with trio.move_on_after(timeout) as cs: - out = await proxy.bars( - fqsn=fqsn, - end_dt=end_dt, - sample_period_s=timeframe, - ) - timeout = 3 + out = await proxy.bars( + fqsn=fqsn, + end_dt=end_dt, + sample_period_s=timeframe, - if ( - cs.cancelled_caught - and out is None - ): - print(f"RESETTING DATA after {timeout}") - await nurse.start( - wait_on_data_reset, - proxy, - timeout=float('inf'), - tries=100, - ) - # scale timeout up exponentially to avoid - # request-overruning the history farm. - # timeout *= 2 - continue - - if out: - bars, bars_array = out - - else: + # ideally we cancel the request just before we + # cancel on the ``trio``-side and trigger a data + # reset hack.. the problem is there's no way (with + # current impl) to detect a cancel case. + # timeout=timeout, + ) + if out is None: raise NoData( f'{end_dt}', # frame_size=2000, ) + bars, bars_array = out + + if not bars: + # TODO: duration lookup for this + end_dt = end_dt.subtract(days=1) + print("SUBTRACTING DAY") + continue + if bars_array is None: raise SymbolNotFound(fqsn) @@ -317,10 +306,18 @@ async def get_bars( assert time[-1] == last_dt.timestamp() assert time[0] == first_dt.timestamp() log.info( - f'{len(bars)} bars retreived for {first_dt} -> {last_dt}' + f'{len(bars)} bars retreived {first_dt} -> {last_dt}' ) - return (bars, bars_array, first_dt, last_dt), fails + if data_cs: + data_cs.cancel() + + result = (bars, bars_array, first_dt, last_dt) + + # signal data reset loop parent task + result_ready.set() + + return result except RequestError as err: msg = err.message @@ -345,14 +342,20 @@ async def get_bars( ) # try to decrement start point and look further back - # end_dt = last_dt = last_dt.subtract(seconds=2000) + # end_dt = end_dt.subtract(seconds=2000) + end_dt = end_dt.subtract(days=1) + print("SUBTRACTING DAY") + continue - raise NoData( - f'Symbol: {fqsn}', - # TODO: fix this since we don't strictly use 1s - # ohlc any more XD - frame_size=2000, + elif ( + err.code == 162 and + 'API historical data query cancelled' in err.message + ): + log.warning( + 'Query cancelled by IB (:eyeroll:):\n' + f'{err.message}' ) + continue # elif ( # err.code == 162 and @@ -362,103 +365,58 @@ async def get_bars( # log.warning("ignoring ip address warning") # continue + # XXX: more or less same as above timeout case elif _pacing in msg: log.warning( 'History throttle rate reached!\n' 'Resetting farms with `ctrl-alt-f` hack\n' ) - await wait_on_data_reset(proxy) + + # cancel any existing reset task + if data_cs: + data_cs.cancel() + + # spawn new data reset task + data_cs, reset_done = await nurse.start( + partial( + wait_on_data_reset, + proxy, + timeout=float('inf'), + reset_type='connection' + ) + ) + continue else: raise - return None, None - # else: # throttle wasn't fixed so error out immediately - # raise _err + async with trio.open_nursery() as nurse: + # start history request that we allow + # to run indefinitely until a result is acquired + nurse.start_soon(query) -async def backfill_bars( + # start history reset loop which waits up to the timeout + # for a result before triggering a data feed reset. + while not result_ready.is_set(): - fqsn: str, - shm: ShmArray, # type: ignore # noqa - timeframe: float = 1, # in seconds + with trio.move_on_after(timeout): + await result_ready.wait() + continue - # TODO: we want to avoid overrunning the underlying shm array buffer - # and we should probably calc the number of calls to make depending - # on that until we have the `marketstore` daemon in place in which - # case the shm size will be driven by user config and available sys - # memory. - count: int = 16, - - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, - -) -> None: - ''' - Fill historical bars into shared mem / storage afap. - - TODO: avoid pacing constraints: - https://github.com/pikers/piker/issues/128 - - ''' - # last_dt1 = None - last_dt = None - - with trio.CancelScope() as cs: - - async with open_data_client() as proxy: - - out, fails = await get_bars(proxy, fqsn, timeframe) - - if out is None: - raise RuntimeError("Could not pull currrent history?!") - - (first_bars, bars_array, first_dt, last_dt) = out - vlm = bars_array['volume'] - vlm[vlm < 0] = 0 - last_dt = first_dt - - # write historical data to buffer - shm.push(bars_array) - - task_status.started(cs) - - i = 0 - while i < count: - - out, fails = await get_bars( + # spawn new data reset task + data_cs, reset_done = await nurse.start( + partial( + wait_on_data_reset, proxy, - fqsn, - timeframe, - end_dt=first_dt, + timeout=float('inf'), + # timeout=timeout, ) + ) + # sync wait on reset to complete + await reset_done.wait() - if out is None: - # could be trying to retreive bars over weekend - # TODO: add logic here to handle tradable hours and - # only grab valid bars in the range - log.error(f"Can't grab bars starting at {first_dt}!?!?") - - # XXX: get_bars() should internally decrement dt by - # 2k seconds and try again. - continue - - (first_bars, bars_array, first_dt, last_dt) = out - # last_dt1 = last_dt - # last_dt = first_dt - - # volume cleaning since there's -ve entries, - # wood luv to know what crookery that is.. - vlm = bars_array['volume'] - vlm[vlm < 0] = 0 - - # TODO we should probably dig into forums to see what peeps - # think this data "means" and then use it as an indicator of - # sorts? dinkus has mentioned that $vlms for the day dont' - # match other platforms nor the summary stat tws shows in - # the monitor - it's probably worth investigating. - - shm.push(bars_array, prepend=True) - i += 1 + return result asset_type_map = {