From 75ff3921b6a0decaf7352d231c6ec98e44524caf Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Thu, 8 Jun 2023 19:34:34 -0400
Subject: [PATCH] ib: fix mega borked hist queries on gappy assets

Explains why history always seemed wrong before..

Previously, whenever a time-gappy asset (like a stock, due to its
venue's operating hours) was being loaded, we weren't querying for a
"duration's worth" of bars, and this was causing all sorts of gaps in
our data set that shouldn't exist.

Fix that by always attempting to retrieve at least a minimum
aggregate-time's worth (duration) of bars/datums in the history
manager. Actually, I implemented this in both the feed and api layers
for this backend since it doesn't seem to strictly work when only
implemented at the `Client.bars()` level; not sure why.

Also, a bunch of `ruff` linting cleanups and a fix for the logger name.
---
 piker/brokers/ib/_util.py | 14 +++-------
 piker/brokers/ib/api.py   | 34 +++++++++++++++++++++---
 piker/brokers/ib/feed.py  | 54 +++++++++++++++++++++++++++------------
 3 files changed, 71 insertions(+), 31 deletions(-)

diff --git a/piker/brokers/ib/_util.py b/piker/brokers/ib/_util.py
index 585ea18d..5e64ab0b 100644
--- a/piker/brokers/ib/_util.py
+++ b/piker/brokers/ib/_util.py
@@ -21,22 +21,14 @@ runnable script-programs.
 '''
 from __future__ import annotations
 from functools import partial
-from typing import (
-    Literal,
-    TYPE_CHECKING,
-)
+from typing import Literal
 import subprocess
 
 import tractor
 
-from .._util import log
-
-if TYPE_CHECKING:
-    from .api import (
-        MethodProxy,
-        ib_Client
-    )
+from .._util import get_logger
 
+log = get_logger('piker.brokers.ib')
 
 _reset_tech: Literal[
     'vnc',
diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py
index a5069e95..5d49b14e 100644
--- a/piker/brokers/ib/api.py
+++ b/piker/brokers/ib/api.py
@@ -423,7 +423,7 @@ class Client:
 
         # optional "duration of time" equal to the
         # length of the returned history frame.
-        duration: Optional[str] = None,
+        duration: str | None = None,
 
         **kwargs,
 
@@ -475,6 +475,8 @@ class Client:
             # whatToShow='MIDPOINT',
             # whatToShow='TRADES',
         )
+
+        # tail case if no history for range or none prior.
         if not bars:
             # NOTE: there's 2 cases here to handle (and this should be
             # read alongside the implementation of
@@ -489,6 +491,32 @@ class Client:
             # rewrite the method in the first case? right now there's no
             # way to detect a timeout.
 
+        # NOTE XXX: ensure minimum duration in bars B)
+        # => we recursively call this method until we get at least
+        # as many bars such that they sum in aggregate to the
+        # desired total time (duration) at most.
+        elif (
+            end_dt
+            and (
+                (len(bars) * sample_period_s) < dt_duration.in_seconds()
+            )
+        ):
+            log.warning(
+                f'Recursing to get more bars from {end_dt} for {dt_duration}'
+            )
+            end_dt -= dt_duration
+            (
+                r_bars,
+                r_arr,
+                r_duration,
+            ) = await self.bars(
+                fqme,
+                start_dt=start_dt,
+                end_dt=end_dt,
+            )
+            r_bars.extend(bars)
+            bars = r_bars
+
         nparr = bars_to_np(bars)
         return bars, nparr, dt_duration
 
@@ -921,7 +949,7 @@ class Client:
 
             done, pending = await asyncio.wait(
                 [ready],
-                timeout=0.1,
+                timeout=0.01,
             )
             if ready in done:
                 break
@@ -1401,7 +1429,7 @@ async def open_client_proxies() -> tuple[
 
             # TODO: maybe this should be the default in tractor?
             key=tractor.current_actor().uid,
 
-        ) as (cache_hit, (clients, from_aio)),
+        ) as (cache_hit, (clients, _)),
 
         AsyncExitStack() as stack
     ):
diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py
index b4edae17..d855539a 100644
--- a/piker/brokers/ib/feed.py
+++ b/piker/brokers/ib/feed.py
@@ -30,8 +30,8 @@ from functools import partial
 from math import isnan
 import time
 from typing import (
+    Any,
     Callable,
-    Optional,
     Awaitable,
 )
 
@@ -180,8 +180,8 @@ async def open_history_client(
 
     async def get_hist(
         timeframe: float,
-        end_dt: Optional[datetime] = None,
-        start_dt: Optional[datetime] = None,
+        end_dt: datetime | None = None,
+        start_dt: datetime | None = None,
 
     ) -> tuple[np.ndarray, str]:
         nonlocal max_timeout, mean, count
@@ -192,6 +192,7 @@ async def open_history_client(
             fqme,
             timeframe,
             end_dt=end_dt,
+            start_dt=start_dt,
         )
         latency = time.time() - query_start
         if (
@@ -325,6 +326,7 @@ async def wait_on_data_reset(
 
 _data_resetter_task: trio.Task | None = None
 _failed_resets: int = 0
+
 
 async def get_bars(
     proxy: MethodProxy,
@@ -333,6 +335,7 @@ async def get_bars(
 
     # blank to start which tells ib to look up the latest datum
     end_dt: str = '',
+    start_dt: str | None = '',
 
     # TODO: make this more dynamic based on measured frame rx latency?
    # how long before we trigger a feed reset (seconds)
@@ -387,15 +390,31 @@ async def get_bars(
 
             bars, bars_array, dt_duration = out
 
+            # not enough bars signal, likely due to venue
+            # operational gaps.
+            too_little: bool = False
             if (
-                not bars
-                and end_dt
-            ):
-                log.warning(
-                    f'History is blank for {dt_duration} from {end_dt}'
+                end_dt
+                and (
+                    not bars
+                    or (too_little :=
+                        start_dt
+                        and (len(bars) * timeframe)
+                        < dt_duration.in_seconds()
+                    )
                 )
-                end_dt -= dt_duration
-                continue
+            ):
+                if (
+                    end_dt
+                    or too_little
+                ):
+                    log.warning(
+                        f'History is blank for {dt_duration} from {end_dt}'
+                    )
+                    end_dt -= dt_duration
+                    continue
+
+                raise NoData(f'{end_dt}')
 
             if bars_array is None:
                 raise SymbolNotFound(fqme)
@@ -544,6 +563,7 @@ async def get_bars(
 
         await reset_done.wait()
 
     _data_resetter_task = None if unset_resetter else _data_resetter_task
+    assert result
     return result, data_cs is not None
 
@@ -602,13 +622,12 @@ async def _setup_quote_stream(
     '''
     global _quote_streams
 
-    to_trio.send_nowait(None)
-
     async with load_aio_clients(
         disconnect_on_exit=False,
     ) as accts2clients:
         caccount_name, client = get_preferred_data_client(accts2clients)
         contract = contract or (await client.find_contract(symbol))
+        to_trio.send_nowait(contract)  # cuz why not
         ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
 
         # NOTE: it's batch-wise and slow af but I guess could
@@ -700,7 +719,9 @@ async def open_aio_quote_stream(
             symbol=symbol,
             contract=contract,
 
-        ) as (first, from_aio):
+        ) as (contract, from_aio):
+
+            assert contract
 
             # cache feed for later consumers
             _quote_streams[symbol] = from_aio
@@ -783,7 +804,6 @@ async def get_mkt_info(
     # bs_fqme, _, broker = fqme.partition('.')
 
     proxy: MethodProxy
-    get_details: bool = False
    if proxy is not None:
         client_ctx = nullcontext(proxy)
     else:
@@ -800,7 +820,6 @@ async def get_mkt_info(
             raise
 
     # TODO: more consistent field translation
-    init_info: dict = {}
     atype = _asset_type_map[con.secType]
 
     if atype == 'commodity':
@@ -912,7 +931,8 @@ async def stream_quotes(
         con: Contract = details.contract
         first_ticker: Ticker = await proxy.get_quote(contract=con)
         first_quote: dict = normalize(first_ticker)
-        log.runtime(f'FIRST QUOTE: {first_quote}')
+
+        log.warning(f'FIRST QUOTE: {first_quote}')
 
         # TODO: we should instead spawn a task that waits on a feed to start
         # and let it wait indefinitely..instead of this hard coded stuff.
@@ -1045,7 +1065,7 @@ async def open_symbol_search(
     await ctx.started({})
 
     async with (
-        open_client_proxies() as (proxies, clients),
+        open_client_proxies() as (proxies, _),
         open_data_client() as data_proxy,
     ):
         async with ctx.open_stream() as stream:
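
P.S. for reviewers: below is a minimal, self-contained sketch (not part of the
patch, and not the real `Client.bars()` API) of the frame-extension idea the
api-layer hunk implements: when a returned frame covers less wall-clock time
than the requested duration (eg. due to venue-closure gaps), step `end_dt`
back by that duration and re-query, prepending the older bars until the
aggregate covers the target. The names `Bar`, `fetch_frame` and
`bars_for_duration` are hypothetical stand-ins for illustration only.

    from __future__ import annotations
    from dataclasses import dataclass
    from datetime import datetime, timedelta
    from typing import Callable


    @dataclass
    class Bar:
        # hypothetical bar type: only the open-time matters for this sketch
        time: datetime


    def bars_for_duration(
        fetch_frame: Callable[[datetime], list[Bar]],  # query one frame ending at `end_dt`
        end_dt: datetime,
        duration: timedelta,
        sample_period_s: float,  # seconds per bar, eg. 60 for 1m bars
        max_queries: int = 10,  # guard against walking back forever over a dead range

    ) -> list[Bar]:
        '''
        Keep re-querying (stepping ``end_dt`` back) until the collected bars
        sum to at least ``duration`` worth of sample-time, mirroring the
        recursive re-query added to the api layer in this patch.

        '''
        bars: list[Bar] = []
        for _ in range(max_queries):
            if (len(bars) * sample_period_s) >= duration.total_seconds():
                break

            frame: list[Bar] = fetch_frame(end_dt)
            if not frame:
                # blank frame: presume a venue gap and keep walking back
                end_dt -= duration
                continue

            # older bars go in front; the next query ends just before
            # this frame's oldest bar to avoid re-fetching it.
            bars = frame + bars
            end_dt = frame[0].time - timedelta(seconds=sample_period_s)

        return bars

The feed-layer hunk takes the analogous approach but inside the existing retry
loop in `get_bars()`: when the frame comes back blank or too short it just
decrements `end_dt` by the duration and `continue`s.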