diff --git a/piker/brokers/_util.py b/piker/brokers/_util.py index 30e36f2e..d35c2578 100644 --- a/piker/brokers/_util.py +++ b/piker/brokers/_util.py @@ -50,6 +50,7 @@ class SymbolNotFound(BrokerError): "Symbol not found by broker search" +# TODO: these should probably be moved to `.tsp/.data`? class NoData(BrokerError): ''' Symbol data not permitted or no data @@ -59,14 +60,15 @@ class NoData(BrokerError): def __init__( self, *args, - frame_size: int = 1000, + info: dict, ) -> None: super().__init__(*args) + self.info: dict = info # when raised, machinery can check if the backend # set a "frame size" for doing datetime calcs. - self.frame_size: int = 1000 + # self.frame_size: int = 1000 class DataUnavailable(BrokerError): diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 431c889f..2fe540bd 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -41,7 +41,6 @@ import time from typing import ( Any, Callable, - Union, ) from types import SimpleNamespace @@ -312,8 +311,8 @@ class Client: fqme: str, # EST in ISO 8601 format is required... below is EPOCH - start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00", - end_dt: Union[datetime, str] = "", + start_dt: datetime | str = "1970-01-01T00:00:00.000000-05:00", + end_dt: datetime | str = "", # ohlc sample period in seconds sample_period_s: int = 1, @@ -339,17 +338,13 @@ class Client: default_dt_duration, ) = _samplings[sample_period_s] - dt_duration: DateTime = ( + dt_duration: Duration = ( duration or default_dt_duration ) + # TODO: maybe remove all this? global _enters - log.info( - f"REQUESTING {ib_duration_str}'s worth {bar_size} BARS\n" - f'{_enters} @ end={end_dt}"' - ) - if not end_dt: end_dt = '' @@ -358,8 +353,8 @@ class Client: contract: Contract = (await self.find_contracts(fqme))[0] bars_kwargs.update(getattr(contract, 'bars_kwargs', {})) - bars = await self.ib.reqHistoricalDataAsync( - contract, + kwargs: dict[str, Any] = dict( + contract=contract, endDateTime=end_dt, formatDate=2, @@ -381,17 +376,38 @@ class Client: # whatToShow='MIDPOINT', # whatToShow='TRADES', ) + log.info( + f'REQUESTING {ib_duration_str} worth {bar_size} BARS\n' + f'fqme: {fqme}\n' + f'global _enters: {_enters}\n' + f'kwargs: {pformat(kwargs)}\n' + ) + + bars = await self.ib.reqHistoricalDataAsync( + **kwargs, + ) # tail case if no history for range or none prior. if not bars: - # NOTE: there's 2 cases here to handle (and this should be - # read alongside the implementation of - # ``.reqHistoricalDataAsync()``): - # - no data is returned for the period likely due to - # a weekend, holiday or other non-trading period prior to - # ``end_dt`` which exceeds the ``duration``, + # NOTE: there's actually 3 cases here to handle (and + # this should be read alongside the implementation of + # `.reqHistoricalDataAsync()`): # - a timeout occurred in which case insync internals return - # an empty list thing with bars.clear()... + # an empty list thing with bars.clear()... + # - no data exists for the period likely due to + # a weekend, holiday or other non-trading period prior to + # ``end_dt`` which exceeds the ``duration``, + # - LITERALLY this is the start of the mkt's history! + + + # sync requester for debugging empty frame cases + def get_hist(): + return self.ib.reqHistoricalData(**kwargs) + + assert get_hist + import pdbp + pdbp.set_trace() + return [], np.empty(0), dt_duration # TODO: we could maybe raise ``NoData`` instead if we # rewrite the method in the first case? right now there's no @@ -444,7 +460,7 @@ class Client: r_bars.extend(bars) bars = r_bars - nparr = bars_to_np(bars) + nparr: np.ndarray = bars_to_np(bars) # timestep should always be at least as large as the # period step. @@ -457,9 +473,17 @@ class Client: 'time steps which are shorter then expected?!"' ) # OOF: this will break teardown? + # -[ ] check if it's greenback + # -[ ] why tf are we leaking shm entries.. + # -[ ] make a test on the debugging asyncio testing + # branch.. # breakpoint() - return bars, nparr, dt_duration + return ( + bars, + nparr, + dt_duration, + ) async def con_deats( self, @@ -803,6 +827,23 @@ class Client: return contracts + async def maybe_get_head_time( + self, + fqme: str, + + ) -> datetime | None: + ''' + Return the first datetime stamp for `fqme` or `None` + on request failure. + + ''' + try: + head_dt: datetime = await self.get_head_time(fqme=fqme) + return head_dt + except RequestError: + log.warning(f'Unable to get head time: {fqme} ?') + return None + async def get_head_time( self, fqme: str, @@ -1391,7 +1432,7 @@ class MethodProxy: self, pattern: str, - ) -> Union[dict[str, Any], trio.Event]: + ) -> dict[str, Any] | trio.Event: ev = self.event_table.get(pattern) diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index c6ed6350..005d9046 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -37,7 +37,13 @@ from typing import ( from async_generator import aclosing import ib_insync as ibis import numpy as np -import pendulum +from pendulum import ( + now, + from_timestamp, + # DateTime, + Duration, + duration as mk_duration, +) import tractor import trio from trio_typing import TaskStatus @@ -46,10 +52,9 @@ from piker.accounting import ( MktPair, ) from piker.data.validate import FeedInit -from .._util import ( +from piker.brokers._util import ( NoData, DataUnavailable, - SymbolNotFound, ) from .api import ( # _adhoc_futes_set, @@ -160,13 +165,13 @@ async def open_history_client( head_dt: None | datetime = None if ( # fx cons seem to not provide this endpoint? + # TODO: guard against all contract types which don't + # support it? 'idealpro' not in fqme ): - try: - head_dt = await proxy.get_head_time(fqme=fqme) - except RequestError: - log.warning(f'Unable to get head time: {fqme} ?') - pass + head_dt: datetime | None = await proxy.maybe_get_head_time( + fqme=fqme + ) async def get_hist( timeframe: float, @@ -206,17 +211,26 @@ async def open_history_client( # could be trying to retreive bars over weekend if out is None: log.error(f"Can't grab bars starting at {end_dt}!?!?") - raise NoData( - f'{end_dt}', - # frame_size=2000, - ) + if ( + end_dt + and head_dt + and end_dt <= head_dt + ): + raise DataUnavailable( + f'First timestamp is {head_dt}\n' + f'But {end_dt} was requested..' + ) - if ( - end_dt - and head_dt - and end_dt <= head_dt - ): - raise DataUnavailable(f'First timestamp is {head_dt}') + else: + raise NoData( + info={ + 'fqme': fqme, + 'head_dt': head_dt, + 'start_dt': start_dt, + 'end_dt': end_dt, + 'timedout': timedout, + }, + ) # also see return type for `get_bars()` bars: ibis.objects.BarDataList @@ -249,7 +263,18 @@ async def open_history_client( # quite sure why.. needs some tinkering and probably # a lookthrough of the ``ib_insync`` machinery, for eg. maybe # we have to do the batch queries on the `asyncio` side? - yield get_hist, {'erlangs': 1, 'rate': 3} + yield ( + get_hist, + { + 'erlangs': 1, # max conc reqs + 'rate': 3, # max req rate + 'frame_types': { # expected frame sizes + 1: mk_duration(seconds=2e3), + 60: mk_duration(days=2), + } + + }, + ) _pacing: str = ( @@ -394,7 +419,11 @@ async def get_bars( while _failed_resets < max_failed_resets: try: - out = await proxy.bars( + ( + bars, + bars_array, + dt_duration, + ) = await proxy.bars( fqme=fqme, end_dt=end_dt, sample_period_s=timeframe, @@ -405,13 +434,6 @@ async def get_bars( # current impl) to detect a cancel case. # timeout=timeout, ) - if out is None: - raise NoData(f'{end_dt}') - - bars, bars_array, dt_duration = out - - if bars_array is None: - raise SymbolNotFound(fqme) # not enough bars signal, likely due to venue # operational gaps. @@ -425,11 +447,16 @@ async def get_bars( f'end_dt: {end_dt}\n' f'duration: {dt_duration}\n' ) - raise NoData(f'{end_dt}') + result = None + return None + # raise NoData( + # f'{fqme}\n' + # f'end_dt:{end_dt}\n' + # ) else: dur_s: float = len(bars) * timeframe - bars_dur = pendulum.Duration(seconds=dur_s) + bars_dur = Duration(seconds=dur_s) dt_dur_s: float = dt_duration.in_seconds() if dur_s < dt_dur_s: log.warning( @@ -459,10 +486,10 @@ async def get_bars( # continue # await tractor.pause() - first_dt = pendulum.from_timestamp( + first_dt = from_timestamp( bars[0].date.timestamp()) - last_dt = pendulum.from_timestamp( + last_dt = from_timestamp( bars[-1].date.timestamp()) time = bars_array['time'] @@ -475,6 +502,7 @@ async def get_bars( if data_cs: data_cs.cancel() + # NOTE: setting this is critical! result = ( bars, # ib native bars_array, # numpy @@ -485,6 +513,7 @@ async def get_bars( # signal data reset loop parent task result_ready.set() + # NOTE: this isn't getting collected anywhere! return result except RequestError as err: @@ -510,7 +539,7 @@ async def get_bars( if end_dt is not None: end_dt = end_dt.subtract(days=1) elif end_dt is None: - end_dt = pendulum.now().subtract(days=1) + end_dt = now().subtract(days=1) log.warning( f'NO DATA found ending @ {end_dt}\n'