Make `ib` failed history requests more debug-able

Been hitting wayy too many cases like this so, finally put my foot down
and stuck in a buncha helper code to figure why (especially for gappy
ass pennies) this can/is happening XD

inside the `.ib.api.Client()`:
- in `.bars()` pack all `.reqHistoricalDataAsync()` kwargs into a dict such that
  wen/if we rx a blank frame we can enter pdb and make sync calls using
  a little `get_hist()` closure from the REPL.
  - tidy up type annots a bit too.
- add a new `.maybe_get_head_time()` meth which will return `None` when
  the dt can't be retrieved for the contract.

inside `.feed.open_history_client()`:
- use new `Client.maybe_get_head_time()` and only do `DataUnavailable`
  raises when the request `end_dt` is actually earlier.
- when `get_bars()` returns a `None` and the `head_dt` is not earlier
  then the `end_dt` submitted, raise a `NoData` with more `.info: dict`.
- deliver a new `frame_types: dict[int, pendulum.Duration]` as part
  of the yielded `config: dict`.
- in `.get_bars()` always assume a `tuple` returned from
  `Client.bars()`.
  - return a `None` on empty frames instead of raising `NoData` at this
    call frame.
- do more explicit imports from `pendulum` for brevity.

inside `.brokers._util`:
- make `NoData` take an `info: dict` as input to allow backends to pack
  in empty frame meta-data for (eventual) use in the tsp back-filling
  layer.
distribute_dis
Tyler Goodlet 2023-12-29 21:46:39 -05:00
parent c82ca812a8
commit 9be29a707d
3 changed files with 127 additions and 55 deletions

View File

@ -50,6 +50,7 @@ class SymbolNotFound(BrokerError):
"Symbol not found by broker search" "Symbol not found by broker search"
# TODO: these should probably be moved to `.tsp/.data`?
class NoData(BrokerError): class NoData(BrokerError):
''' '''
Symbol data not permitted or no data Symbol data not permitted or no data
@ -59,14 +60,15 @@ class NoData(BrokerError):
def __init__( def __init__(
self, self,
*args, *args,
frame_size: int = 1000, info: dict,
) -> None: ) -> None:
super().__init__(*args) super().__init__(*args)
self.info: dict = info
# when raised, machinery can check if the backend # when raised, machinery can check if the backend
# set a "frame size" for doing datetime calcs. # set a "frame size" for doing datetime calcs.
self.frame_size: int = 1000 # self.frame_size: int = 1000
class DataUnavailable(BrokerError): class DataUnavailable(BrokerError):

View File

@ -41,7 +41,6 @@ import time
from typing import ( from typing import (
Any, Any,
Callable, Callable,
Union,
) )
from types import SimpleNamespace from types import SimpleNamespace
@ -312,8 +311,8 @@ class Client:
fqme: str, fqme: str,
# EST in ISO 8601 format is required... below is EPOCH # EST in ISO 8601 format is required... below is EPOCH
start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00", start_dt: datetime | str = "1970-01-01T00:00:00.000000-05:00",
end_dt: Union[datetime, str] = "", end_dt: datetime | str = "",
# ohlc sample period in seconds # ohlc sample period in seconds
sample_period_s: int = 1, sample_period_s: int = 1,
@ -339,17 +338,13 @@ class Client:
default_dt_duration, default_dt_duration,
) = _samplings[sample_period_s] ) = _samplings[sample_period_s]
dt_duration: DateTime = ( dt_duration: Duration = (
duration duration
or default_dt_duration or default_dt_duration
) )
# TODO: maybe remove all this?
global _enters global _enters
log.info(
f"REQUESTING {ib_duration_str}'s worth {bar_size} BARS\n"
f'{_enters} @ end={end_dt}"'
)
if not end_dt: if not end_dt:
end_dt = '' end_dt = ''
@ -358,8 +353,8 @@ class Client:
contract: Contract = (await self.find_contracts(fqme))[0] contract: Contract = (await self.find_contracts(fqme))[0]
bars_kwargs.update(getattr(contract, 'bars_kwargs', {})) bars_kwargs.update(getattr(contract, 'bars_kwargs', {}))
bars = await self.ib.reqHistoricalDataAsync( kwargs: dict[str, Any] = dict(
contract, contract=contract,
endDateTime=end_dt, endDateTime=end_dt,
formatDate=2, formatDate=2,
@ -381,17 +376,38 @@ class Client:
# whatToShow='MIDPOINT', # whatToShow='MIDPOINT',
# whatToShow='TRADES', # whatToShow='TRADES',
) )
log.info(
f'REQUESTING {ib_duration_str} worth {bar_size} BARS\n'
f'fqme: {fqme}\n'
f'global _enters: {_enters}\n'
f'kwargs: {pformat(kwargs)}\n'
)
bars = await self.ib.reqHistoricalDataAsync(
**kwargs,
)
# tail case if no history for range or none prior. # tail case if no history for range or none prior.
if not bars: if not bars:
# NOTE: there's 2 cases here to handle (and this should be # NOTE: there's actually 3 cases here to handle (and
# read alongside the implementation of # this should be read alongside the implementation of
# ``.reqHistoricalDataAsync()``): # `.reqHistoricalDataAsync()`):
# - no data is returned for the period likely due to
# a weekend, holiday or other non-trading period prior to
# ``end_dt`` which exceeds the ``duration``,
# - a timeout occurred in which case insync internals return # - a timeout occurred in which case insync internals return
# an empty list thing with bars.clear()... # an empty list thing with bars.clear()...
# - no data exists for the period likely due to
# a weekend, holiday or other non-trading period prior to
# ``end_dt`` which exceeds the ``duration``,
# - LITERALLY this is the start of the mkt's history!
# sync requester for debugging empty frame cases
def get_hist():
return self.ib.reqHistoricalData(**kwargs)
assert get_hist
import pdbp
pdbp.set_trace()
return [], np.empty(0), dt_duration return [], np.empty(0), dt_duration
# TODO: we could maybe raise ``NoData`` instead if we # TODO: we could maybe raise ``NoData`` instead if we
# rewrite the method in the first case? right now there's no # rewrite the method in the first case? right now there's no
@ -444,7 +460,7 @@ class Client:
r_bars.extend(bars) r_bars.extend(bars)
bars = r_bars bars = r_bars
nparr = bars_to_np(bars) nparr: np.ndarray = bars_to_np(bars)
# timestep should always be at least as large as the # timestep should always be at least as large as the
# period step. # period step.
@ -457,9 +473,17 @@ class Client:
'time steps which are shorter then expected?!"' 'time steps which are shorter then expected?!"'
) )
# OOF: this will break teardown? # OOF: this will break teardown?
# -[ ] check if it's greenback
# -[ ] why tf are we leaking shm entries..
# -[ ] make a test on the debugging asyncio testing
# branch..
# breakpoint() # breakpoint()
return bars, nparr, dt_duration return (
bars,
nparr,
dt_duration,
)
async def con_deats( async def con_deats(
self, self,
@ -803,6 +827,23 @@ class Client:
return contracts return contracts
async def maybe_get_head_time(
self,
fqme: str,
) -> datetime | None:
'''
Return the first datetime stamp for `fqme` or `None`
on request failure.
'''
try:
head_dt: datetime = await self.get_head_time(fqme=fqme)
return head_dt
except RequestError:
log.warning(f'Unable to get head time: {fqme} ?')
return None
async def get_head_time( async def get_head_time(
self, self,
fqme: str, fqme: str,
@ -1391,7 +1432,7 @@ class MethodProxy:
self, self,
pattern: str, pattern: str,
) -> Union[dict[str, Any], trio.Event]: ) -> dict[str, Any] | trio.Event:
ev = self.event_table.get(pattern) ev = self.event_table.get(pattern)

View File

@ -37,7 +37,13 @@ from typing import (
from async_generator import aclosing from async_generator import aclosing
import ib_insync as ibis import ib_insync as ibis
import numpy as np import numpy as np
import pendulum from pendulum import (
now,
from_timestamp,
# DateTime,
Duration,
duration as mk_duration,
)
import tractor import tractor
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
@ -46,10 +52,9 @@ from piker.accounting import (
MktPair, MktPair,
) )
from piker.data.validate import FeedInit from piker.data.validate import FeedInit
from .._util import ( from piker.brokers._util import (
NoData, NoData,
DataUnavailable, DataUnavailable,
SymbolNotFound,
) )
from .api import ( from .api import (
# _adhoc_futes_set, # _adhoc_futes_set,
@ -160,13 +165,13 @@ async def open_history_client(
head_dt: None | datetime = None head_dt: None | datetime = None
if ( if (
# fx cons seem to not provide this endpoint? # fx cons seem to not provide this endpoint?
# TODO: guard against all contract types which don't
# support it?
'idealpro' not in fqme 'idealpro' not in fqme
): ):
try: head_dt: datetime | None = await proxy.maybe_get_head_time(
head_dt = await proxy.get_head_time(fqme=fqme) fqme=fqme
except RequestError: )
log.warning(f'Unable to get head time: {fqme} ?')
pass
async def get_hist( async def get_hist(
timeframe: float, timeframe: float,
@ -206,17 +211,26 @@ async def open_history_client(
# could be trying to retreive bars over weekend # could be trying to retreive bars over weekend
if out is None: if out is None:
log.error(f"Can't grab bars starting at {end_dt}!?!?") log.error(f"Can't grab bars starting at {end_dt}!?!?")
raise NoData(
f'{end_dt}',
# frame_size=2000,
)
if ( if (
end_dt end_dt
and head_dt and head_dt
and end_dt <= head_dt and end_dt <= head_dt
): ):
raise DataUnavailable(f'First timestamp is {head_dt}') raise DataUnavailable(
f'First timestamp is {head_dt}\n'
f'But {end_dt} was requested..'
)
else:
raise NoData(
info={
'fqme': fqme,
'head_dt': head_dt,
'start_dt': start_dt,
'end_dt': end_dt,
'timedout': timedout,
},
)
# also see return type for `get_bars()` # also see return type for `get_bars()`
bars: ibis.objects.BarDataList bars: ibis.objects.BarDataList
@ -249,7 +263,18 @@ async def open_history_client(
# quite sure why.. needs some tinkering and probably # quite sure why.. needs some tinkering and probably
# a lookthrough of the ``ib_insync`` machinery, for eg. maybe # a lookthrough of the ``ib_insync`` machinery, for eg. maybe
# we have to do the batch queries on the `asyncio` side? # we have to do the batch queries on the `asyncio` side?
yield get_hist, {'erlangs': 1, 'rate': 3} yield (
get_hist,
{
'erlangs': 1, # max conc reqs
'rate': 3, # max req rate
'frame_types': { # expected frame sizes
1: mk_duration(seconds=2e3),
60: mk_duration(days=2),
}
},
)
_pacing: str = ( _pacing: str = (
@ -394,7 +419,11 @@ async def get_bars(
while _failed_resets < max_failed_resets: while _failed_resets < max_failed_resets:
try: try:
out = await proxy.bars( (
bars,
bars_array,
dt_duration,
) = await proxy.bars(
fqme=fqme, fqme=fqme,
end_dt=end_dt, end_dt=end_dt,
sample_period_s=timeframe, sample_period_s=timeframe,
@ -405,13 +434,6 @@ async def get_bars(
# current impl) to detect a cancel case. # current impl) to detect a cancel case.
# timeout=timeout, # timeout=timeout,
) )
if out is None:
raise NoData(f'{end_dt}')
bars, bars_array, dt_duration = out
if bars_array is None:
raise SymbolNotFound(fqme)
# not enough bars signal, likely due to venue # not enough bars signal, likely due to venue
# operational gaps. # operational gaps.
@ -425,11 +447,16 @@ async def get_bars(
f'end_dt: {end_dt}\n' f'end_dt: {end_dt}\n'
f'duration: {dt_duration}\n' f'duration: {dt_duration}\n'
) )
raise NoData(f'{end_dt}') result = None
return None
# raise NoData(
# f'{fqme}\n'
# f'end_dt:{end_dt}\n'
# )
else: else:
dur_s: float = len(bars) * timeframe dur_s: float = len(bars) * timeframe
bars_dur = pendulum.Duration(seconds=dur_s) bars_dur = Duration(seconds=dur_s)
dt_dur_s: float = dt_duration.in_seconds() dt_dur_s: float = dt_duration.in_seconds()
if dur_s < dt_dur_s: if dur_s < dt_dur_s:
log.warning( log.warning(
@ -459,10 +486,10 @@ async def get_bars(
# continue # continue
# await tractor.pause() # await tractor.pause()
first_dt = pendulum.from_timestamp( first_dt = from_timestamp(
bars[0].date.timestamp()) bars[0].date.timestamp())
last_dt = pendulum.from_timestamp( last_dt = from_timestamp(
bars[-1].date.timestamp()) bars[-1].date.timestamp())
time = bars_array['time'] time = bars_array['time']
@ -475,6 +502,7 @@ async def get_bars(
if data_cs: if data_cs:
data_cs.cancel() data_cs.cancel()
# NOTE: setting this is critical!
result = ( result = (
bars, # ib native bars, # ib native
bars_array, # numpy bars_array, # numpy
@ -485,6 +513,7 @@ async def get_bars(
# signal data reset loop parent task # signal data reset loop parent task
result_ready.set() result_ready.set()
# NOTE: this isn't getting collected anywhere!
return result return result
except RequestError as err: except RequestError as err:
@ -510,7 +539,7 @@ async def get_bars(
if end_dt is not None: if end_dt is not None:
end_dt = end_dt.subtract(days=1) end_dt = end_dt.subtract(days=1)
elif end_dt is None: elif end_dt is None:
end_dt = pendulum.now().subtract(days=1) end_dt = now().subtract(days=1)
log.warning( log.warning(
f'NO DATA found ending @ {end_dt}\n' f'NO DATA found ending @ {end_dt}\n'